Contents
Overview
Connecting streams to Oracle AQ with Reactive Messaging couldn’t be easier. This connector extends Helidon’s JMS connector with Oracle’s AQ-specific API.
Maven Coordinates
To enable AQ Connector,
add the following dependency to your project’s pom.xml (see
Managing Dependencies).
<dependency>
<groupId>io.helidon.messaging.aq</groupId>
<artifactId>helidon-messaging-aq</artifactId>
</dependency>
Configuration
Connector name: helidon-aq
|
name of the datasource bean used to connect Oracle DB with AQ |
|
jdbc connection string used to connect Oracle DB with AQ (forbidden when |
|
User name used to connect Oracle DB with AQ (forbidden when |
|
Password to connect Oracle DB with AQ (forbidden when |
|
Possible values are: |
|
Queue or topic name |
|
Possible values are: |
|
Indicates whether the session will use a local transaction. Default value: |
|
JMS API message selector expression based on a subset of the SQL92. Expression can only access headers and properties, not the payload. |
|
Client identifier for JMS connection. |
|
True for creating durable consumer (only for topic). Default value: |
|
Subscriber name for durable consumer used to identify subscription. |
|
If true then any messages published to the topic using this session’s connection,
or any other connection with the same client identifier,
will not be added to the durable subscription. Default value: |
|
Select in case factory is injected as a named bean or configured with name. |
|
Timeout for polling for next message in every poll cycle in millis. Default value: |
|
Period for executing poll cycles in millis. Default value: |
|
When multiple channels share same |
Configured JMS Factory
The simplest possible usage is leaving construction of AQjmsConnectionFactory to the connector.
mp:
messaging:
connector:
helidon-aq:
transacted: false
acknowledge-mode: CLIENT_ACKNOWLEDGE
url: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=TESTSID)))
user: gandalf
password: mellon
outgoing.to-aq:
connector: helidon-aq
destination: TESTQUEUE
type: queue
incoming.from-aq:
connector: helidon-aq
destination: TESTQUEUE
type: queue
Its also possible and preferable to refer to configured datasource, in our example Oracle UCP datasource:
javax:
sql:
DataSource:
aq-test-ds:
connectionFactoryClassName: oracle.jdbc.pool.OracleDataSource
URL: jdbc:oracle:thin:@exampledb_high?TNS_ADMIN=/home/gandalf/wallets/Wallet_EXAMPLEDB
user: gandalf
password: SuperSecretPassword1234
mp:
messaging:
connector:
helidon-aq:
transacted: false
acknowledge-mode: CLIENT_ACKNOWLEDGE
data-source: aq-test-ds
outgoing.toJms:
connector: helidon-aq
destination: TESTQUEUE
type: queue
incoming.fromJms:
connector: helidon-aq
destination: TESTQUEUE
type: queue
Injected JMS factory
If you need more advanced configurations, connector can work with injected AQjmsConnectionFactory:
@Produces
@ApplicationScoped
@Named("aq-orderdb-factory")
public AQjmsConnectionFactory connectionFactory() throws JMSException {
AQjmsQueueConnectionFactory fact = new AQjmsQueueConnectionFactory();
fact.setJdbcURL(config.get("jdbc.url").asString().get());
fact.setUsername(config.get("jdbc.user").asString().get());
fact.setPassword(config.get("jdbc.pass").asString().get());
return fact;
}
jdbc:
url: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=TESTSID)))
user: gandalf
pass: mellon
mp:
messaging:
connector:
helidon-aq:
named-factory: aq-orderdb-factory
outgoing.to-aq:
connector: helidon-aq
session-group-id: order-connection-1
destination: TESTQUEUE
type: queue
incoming.from-aq:
connector: helidon-aq
session-group-id: order-connection-1
destination: TESTQUEUE
type: queue
Usage
Consuming
@Incoming("from-aq")
public void consumeAq(String msg) {
System.out.println("Oracle AQ says: " + msg);
}
@Incoming("from-aq")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consumeAq(AqMessage<String> msg) {
// direct commit
//msg.getDbConnection().commit();
System.out.println("Oracle AQ says: " + msg.getPayload());
// ack commits only in non-transacted mode
return msg.ack();
}
Producing
@Outgoing("to-aq")
public PublisherBuilder<String> produceToAq() {
return ReactiveStreams.of("test1", "test2");
}