Chapter 6. Messaging (Gravity)

Chapter 6. Messaging (Gravity)

6.1. Example Usage with Consumer/Producer
6.2. Topics and Selectors
6.3. Common Configuration
6.3.1. Supported Application Servers
6.3.2. Advanced Configuration
6.3.3. Tomcat and JBoss/Tomcat Specific Configuration Tips
6.4. Integration with JMS
6.5. Using an Embedded ActiveMQ
6.6. Server to Client Publishing
6.7. Securing Messaging Destinations

Granite Data Services provides a messaging feature, code name Gravity, implemented as a -like service with AMF3 data polling over HTTP (producer/consumer based architecture). This implementation is freely based on the protocol specification (1.0draft1 at this time) and adapted from the Jetty 6.1.x implementation of a comet server.

For a basic sample of GDS/Gravity, download graniteds-***.zip and import the examples/graniteds_chat as a new project in Eclipse.

GraniteDS messaging relies on two main AS3 components on the Flex side: org.granite.gravity.Consumer and org.granite.gravity.Producer. These classes reproduce almost exactly the original Adobe Flex and with the specific internal implementation of GraniteDS. The only differences are that you must use topic instead of subtopic due to a change introduced in Flex 3.

Here is a quick example of GDS Consumer/Producer usage:

...
import org.granite.gravity.Consumer;
import org.granite.gravity.Producer;
...
private var consumer:Consumer = null;
private var producer:Producer = null;

private function connect():void {
    consumer = new Consumer();
    consumer.destination = "gravity";
    consumer.topic = "discussion";
    consumer.subscribe();
    consumer.addEventListener(MessageEvent.MESSAGE, messageHandler);

    producer = new Producer();
    producer.destination = "gravity";
    producer.topic = "discussion";
}

private function disconnect():void {
    consumer.unsubscribe();
    consumer.disconnect();
    consumer = null;

    producer.disconnect();
    producer = null;
}

private function messageHandler(event:MessageEvent):void {
    var msg:AsyncMessage = event.message as AsyncMessage;    
    trace("Received message: " + (msg.body as String));
}

private function send(message:String):void {
    var msg:AsyncMessage = new AsyncMessage();
    msg.body = message;
    producer.send(msg);
}
...
	   

In this code, the producer sends String messages, which could of course be of any type, and the producer receives String messages as well. These Strings are sent in AsyncMessage envelopes, which is the only envelope type allowed in GDS.

By default all messages sent by a producer are transmitted to all subscribed consumers. In most cases you will want to more finely control how the messages are routed. There are two main ways of doing this: the easiest is the topic and the most advanced is by using selectors.

Topics are a way to divide the destination in many parts. When a producer sends a message on a particular topic, only the consumers attached to this topic will receive the message. For example, if you have a destination for quotes, you could have a topic for each country:

var producer:Producer = new Producer();
producer.destination = "quotes";
producer.topic = "/germany";
producer.send(message);

var consumerGermany:Consumer = new Consumer();
consumerGermany.destination = "quotes";
consumerGermany.topic = "/germany";
consumerGermany.subscribe();

var consumerFrance:Consumer = new Consumer();
consumerFrance.destination = "quotes";
consumerFrance.topic = "/france";
consumerFrance.subscribe();
        

