jbpm4 jms

jbpm4 是怎么整合jms的呢,找了jbpm4给的几个文档,没看到有这样的说明或者例子,以前对jms用的很少,不是很懂这东西。

能够提供jbpm4 与jms整合的朋友提供下,在此谢过了。

jbpm4 jms

在开发手册里面找到的,不知道对你有用没,我最近也在用jbpm4.所以在学习中,一起努力吧。^_^

2.7. jms 活动

澄清一下:这个活动还不稳定。两个方面需要在 下个发布中重新审核:

  • 把没有非xa的JMS绑定到标准事务上。
  • 我们依然指出为什么在JBoss的java:JmsXA连接工厂 没有实现XAConnectionFactory。我们需要使用非xa的JMS API 来发送和接收消息。这是为什么我们在 我们的企业QA运行流程时设置transacted="false" (在文档中使用到了)。这是为什么我们使用jmsConsumeMessageFromQueue("java:JmsXA", "queue/jbpm-test-queue", 1000, false, Session.AUTO_ACKNOWLEDGE);方法 在我们的测试用例中,运行在JBoss(也在文档这里用到了)。

jms活动为用户提供了发送JMS消息的简单方法。 目前支持发送以下三种不同类型的JMS消息:文本,对象和map。 其他的消息属性还不支持。

表 2.5. jms 属性:

属性 类型 默认 必填 备注
connection-factory jndi名称   必填 jms连接工厂的jndi名称。
destination jndi名称   必填 jms队列或主题的jndi名称。
transacted boolean: {true, false} true 可选 指定JMS消息发送应该在事务内。 参考QueueConnection.createQueueSession(boolean transactional, int acknowledgeMode)
acknowledge { auto | client | dups-ok } auto optional 指定acknowledge模式 参考 QueueConnection.createQueueSession(boolean transactional, int acknowledgeMode)



 

这里有三种类型的JMS消息,你可以发送到目的地:text, object和map。 connection-factorydestination属性是强制的, 期待包含连接工厂和目的地(队列或主题)的名称, 它会用来在jndi中查找对应的对象。特定的消息会被发送到目的地,会通过查找获得。查找代码像下面这样:

InitialContext initialContext = new InitialContext();
Destination destination = (Destination) initialContext.lookup(destinationName);
Object connectionFactory = initialContext.lookup(connectionFactoryName);

jms 活动会使用JMS队列api,如果目的地是 一个Queue。这与主题类似。

jms 活动会使用XA JMS api,如果connectionFactory是 XAConnectionFactory的实例,与单纯的ConnectionFactory类似。

因此,如果你运行在一个appserver内部,new InitialContext() 就会查找到appserver内部的队列和主题配置。

当你使用单独测试模式中使用JMS mocking时, 可以使用JbpmTestCase.jmsCreateQueueJbpmTestCase.jmsCreateTopic创建队列和主题。

当你运行在一个远程应用客户端时,你应该使用 系统属性指定jndi环境

表 2.6. jms 元素:

元素 数目 备注
text 0..1 用来作为JMS消息发送的字符串。
object 0..1 用来作为JMS消息发送的序列化对象。
map 0..1 用来作为JMS消息发送的key-value map。



 

必须使用这些元素之一textobjectmap。 使用的元素会决定使用的消息类型,消息会被发送给上面提到的查找获得的队列。 这个消息会是一个TextMessageObjectMessageMapMessage

在下面的子章节中,支持的不同类型的消息会被解释。 三种类型的流程用法都是类似的。下面就是图形化的流程。

2.7.1. 为简化测试模拟JMS提供器

可以配置一个真实的JMS,确保可以通过JNDI查找到它, jms活动也可以使用模拟的JMS提供器来进行测试。 这样就更容易进行你的流程测试。

下面的测试帮助方法是基于单独的在原始的JMS api上实现,因此它们 也可以工作在标准工作环境里,就想在一个appserver环境里一样:

  • JbpmTestCase.jmsConsumeMessageFromQueue(String connectionFactoryJndiName, String queueJndiName) 使用默认的 1000, true, Session.AUTO_ACKNOWLEDGE 作为参数 timeout, transacted 和 acknowledgeMode的值
  • JbpmTestCase.jmsConsumeMessageFromQueue(String connectionFactoryJndiName, String queueJndiName, long timeout, boolean transacted, int acknowledgeMode)
  • JbpmTestCase.jmsConsumeMessageFromQueueXA(String connectionFactoryJndiName, String queueJndiName, long timeout)
  • JbpmTestCase.jmsAssertQueueEmpty(String connectionFactoryJndiName, String queueJndiName, long timeout, boolean transacted, int acknowledgeMode)
  • JbpmTestCase.jmsAssertQueueEmptyXA(String connectionFactoryJndiName, String queueJndiName, long timeout)
  • JbpmTestCase.jmsStartTopicListener(String connectionFactoryJndiName, String topicJndiName, boolean transacted, int acknowledgeMode)
  • JbpmTestCase.jmsStartTopicListenerXA(String connectionFactoryJndiName, String topicJndiName)
  • JmsTopicListener.getNextMesssage(long timeout)
  • JmsTopicListener.stop()

