Granite Data Services provides a messaging feature, code name Gravity, implemented as a Comet-like service with AMF3 data polling over HTTP (producer/consumer based architecture). This implementation is freely based on the Bayeux protocol specification (1.0draft1 at this time) and adapted from the Jetty 6.1.x implementation of a comet server.
For a basic sample of GDS/Gravity, download graniteds-***.zip and import examples/graniteds_chat as a new project in Eclipse.
GraniteDS messaging relies on two main AS3 components on the Flex side: org.granite.gravity.Consumer and org.granite.gravity.Producer. These classes reproduce almost exactly the original Adobe Flex Consumer and Producer, with the specific internal implementation of GraniteDS.
The only difference is that you must use topic instead of subtopic, due to a change introduced in Flex 3.
Here is a quick example of GDS Consumer/Producer usage:
...
import mx.messaging.events.MessageEvent;
import mx.messaging.messages.AsyncMessage;
import org.granite.gravity.Consumer;
import org.granite.gravity.Producer;
...
private var consumer:Consumer = null;
private var producer:Producer = null;

private function connect():void {
    consumer = new Consumer();
    consumer.destination = "gravity";
    consumer.topic = "discussion";
    // Register the listener before subscribing so that no early message is missed.
    consumer.addEventListener(MessageEvent.MESSAGE, messageHandler);
    consumer.subscribe();

    producer = new Producer();
    producer.destination = "gravity";
    producer.topic = "discussion";
}

private function disconnect():void {
    consumer.unsubscribe();
    consumer.disconnect();
    consumer = null;

    producer.disconnect();
    producer = null;
}

// Called for each message received on the subscribed topic.
private function messageHandler(event:MessageEvent):void {
    var msg:AsyncMessage = event.message as AsyncMessage;
    trace("Received message: " + (msg.body as String));
}

// Wraps the String in an AsyncMessage envelope and publishes it.
private function send(message:String):void {
    var msg:AsyncMessage = new AsyncMessage();
    msg.body = message;
    producer.send(msg);
}
...
In this code, the producer sends String messages, which could of course be of any type, and the consumer receives String messages as well. These Strings are sent in AsyncMessage envelopes, which is the only envelope type allowed in GDS.
var producer:Producer = new Producer();
producer.destination = "quotes";
producer.topic = "/germany";
producer.send(message);

var consumerGermany:Consumer = new Consumer();
consumerGermany.destination = "quotes";
consumerGermany.topic = "/germany";
consumerGermany.subscribe();

var consumerFrance:Consumer = new Consumer();
consumerFrance.destination = "quotes";
consumerFrance.topic = "/france";
consumerFrance.subscribe();
A consumer can specify its message selector before it subscribes to the destination:
var consumerFrance:Consumer = new Consumer();
consumerFrance.destination = "quotes";
consumerFrance.selector = "COUNTRY = 'France'";
consumerFrance.subscribe();
There are three main steps to configure Gravity in an application:
<web-app version="2.4" ...>
    ...
    <listener>
        <listener-class>org.granite.config.GraniteConfigListener</listener-class>
    </listener>
    <servlet>
        <servlet-name>GravityServlet</servlet-name>
        <servlet-class>org.granite.gravity.tomcat.GravityTomcatServlet</servlet-class>
    </servlet>
    <servlet-mapping>
        <servlet-name>GravityServlet</servlet-name>
        <url-pattern>/gravityamf/*</url-pattern>
    </servlet-mapping>
    ...
</web-app>
<services-config>
    <services>
        <service id="messaging-service"
            class="flex.messaging.services.MessagingService"
            messageTypes="flex.messaging.messages.AsyncMessage">
            <adapters>
                <adapter-definition
                    id="default"
                    class="org.granite.gravity.adapters.SimpleServiceAdapter"
                    default="true"/>
            </adapters>
            <destination id="topic">
                <channels>
                    <channel ref="my-gravityamf"/>
                </channels>
            </destination>
        </service>
    </services>
    <channels>
        <channel-definition
            id="my-gravityamf"
            class="org.granite.gravity.channels.GravityChannel">
            <endpoint
                uri="http://{server.name}:{server.port}/{context.root}/gravityamf/amf"
                class="flex.messaging.endpoints.AMFEndpoint"/>
        </channel-definition>
    </channels>
</services-config>
Here, we define a GravityChannel (my-gravityamf) and we use it in the destination named topic. See the destination usage in the Consumer/Producer example above.
The topic we have defined uses the default Gravity adapter, SimpleServiceAdapter, which is a simple and fast in-memory message bus. If you need more advanced features such as persistent messages or clustering, you should consider using a dedicated messaging implementation such as ActiveMQ.
The simple adapter exposes two configuration properties:
no-local: default is true; if set to false, clients producing messages will also receive their own messages.
session-selector: this is an advanced option that instructs Gravity to store the message selector string in the user session. This allows the server part of the application to override the selector string defined by the Flex Consumer. The selector is stored in and read from the session attribute named org.granite.gravity.selector.{destinationId}.
Here is the table of the supported implementations:
Application server | Servlet class | Specific notes |
---|---|---|
Tomcat 6.0.18+ | org.granite.gravity.tomcat.GravityTomcatServlet | Only with APR/NIO enabled (APR highly recommended) |
JBoss 4.2.x | org.granite.gravity.tomcat.GravityTomcatServlet | APR/NIO, disable CommonHeadersFilter |
Jetty 6.1.x | org.granite.gravity.jetty.GravityJettyServlet | Jetty 7 not supported, Jetty 8 using Servlet 3 API |
JBoss 5+ | org.granite.gravity.jbossweb.GravityJBossWebServlet | Only with APR/NIO enabled (APR highly recommended) |
WebLogic 9.1+ | org.granite.gravity.weblogic.GravityWebLogicServlet | See WebLogic documentation for configuration tuning |
GlassFish 3.x | org.granite.gravity.async.GravityAsyncServlet | Using Servlet 3.0 |
Tomcat 7.x / Jetty 8.x | org.granite.gravity.async.GravityAsyncServlet | Using Servlet 3.0 |
Any other | org.granite.gravity.generic.GravityGenericServlet | Using blocking I/O (no asynchronous support) |
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE granite-config PUBLIC "-//Granite Data Services//DTD granite-config internal//EN"
    "http://www.graniteds.org/public/dtd/2.3.0/granite-config.dtd">
<granite-config>
    <gravity
        factory="org.granite.gravity.DefaultGravityFactory"
        channel-idle-timeout-millis="1800000"
        long-polling-timeout-millis="20000"
        reconnect-interval-millis="30000"
        reconnect-max-attempts="60">
        <thread-pool
            core-pool-size="5"
            maximum-pool-size="20"
            keep-alive-time-millis="10000"
            queue-capacity="2147483647" />
    </gravity>
</granite-config>
This <gravity> section is purely optional and you may omit it if you accept default values.
Some explanations about these options:
channel-idle-timeout-millis: the elapsed time after which an idle channel (pure producer or dead client) may be silently unsubscribed and removed by Gravity. Default is 30 minutes.
long-polling-timeout-millis: the elapsed time after which an idle connect request is closed, asking the client to reconnect. Default is 20 seconds. Note that setting this value isn't supported in Tomcat/APR configurations.
thread-pool attributes: all options are standard parameters for the Gravity ThreadPoolExecutor instance.
All other configuration options are for advanced use only and you should keep default values.
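The thread-pool attributes line up with the constructor arguments of java.util.concurrent.ThreadPoolExecutor. The following is a minimal sketch of that correspondence, not Gravity's actual code: GravityPoolSketch is a hypothetical class name, and the use of a LinkedBlockingQueue as the work queue is an assumption.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: GravityPoolSketch is a hypothetical name, not a GraniteDS class.
class GravityPoolSketch {

    // Builds an executor from the four <thread-pool> attribute values.
    // Assumption: a LinkedBlockingQueue bounded by queue-capacity holds pending tasks.
    static ThreadPoolExecutor build(int corePoolSize, int maximumPoolSize,
                                    long keepAliveTimeMillis, int queueCapacity) {
        return new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTimeMillis, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(queueCapacity));
    }

    public static void main(String[] args) throws InterruptedException {
        // Values taken from the sample granite-config.xml above.
        ThreadPoolExecutor pool = build(5, 20, 10000L, Integer.MAX_VALUE);
        pool.execute(() ->
            System.out.println("Task ran on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With a standard ThreadPoolExecutor, threads beyond core-pool-size are only created once the queue is full, so with a queue capacity this large the pool would in practice stay at its core size.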
At least for now, APR is the easiest to configure and the most reliable. To configure APR, see the documentation. On Windows®, it's simply a matter of downloading a native library and putting it in your WINDOWS/system32 directory, although other and better configurations are possible.
For more recent versions of Tomcat such as the one embedded in JBoss 5 or 6, you will need the latest APR library.
For JBoss 4.2.*, you must comment out a specific filter in the default global web.xml (<JBOSS_HOME>/server/default/deploy/jboss-web.deployer/conf/web.xml):
...
<!-- Comment this out!
<filter>
    <filter-name>CommonHeadersFilter</filter-name>
    <filter-class>org.jboss.web.tomcat.filters.ReplyHeaderFilter</filter-class>
    <init-param>
        <param-name>X-Powered-By</param-name>
        <param-value>...</param-value>
    </init-param>
</filter>
<filter-mapping>
    <filter-name>CommonHeadersFilter</filter-name>
    <url-pattern>/*</url-pattern>
</filter-mapping>
-->
...
See above for Tomcat configuration.
For JBoss 5+ servers, you must use a specific servlet. JBoss 5 implements its own version of Tomcat, named JBossWeb:
<web-app version="2.4" ...>
    ...
    <servlet>
        <servlet-name>GravityServlet</servlet-name>
        <servlet-class>org.granite.gravity.jbossweb.GravityJBossWebServlet</servlet-class>
        ... (see Tomcat configuration above for options)
    </servlet>
    ...
</web-app>
Note that you do not need to comment out the CommonHeadersFilter with JBoss 5, but you still need to enable APR.
Here is a sample configuration for a default JBoss installation with a brief description of the different options:
<adapters>
    <adapter-definition id="jms" class="org.granite.gravity.adapters.JMSServiceAdapter"/>
</adapters>
<destination id="chat-jms">
    <properties>
        <jms>
            <destination-type>Topic</destination-type>
            <!-- Optional: forces usage of simple text messages
            <message-type>javax.jms.TextMessage</message-type>
            -->
            <connection-factory>ConnectionFactory</connection-factory>
            <destination-jndi-name>topic/testTopic</destination-jndi-name>
            <destination-name>TestTopic</destination-name>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <transacted-sessions>false</transacted-sessions>
            <!-- Optional JNDI environment. Specify the external JNDI configuration to access
                 a remote JMS provider. Sample for a remote JBoss server.
            -->
            <initial-context-environment>
                <property>
                    <name>Context.SECURITY_PRINCIPAL</name>
                    <value>guest</value>
                </property>
                <property>
                    <name>Context.SECURITY_CREDENTIALS</name>
                    <value>guest</value>
                </property>
                <property>
                    <name>Context.PROVIDER_URL</name>
                    <value>http://my.host.com:1099</value>
                </property>
                <property>
                    <name>Context.INITIAL_CONTEXT_FACTORY</name>
                    <value>org.jnp.interfaces.NamingContextFactory</value>
                </property>
                <property>
                    <name>Context.URL_PKG_PREFIXES</name>
                    <value>org.jboss.naming:org.jnp.interfaces</value>
                </property>
            </initial-context-environment>
        </jms>
        ...
    </properties>
    ...
    <adapter ref="jms"/>
</destination>
Comments on configuration options:
destination-type must be Topic for the moment. Queues may be supported later.
message-type may be forced to simple text messages by specifying javax.jms.TextMessage.
connection-factory and destination-jndi-name are the JNDI names of the JMS ConnectionFactory and of the JMS topic, respectively.
destination-name is just a label but is still required.
acknowledge-mode can have the standard values accepted by any JMS provider: AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, and DUPS_OK_ACKNOWLEDGE.
transacted-sessions allows the use of transactions in sessions when set to true.
initial-context-environment: the initial-context parameters allow access to a remote JMS server by setting the JNDI context options.
The JMS headers are always copied between Flex and JMS messages.
Durable subscriptions are not yet supported.
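The Context.* names in the initial-context-environment sample look like the constants of javax.naming.Context. As a minimal sketch, assuming the adapter resolves each name to the matching constant (an assumption; the text above does not say how the names are interpreted), the sample values would yield the JNDI environment built below. JndiEnvironmentSketch is a hypothetical class name, and no connection is opened.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Illustrative only: JndiEnvironmentSketch is a hypothetical name, not a GraniteDS class.
class JndiEnvironmentSketch {

    // Builds the JNDI environment matching the sample <initial-context-environment>.
    static Hashtable<String, String> sampleEnvironment() {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.SECURITY_PRINCIPAL, "guest");
        env.put(Context.SECURITY_CREDENTIALS, "guest");
        env.put(Context.PROVIDER_URL, "http://my.host.com:1099");
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
        return env;
    }

    public static void main(String[] args) {
        // Print each resolved property name with its value; no InitialContext is created.
        sampleEnvironment().forEach((name, value) ->
            System.out.println(name + " = " + value));
    }
}
```

An application would pass such a table to new InitialContext(env) to reach the remote provider, which requires the provider's client libraries on the classpath.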
Here is a sample configuration:
<adapters>
    <adapter-definition
        id="activemq"
        class="org.granite.gravity.adapters.ActiveMQServiceAdapter"/>
</adapters>
<destination id="chat-activemq">
    <properties>
        <jms>
            <destination-type>Topic</destination-type>
            <!-- Optional: forces usage of simple text messages
            <message-type>javax.jms.TextMessage</message-type>
            -->
            <connection-factory>ConnectionFactory</connection-factory>
            <destination-jndi-name>topic/testTopic</destination-jndi-name>
            <destination-name>TestTopic</destination-name>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <transacted-sessions>false</transacted-sessions>
        </jms>
        <server>
            <durable>true</durable>
            <file-store-root>/var/activemq/data</file-store-root>
            <create-broker>true</create-broker>
            <wait-for-start>false</wait-for-start>
        </server>
    </properties>
    ...
    <adapter ref="activemq"/>
</destination>
Comments on configuration options:
The parameters in the <jms>...</jms> section are identical to those used in the default JMS configuration. See above.
durable, if set to true, allows for durable messages, stored in the filesystem. The data store directory of ActiveMQ can be specified by the file-store-root parameter.
create-broker is optional, as well as the dependent wait-for-start attribute. When create-broker is false, creation of the broker is not automatic and has to be done by the application itself. In this case, wait-for-start set to true tells the ActiveMQConnectionFactory to wait for the effective creation of the broker. Please refer to the ActiveMQ documentation for more details on these options.
Gravity gravity = GravityManager.getGravity(servletContext);
AsyncMessage message = new AsyncMessage();
message.setDestination("my-gravity-destination");
message.setHeader(AsyncMessage.SUBTOPIC_HEADER, "my-topic");
message.setBody("Message content");
gravity.publishMessage(message);
Here is an example of an EJB3 bean sending a message:
@Stateless
@Local(Test.class)
public class TestBean implements Test {

    private static Logger log = Logger.getLogger(TestBean.class.getName());

    @Resource
    SessionContext ctx;

    @Resource(mappedName="java:/ConnectionFactory")
    ConnectionFactory jmsConnectionFactory;

    @Resource(mappedName="topic/testTopic")
    Topic jmsTopic;

    public TestBean() {
        super();
    }

    public void notifyClient(Object object) {
        try {
            Connection connection = jmsConnectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // createObjectMessage requires a Serializable payload.
            javax.jms.Message jmsMessage = session.createObjectMessage((Serializable) object);
            MessageProducer producer = session.createProducer(jmsTopic);
            producer.send(jmsMessage);
            session.close();
            connection.close();
        }
        catch (Exception e) {
            log.error("Could not publish notification", e);
        }
    }
}
Here is an example of a Seam 2 component sending a message:
@Stateless
@Local(Test.class)
@Name("test")
public class TestBean implements Test {

    private static Logger log = Logger.getLogger(TestBean.class.getName());

    @In
    private TopicPublisher testTopicPublisher;

    @In
    private TopicSession topicSession;

    public void notifyClient(Serializable object) {
        try {
            testTopicPublisher.publish(topicSession.createObjectMessage(object));
        }
        catch (Exception e) {
            log.error("Could not publish notification", e);
        }
    }
}
jms/destination-name
configuration parameter.
destination-name
parameter with the subtopic
.
Wildcards are supported in the subtopic following the Flex convention and are converted to the ActiveMQ format, meaning that toto.** is converted to toto.>.
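The conversion described above can be illustrated with a minimal sketch. This is not the adapter's actual code: SubtopicWildcardSketch and toActiveMQ are hypothetical names, and only the `**` to `>` mapping comes from the text. A single-level `*` is left unchanged here, since ActiveMQ uses the same character for one path element.

```java
// Illustrative only: SubtopicWildcardSketch is a hypothetical name, not a GraniteDS class.
class SubtopicWildcardSketch {

    // Replaces the Flex multi-level wildcard "**" with the ActiveMQ equivalent ">".
    // String.replace performs a literal (non-regex) replacement.
    static String toActiveMQ(String flexSubtopic) {
        return flexSubtopic.replace("**", ">");
    }

    public static void main(String[] args) {
        System.out.println(toActiveMQ("toto.**"));
        System.out.println(toActiveMQ("toto.*"));
    }
}
```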
public class Test {

    // notifyClient is an illustrative method name.
    public void notifyClient(Serializable person) throws JMSException {
        // adapterId should be the id of the JMS adapter as defined in services-config.xml
        ConnectionFactory f = new ActiveMQConnectionFactory("vm://adapterId");
        Connection connection = f.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQTopic activeMQTopic = new ActiveMQTopic("destination");
        javax.jms.Message jmsMessage = session.createObjectMessage(person);
        MessageProducer producer = session.createProducer(activeMQTopic);
        producer.send(jmsMessage);
        session.close();
        connection.close();
    }
}
Securing messaging destinations is very similar to securing remoting destinations, and most concepts apply to messaging services as well as remoting services.
You can, for example, set up role-based security on a Gravity destination with the following definition in services-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<services-config>
    <services>
        <service id="messaging-service"
            class="flex.messaging.services.MessagingService"
            messageTypes="flex.messaging.messages.AsyncMessage">
            <adapters>
                <adapter-definition
                    id="default"
                    class="org.granite.gravity.adapters.SimpleServiceAdapter"
                    default="true"/>
            </adapters>
            <destination id="restrictedTopic">
                <channels>
                    <channel ref="my-gravityamf"/>
                </channels>
                <security>
                    <security-constraint>
                        <auth-method>Custom</auth-method>
                        <roles>
                            <role>admin</role>
                        </roles>
                    </security-constraint>
                </security>
            </destination>
        </service>
    </services>
    ...
</services-config>
In this case, only users with the role admin will be able to subscribe to the topic restrictedTopic.
You may write and configure a specific GravityDestinationSecurizer in order to add fine-grained security checks for specific actions. In particular, you can control who can subscribe or publish messages to a particular topic.
public interface GravityDestinationSecurizer extends DestinationSecurizer {

    public void canSubscribe(GravityInvocationContext context)
        throws SecurityServiceException;

    public void canPublish(GravityInvocationContext context)
        throws SecurityServiceException;
}
You then have to tell GraniteDS where to use your securizer:
<services-config>
    <services>
        <service ...>
            <destination id="restrictedDestination">
                ...
                <properties>
                    <securizer>path.to.MyDestinationSecurizer</securizer>
                </properties>
            </destination>
        </service>
    </services>
    ...
</services-config>