Here only consumerGermany will receive the messages published by the producer. Note the slash (/) to start the name of the topic. You can define more sections for the topic name and use wildcards (*) and (**) to match a part of the topic. For example you could define a hierarchy /europe/germany, /europe/france, /america/US, and define a consumer for the topic /europe/* that will receive only messages for Germany and France. Finally a consumer with /** will receive everything, whatever topic is used by the producer.

Topics are a simple way of filtering the message, but in some cases you may want to use more sophisticated rules to route the messages from producers to consumers. Gravity uses the concept of message selectors from JMS to do this. It works by defining a SQL-like select string that will define the criteria that a consumer wants on the message headers.

A consumer can specify its message selector before it subscribes to the destination:

var consumerFrance:Consumer = new Consumer();
consumerFrance.destination = "quotes";
consumerFrance.selector = "COUNTRY = 'France'";
consumerFrance.subscribe();
        

This consumer will receive all messages that have a header named COUNTRY with the value France. Many header values can be combined in the selector with AND and OR, and you can use operators. See for details.

There are three main steps to configure Gravity in an application:



<web-app version="2.4" ...>
    ...
    <listener>
        <listener-class>org.granite.config.GraniteConfigListener</listener-class>
    </listener>

    <servlet>
        <servlet-name>GravityServlet</servlet-name>
        <servlet-class>org.granite.gravity.tomcat.GravityTomcatServlet</servlet-class>
    </servlet>
    <servlet-mapping>
        <servlet-name>GravityServlet</servlet-name>
        <url-pattern>/gravityamf/*</url-pattern>
    </servlet-mapping>
    ...
</web-app>
       

This declaration is the one specific to the Tomcat application server. See below for all available Gravity servlet implementations.

<services-config>
    <services>
        <service id="messaging-service"
            class="flex.messaging.services.MessagingService"
            messageTypes="flex.messaging.messages.AsyncMessage">
            <adapters>
                <adapter-definition
                    id="default"
                    class="org.granite.gravity.adapters.SimpleServiceAdapter"
                    default="true"/>
            </adapters>

            <destination id="topic">
                <channels>
                    <channel ref="my-gravityamf"/>
                </channels>
            </destination>
        </service>
    </services>

    <channels>
        <channel-definition
            id="my-gravityamf"
            class="org.granite.gravity.channels.GravityChannel">
            <endpoint
                uri="http://{server.name}:{server.port}/{context.root}/gravityamf/amf"
                class="flex.messaging.endpoints.AMFEndpoint"/>
        </channel-definition>
    </channels>
</services-config>
	   

Here, we define a GravityChannel (my-gravityamf) and we use it in the destination named topic. See above destination usage in Consumer/Producer usage.

The topic we have defined uses the default Gravity adapter SimpleServiceAdapter that is a simple fast in-memory message bus. If you need more advanced features such as persistent messages or clustering, you should consider using a dedicated messaging implementation such as .

The simple adapter exposes two configuration properties:

  • no-local: default is true, if set to false the client producing messages will receive their own messages
  • session-selector: this is an advanced option and instructs Gravity to store the message selector string in the user session. This allows the server part of the application to override the selector string defined by the Flex Consumer. The selector is stored and read from the session attribute named org.granite.gravity.selector.{destinationId}.

GraniteDS provides a generic servlet implementation that can work in any compliant servlet container. However it will use blocking IO and thus will provide relatively limited scalability.

Before the release of the Servlet 3.0 specification, there was no standard way of writing asynchronous non blocking servlets and each server provided its own specific API (for example Tomcat CometProcessor or Jetty continuations). GraniteDS thus provides implementations of non blocking messaging for the most popular application servers.

Here is the table of the supported implementations:

Application serverServlet classSpecific notes
Tomcat 6.0.18+org.granite.gravity.tomcat.GravityTomcatServletOnly with APR/NIO enabled (APR highly recommended)
JBoss 4.2.xorg.granite.gravity.tomcat.GravityTomcatServletAPR/NIO, disable CommonHeadersFilter
Jetty 6.1.xorg.granite.gravity.jetty.GravityJettyServletJetty 7 not supported, Jetty 8 using Servlet 3 API
JBoss 5+org.granite.gravity.jbossweb.GravityJBossWebServletOnly with APR/NIO enabled (APR highly recommended)
WebLogic 9.1+org.granite.gravity.weblogic.GravityWebLogicServletSee WebLogic documentation for configuration tuning
GlassFish 3.xorg.granite.gravity.async.GravityAsyncServletUsing Servlet 3.0
Tomcat 7.x / Jetty 8.xorg.granite.gravity.async.GravityAsyncServletUsing Servlet 3.0
Any otherorg.granite.gravity.generic.GravityGenericServletUsing blocking I/O (no asynchronous support)

Whichever Gravity servlet implementation is used in your application, the advanced configuration is done in granite-config.xml. Here is a sample Gravity configuration with all default options:



<?xml version="1.0" encoding="UTF-8"?>

<!DOCTYPE granite-config PUBLIC "-//Granite Data Services//DTD granite-config internal//EN"
    "http://www.graniteds.org/public/dtd/2.3.0/granite-config.dtd">

<granite-config>

    <gravity
        factory="org.granite.gravity.DefaultGravityFactory"
        channel-idle-timeout-millis="1800000"
        long-polling-timeout-millis="20000"
        reconnect-interval-millis="30000"
        reconnect-max-attempts="60">
        
        <thread-pool
            core-pool-size="5"
            maximum-pool-size="20"
            keep-alive-time-millis="10000"
            queue-capacity="2147483647" />
        
    </gravity>

</granite-config>
           

This <gravity> section is purely optional and you may omit it if you accept default values.

Some explanations about these options:

All other configuration options are for advanced use only and you should keep default values.

GraniteDS messaging for Tomcat relies on the org.apache.catalina.CometProcessor interface. In order to enable Comet support in Tomcat, you must configure an .

At least for now, APR is the easiest to configure and the most reliable. To configure APR, see documentation . On Windows®, it's simply a matter of downloading a native and putting it in your WINDOWS/system32 directory – while other and better configurations are possible. For more recent versions of Tomcat such as the one embedded in JBoss 5 or 6, you will need the latest APR library, see here.

For JBoss 4.2.*, you must comment out a specific filter in the default global web.xml (<JBOSS_HOME>/server/default/deploy/jboss-web.deployer/conf/web.xml):



...
<!-- Comment this out!
<filter>
  <filter-name>CommonHeadersFilter</filter-name>
  <filter-class>org.jboss.web.tomcat.filters.ReplyHeaderFilter</filter-class>
  <init-param>
    <param-name>X-Powered-By</param-name>
    <param-value>...</param-value>
  </init-param>
</filter>

<filter-mapping>
  <filter-name>CommonHeadersFilter</filter-name>
  <url-pattern>/*</url-pattern>
</filter-mapping>
-->
...
            

See above for Tomcat configuration.

For JBoss 5+ servers, you must use a specific servlet. JBoss 5 implements its own version of Tomcat, named JBossWeb:



<web-app version="2.4" ...>
    ...
    <servlet>
        <servlet-name>GravityServlet</servlet-name>
        <servlet-class>org.granite.gravity.jbossweb.GravityJBossWebServlet</servlet-class>
        ... (see Tomcat configuration above for options)
    </servlet>
    ...
</web-app>
            

Note that you do not need to comment out the CommonHeadersFilter with JBoss 5, but you still need to enable APR.

The GraniteDS JMS adapter configuration follows as closely as possible the standard Adobe Flex configuration for the JMS adapter. See .

Here is a sample configuration for a default JBoss installation with a brief description of the different options:



<adapters>
  <adapter-definition id="jms" class="org.granite.gravity.adapters.JMSServiceAdapter"/>
</adapters>

<destination id="chat-jms">
  <properties>
    <jms>
      <destination-type>Topic</destination-type>
      <!-- Optional: forces usage of simple text messages
      <message-type>javax.jms.TextMessage</message-type>
      -->
      <connection-factory>ConnectionFactory</connection-factory>
      <destination-jndi-name>topic/testTopic</destination-jndi-name>
      <destination-name>TestTopic</destination-name>
      <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
      <transacted-sessions>false</transacted-sessions>
      <!-- Optional JNDI environment. Specify the external JNDI configuration to access 
        a remote JMS provider. Sample for a remote JBoss server.
      -->
      <initial-context-environment>
        <property>
          <name>Context.SECURITY_PRINCIPAL</name>
          <value>guest</value>
        </property>
        <property>
          <name>Context.SECURITY_CREDENTIALS</name>
          <value>guest</value>
        </property>
        <property>
          <name>Context.PROVIDER_URL</name>
          <value>http://my.host.com:1099</value>
        </property>
        <property>
          <name>Context.INITIAL_CONTEXT_FACTORY</name>
          <value>org.jnp.interfaces.NamingContextFactory</value>
        </property>
        <property>
          <name>Context.URL_PKG_PREFIXES</name>
          <value>org.jboss.naming:org.jnp.interfaces</value>
        </property>
      </initial-context-environment>
    </jms>
    ...
  </properties>
  ...
  <adapter ref="jms"/>
</destination>
        

Comments on configuration options:

  • destination-type must be Topic for the moment. Queues may be supported later.
  • message-type may be forced to simple text messages by specifying javax.jms.TextMessage.
  • connection-factory and destination-jndi-name are the JNDI names respectively of the JMS ConnectionFactory and of the JMS topic.
  • destination-name is just a label but still required.
  • acknowledge-mode can have the standard values accepted by any JMS provider: AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, and DUPS_OK_ACKNOWLEDGE.
  • transacted-sessions allows the use of transactions in sessions when set to true.
  • initial-context-environment: The initial-context parameters allow to access a remote JMS server by setting the JNDI context options.

Note

The JMS headers are always copied between Flex and JMS messages

Note

Durable subscriptions are not yet supported

In the case of a simple Tomcat/Jetty installation without JMS provider, or to allow Flex-to-Flex interactions with advanced capabilities such as durable messages, Gravity can be integrated with an embedded Apache ActiveMQ instance.

To enable ActiveMQ, just put the activemq-xx.jar in your WEB-INF/lib directory. The necessary topic will be lazily created on first use, except if the property create-broker is set to false. The uri of the created ActiveMQ broker will be vm://adapterId.

Here is a sample configuration:



<adapters>
  <adapter-definition
    id="activemq"
    class="org.granite.gravity.adapters.ActiveMQServiceAdapter"/>
</adapters>

<destination id="chat-activemq">
  <properties>
    <jms>
      <destination-type>Topic</destination-type>
      <!-- Optional: forces usage of simple text messages
      <message-type>javax.jms.TextMessage</message-type>
      -->
      <connection-factory>ConnectionFactory</connection-factory>
      <destination-jndi-name>topic/testTopic</destination-jndi-name>
      <destination-name>TestTopic</destination-name>
      <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
      <transacted-sessions>false</transacted-sessions>
    </jms>
    
    <server>
      <durable>true</durable>
      <file-store-root>/var/activemq/data</file-store-root>
      <create-broker>true</create-broker>
      <wait-for-start>false</wait-for-start>
    </server>
  </properties>
  ...
  <adapter ref="activemq"/>
</destination>
        

Comments on configuration options:

  • The main parameters (<jms>...</jms>) are identical to those used in the default JMS configuration. See above.
  • durable, if set to true, allows for durable messages, stored in the filesystem. The data store directory of ActiveMQ can be specified by the file-store-root parameter.
  • create-broker is optional, as well as the dependant wait-for-start attribute. When create-broker is false, creation of the broker is not automatic and has to be done by the application itself. In this case, wait-for-start set to true tells the ActiveMQConnectionFactory to wait for the effective creation of the broker. Please refer to the ActiveMQ documentation for more details on these options.

There are mostly two kinds of requirements for messaging: client-to-client interactions, that can be easily handled by the Consumer/Producer pattern, and server-to-client push that can be done with either the low-level Gravity API or directly using the JMS API when the JMS adapter is used.

Server to Client Messaging with the Low-level Gravity API

If you use the SimpleAdapter, the message sending will have to be done at a lower level and you will need a compilation dependency on the Gravity API. It's also possible but not recommended to use this low-level API with the JMS and ActiveMQ adapters. It first requires to get the Gravity object from the ServletContext. It is set as an attribute named org.granite.gravity.Gravity. When using Spring, Seam 2 or CDI, you can also get this object by injection (see the corresponding documentation). Then you can send messages of type flex.messaging.messages.Message by calling the method gravity.publish(message);.



Gravity gravity = GravityManager.getGravity(servletContext);
AsyncMessage message = new AsyncMessage();
message.setDestination("my-gravity-destination");
message.setHeader(AsyncMessage.SUBTOPIC_HEADER, "my-topic");
message.setBody("Message content");
gravity.publishMessage(message);
        

It you need to simulate a publish from the client subscribed in the current session, you can get the clientId in the session attribute named org.granite.gravity.channel.clientId.{destination} and set it in the message.

Server to Client Messaging with JMS

Sending messages from the server to Flex clients simply consists of sending JMS messages to the corresponding JMS topic. Text messages are received as simple text on the Flex side, object messages are serialized in AMF3 and deserialized and received as ActionScript 3 objects. The Gravity messaging channel supports lazily loaded collections and objects, exactly as the Granite remoting channel.

Here is an example on an EJB3 sending a message:



@Stateless
@Local(Test.class)
public class TestBean implements Test {
    @Resource
    SessionContext ctx;
    @Resource(mappedName="java:/ConnectionFactory")
    ConnectionFactory jmsConnectionFactory;
    @Resource(mappedName="topic/testTopic")
    Topic jmsTopic;
    public TestBean() {
       super();
    }
    public void notifyClient(Object object) {
        try {
            Connection connection = jmsConnectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            javax.jms.Message jmsMessage = session.createObjectMessage(person);
            MessageProducer producer = session.createProducer(jmsTopic);
            producer.send(jmsMessage);
            session.close();
            connection.close();
        }
        catch (Exception e) {
            log.error("Could not publish notification", e);
        }
    }
}
        

Here is an example on a Seam 2 component sending a message:



@Stateless
@Local(Test.class)
@Name("test")
public class TestBean implements Test {
    private static Logger log = Logger.getLogger(TestBean.class.getName());
    @In
    private TopicPublisher testTopicPublisher;   
    @In 
    private TopicSession topicSession;
  
    public void notifyClient(Serializable object) {
        try {
            testTopicPublisher.publish(topicSession.createObjectMessage(object));
        } 
        catch (Exception e) {
            log.error("Could not publish notification", e);
        }
    }
}
        
Server to Client Messaging with Embedded ActiveMQ

The only difference with standard JMS is that you can get a ConnectionFactory more easily. Also ActiveMQ supports subtopics. The name of the topic is built with the following rule:


public class Test throws JMSException {
    // adapterId should be the id of the JMS adapter as defined in services-config.xml
    ConnectionFactory f = new ActiveMQConnectionFactory("vm://adapterId");
    Connection connection = jmsConnectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    ActiveMQTopic activeMQTopic= new ActiveMQTopic("destination");
    javax.jms.Message jmsMessage = session.createObjectMessage(person);
    MessageProducer producer = session.createProducer(activeMQTopic);
    producer.send(jmsMessage);
    session.close();
    connection.close();
}
        

Securing messaging destination is very similar to security remoting destinations (see here) and most concepts apply to messaging services as well as remoting services.

You can for example setup role-based security on a Gravity destination with the following definition in services-config.xml:



<?xml version="1.0" encoding="UTF-8"?>
<services-config>
    <services>
        <service id="messaging-service"
            class="flex.messaging.services.MessagingService"
            messageTypes="flex.messaging.messages.AsyncMessage">
            <adapters>
                <adapter-definition
                    id="default"
                    class="org.granite.gravity.adapters.SimpleServiceAdapter"
                    default="true"/>
            </adapters>
            
            <destination id="restrictedTopic">
                <channels>
                    <channel ref="my-gravityamf"/>
                </channels>
                <security>
                    <security-constraint>
                        <auth-method>Custom</auth-method>
                        <roles>
                            <role>admin</role>
                        </roles>
                    </security-constraint>
                </security>
            </destination>
        </service>
    </services>
    ...
</services-config>
        

In this case, only users with the role admin will be able to subscribe to the topic restrictedTopic.

Fine-grained Per-destination Security

You may write and configure a specific GravityDestinationSecurizer in order to add fine grained security checks for specific actions. In particular you can control who can subscribe or publish messages to a particular topic.


public interface GravityDestinationSecurizer extends DestinationSecurizer {
    public void canSubscribe(GravityInvocationContext context)
        throws SecurityServiceException;
    public void canPublish(GravityInvocationContext context)
        throws SecurityServiceException;
}
        

You then have to tell GraniteDS where to use your securizer:



<services-config>
    <services>
        <service ...>
            <destination id="restrictedDestination">
                ...
                <properties>
                    <securizer>path.to.MyDestinationSecurizer</securizer>
                </properties>
            </destination>
        </service>
    </services>
    ...
</services-config>