Overview
The JMS connector links Reactive Messaging channels to JMS queues and topics.
Maven Coordinates
To enable the JMS Connector, add the following dependency to your project’s pom.xml (see Managing Dependencies).
<dependency>
    <groupId>io.helidon.messaging.jms</groupId>
    <artifactId>helidon-messaging-jms</artifactId>
</dependency>
Configuration
Connector name: helidon-jms
User name used to connect JMS session
Password to connect JMS session
Possible values are:
Queue or topic name
Possible values are:
Indicates whether the session will use a local transaction. Default value:
JMS API message selector expression based on a subset of SQL92. The expression can only access headers and properties, not the payload.
Client identifier for JMS connection.
True for creating durable consumer (only for topic). Default value:
Subscriber name for durable consumer used to identify subscription.
If true then any messages published to the topic using this session’s connection, or any other connection with the same client identifier, will not be added to the durable subscription. Default value:
Select in case factory is injected as a named bean or configured with name.
Timeout for polling for next message in every poll cycle in millis. Default value:
Period for executing poll cycles in millis. Default value:
When multiple channels share same
JNDI name of JMS factory.
JNDI destination identifier.
Environment properties used for creating initial context
property with producer prefix is set to producer instance (for example WLS Unit-of-Order
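The poll timeout and poll period options above describe a polling loop: a cycle starts every period, and each cycle waits up to the timeout for the next message. The following is a minimal, stdlib-only sketch of that pattern, not the connector's implementation. The class `PollCycleSketch`, the `BlockingQueue` standing in for a JMS consumer, and the 50 ms and 100 ms values are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: a queue stands in for a JMS consumer.
final class PollCycleSketch {

    /** Runs a fixed number of poll cycles and returns the messages received. */
    static List<String> pollCycles(BlockingQueue<String> source,
                                   long pollTimeoutMs, long periodMs, int cycles) {
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < cycles; i++) {
                long cycleStart = System.nanoTime();
                // Wait at most pollTimeoutMs for the next message; null means none arrived.
                String msg = source.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    received.add(msg);
                }
                // Sleep out the rest of the period so cycles start roughly periodMs apart.
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - cycleStart);
                if (elapsedMs < periodMs) {
                    Thread.sleep(periodMs - elapsedMs);
                }
            }
        } catch (InterruptedException e) {
            // Stop polling and keep the interrupt flag for the caller.
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("test1", "test2"));
        // Illustrative values: 50 ms poll timeout, 100 ms period, 3 cycles.
        System.out.println(pollCycles(queue, 50, 100, 3));
    }
}
```

In this sketch a shorter period lowers latency at the cost of more frequent polling, while the timeout bounds how long a single cycle can block when no message is waiting.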
Configured JMS factory
The simplest setup looks up the JMS ConnectionFactory in the naming context (JNDI).
mp.messaging:
  incoming.from-jms:
    connector: helidon-jms
    destination: messaging-test-queue-1
    type: queue
  outgoing.to-jms:
    connector: helidon-jms
    destination: messaging-test-queue-1
    type: queue
  connector:
    helidon-jms:
      user: Gandalf
      password: mellon
      jndi:
        jms-factory: ConnectionFactory
        env-properties:
          java.naming:
            factory.initial: org.apache.activemq.jndi.ActiveMQInitialContextFactory
            provider.url: tcp://localhost:61616
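The nested `env-properties` keys in the configuration above flatten to `java.naming.factory.initial` and `java.naming.provider.url`, which are the standard JNDI environment property names exposed as constants on `javax.naming.Context`. As a rough sketch of what a lookup with such an environment could look like in plain Java (the class `JndiEnvSketch` and its methods are hypothetical, and how the connector itself builds the initial context is an assumption):

```java
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical illustration only, not the connector's code.
final class JndiEnvSketch {

    /** Builds the JNDI environment from the two configured values. */
    static Hashtable<String, Object> environment(String factoryClass, String providerUrl) {
        Hashtable<String, Object> env = new Hashtable<>();
        // Context.INITIAL_CONTEXT_FACTORY is the string "java.naming.factory.initial"
        env.put(Context.INITIAL_CONTEXT_FACTORY, factoryClass);
        // Context.PROVIDER_URL is the string "java.naming.provider.url"
        env.put(Context.PROVIDER_URL, providerUrl);
        return env;
    }

    /**
     * Looks up an object by JNDI name. This needs the configured initial context
     * factory class on the classpath and a reachable provider.
     */
    static Object lookup(Hashtable<String, Object> env, String jndiName) throws NamingException {
        Context ctx = new InitialContext(env);
        try {
            return ctx.lookup(jndiName);
        } finally {
            ctx.close();
        }
    }
}
```

With the values from the configuration above, `lookup(env, "ConnectionFactory")` would be the call that resolves the factory named by `jms-factory`, provided the ActiveMQ client library is available at runtime.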
Injected JMS factory
If you need a more advanced setup, the connector can work with an injected factory instance.
@Produces
@ApplicationScoped
@Named("active-mq-factory")
public ConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory(config.get("jms.url").asString().get());
}

jms:
  url: tcp://127.0.0.1:61616

mp:
  messaging:
    connector:
      helidon-jms:
        named-factory: active-mq-factory

    outgoing.to-jms:
      connector: helidon-jms
      session-group-id: order-connection-1
      destination: TESTQUEUE
      type: queue

    incoming.from-jms:
      connector: helidon-jms
      session-group-id: order-connection-1
      destination: TESTQUEUE
      type: queue
Usage
Consuming
// Consume each message as an unwrapped payload
@Incoming("from-jms")
public void consumeJms(String msg) {
    System.out.println("JMS says: " + msg);
}

// Consume the wrapped message and acknowledge it manually
@Incoming("from-jms")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consumeJms(JmsMessage<String> msg) {
    System.out.println("JMS says: " + msg.getPayload());
    return msg.ack();
}
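With manual acknowledgment, the consumer method returns the stage produced by `ack()`, so the message counts as acknowledged only after processing has run. The following stdlib-only sketch shows that contract in isolation. `ManualAckSketch` and `SketchMessage` are hypothetical stand-ins, not Helidon or MicroProfile types, and the real message classes may behave differently in detail.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical illustration only: mimics a message with an ack callback.
final class ManualAckSketch {

    /** Stand-in for a wrapped message: a payload plus an acknowledgment action. */
    static final class SketchMessage<T> {
        private final T payload;
        private final Supplier<CompletionStage<Void>> ackAction;

        SketchMessage(T payload, Supplier<CompletionStage<Void>> ackAction) {
            this.payload = payload;
            this.ackAction = ackAction;
        }

        T getPayload() {
            return payload;
        }

        CompletionStage<Void> ack() {
            return ackAction.get();
        }
    }

    /** Processes the payload first, then acknowledges; a failure before ack() leaves the message unacknowledged. */
    static CompletionStage<Void> consume(SketchMessage<String> msg, StringBuilder sink) {
        sink.append(msg.getPayload());
        return msg.ack();
    }

    public static void main(String[] args) {
        SketchMessage<String> msg = new SketchMessage<>("test1", () -> {
            System.out.println("Acked!");
            return CompletableFuture.<Void>completedFuture(null);
        });
        consume(msg, new StringBuilder());
    }
}
```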
Producing
// Produce plain payloads
@Outgoing("to-jms")
public PublisherBuilder<String> produceToJms() {
    return ReactiveStreams.of("test1", "test2");
}

// Produce messages with a correlation ID, custom properties, and an ack callback
@Outgoing("to-jms")
public PublisherBuilder<Message<String>> produceToJms() {
    return ReactiveStreams.of("test1", "test2")
            .map(s -> JmsMessage.builder(s)
                    .correlationId(UUID.randomUUID().toString())
                    .property("stringProp", "cool property")
                    .property("byteProp", 4)
                    .property("intProp", 5)
                    .onAck(() -> CompletableFuture.completedStage(null)
                            .thenRun(() -> System.out.println("Acked!")))
                    .build());
}

// Produce messages through a custom mapper that builds the JMS message from the session
@Outgoing("to-jms")
public PublisherBuilder<Message<String>> produceToJms() {
    return ReactiveStreams.of("test1", "test2")
            .map(s -> JmsMessage.builder(s)
                    .customMapper((p, session) -> {
                        TextMessage textMessage = session.createTextMessage(p);
                        textMessage.setStringProperty("custom-mapped-property", "XXX" + p);
                        return textMessage;
                    })
                    .build()
            );
}