比如,在流程执行完jms活动以后, 消息可以像这样进行验证:

MapMessage mapMessage = (MapMessage)
  jmsConsumeMessageFromQueue("java:/JmsXA", "queue/ProductQueue");
assertEquals("shampoo", mapMessage.getString("product"));

下面的jms帮助方法是基于mockrunner的, 因此它们只工作在单独运行环境中:

(我们使用mockrunner协同工作,人们也可以让这些方法工作在appserver环境下)

  • void jmsCreateQueue(String connectionFactoryJndiName, String queueJndiName)
  • void jmsRemoveQueue(String connectionFactoryJndiName, String queueJndiName)
  • void jmsCreateTopic(String connectionFactoryJndiName, String topicJndiName)
  • void jmsRemoveTopic(String connectionFactoryJndiName, String topicJndiName)

比如,一个队列可以被创建和删除,在一个测试的setup和teardown方法中,像这样:

protected void setUp() throws Exception {
  super.setUp();
  jmsCreateQueue("java:/JmsXA", "queue/ProductQueue");
}

protected void tearDown() throws Exception {
  jmsRemoveQueue("java:/JmsXA", "queue/ProductQueue");
  super.tearDown();
}

2.7.2. 文本消息

第一个可以发送的JMS消息是使用文本作为载体。这种情况下 一个JMS TextMessage会被创建,并发送给指定的目的地。 参考下面的流程定义:

<process name="JmsQueueText">

  <start>
    <transition to="send message"/>
  </start>

  <jms name="send message"
       connection-factory="java:JmsXA"
       destination="queue/jbpm-test-queue"
       transacted="false">
    <text>This is the body</text>
    <transition to="wait"/>
  </jms>

  <state name="wait"/>

</process>

像你期待的一样,下面的测试用例中启动这个流程会导致 JMS节点发送一个消息到队列,使用"queue/jbpm-test-queue"名称。 用来创建连接的工厂来连接这个名为"Java:JmsXA"的队列。 消息的载体是文本"This is the body"。

executionService.startProcessInstanceByKey("JmsQueueText");

TextMessage textMessage = (TextMessage)
  jmsConsumeMessageFromQueue("java:JmsXA", "queue/jbpm-test-queue", 1000, false, Session.AUTO_ACKNOWLEDGE);
assertEquals("This is the body", textMessage.getText());

对应的代码在上面用粗体显示了。方法的其余部分是样板代码,需要设置一个消息消费者。 我们会在其他子章节中的例子中忽略这些代码。

2.7.3. 对象消息

第二个可选的是使用序列化对象作为消息的载体。 在这种情况下,一个JMS ObjectMessage会被创建和发送给指定的目的地。 参考下面的流程定义:

<process name="JmsQueueObject">

  <start>
    <transition to="send message"/>
  </start>

  <jms name="send message"
       connection-factory="java:JmsXA"
       destination="queue/jbpm-test-queue"
       transacted="false">
    <object expr="${object}"/>
    <transition to="wait"/>
  </jms>

  <state name="wait"/>

</process>

像上面例子中,消息会被发送到名为"queue/jbpm-test-queue"的队列。 也是使用了名为"Java:JmsXA"的连接工厂 来创建连接,并连接到队列。但是在这种情况下,消息的载体是序列化的对象, 通过执行expr属性中定义的表达式来获得这个对象。这个例子如下所示。

Map<String, Object> variables = new HashMap<String, Object>();
variables.put("object", "this is the object");
executionService.startProcessInstanceByKey("JmsQueueObject", variables);

ObjectMessage objectMessage = (ObjectMessage)
  jmsConsumeMessageFromQueue("java:JmsXA", "queue/jbpm-test-queue", 1000, false, Session.AUTO_ACKNOWLEDGE);
assertEquals("this is the object", objectMessage.getObject());

2.7.4. Map消息

第三个可能的载体是map的key-value结构。这次,一个JMS MapMessage 会被创建,发送到指定目的地。 参考下面的流程定义:

<process name="JmsQueueMap">

  <start>
    <transition to="send message"/>
  </start>

  <jms name="send message"
       connection-factory="java:JmsXA"
       destination="queue/jbpm-test-queue"
       transacted="false">
    <map>
      <entry>
        <key><string value="x"/></key>
        <value><string value="foo"/></value>
      </entry>
    </map>
    <transition to="wait"/>
  </jms>

  <state name="wait"/>

</process>

一个消息会被发送到名为"queue/jbpm-test-queue"的队列, 并且使用名为"Java:JmsXA"的工厂创建连接,链接到队列。 在这种情况,消息的载体是特定的key-value map。 这个例子如下所示。

executionService.startProcessInstanceByKey("JmsQueueMap");

MapMessage mapMessage = (MapMessage)
  jmsConsumeMessageFromQueue("java:JmsXA", "queue/jbpm-test-queue", 1000, false, Session.AUTO_ACKNOWLEDGE);
assertTrue(mapMessage.itemExists("x"));
assertEquals("foo", mapMessage.getObject("x"));