Streaming Queue - RabbitMQ

RabbitMQ introduces Streams in its recent version that model an append-only log that's immutable queue. It also allows one or more consumers subscribe to it and read the same message as many times as they want.

RabbitMQ Streams uses:

·????? Fan-out :?many consumers can read the same message

·????? Replay & time-travel:?consumers can read and reread a message from any point in the stream.

Let us try to deploy a stream and consume message from the stream using java API.

Configure Two Nodes RabbitMQ cluster before going ahead.

Two Nodes RabbitMQ Cluster

Here, we are having two nodes cluster.

Logon the web UI. Navigate Queues -> Add a new Queue.

Queue Name: my_qs

Node : rabbit@rabbitmq0

Type: Stream

Stream Console

Add Queue.

If you have successfully added the Stream Queue, you should able to see on the web console, Queues section.

The type should be Stream as shown above.

Declaring a queue with an?x-queue-type?argument set to?stream?will create a stream with a replica on each configured RabbitMQ node. Streams are quorum systems so uneven cluster sizes is strongly recommended.

Define an exchange, ex_qs and bind to the above Stream;my_qs

Perform Binding

Publishing messages to a Stream is no different from publishing messages to a queue. As an example, below, the previous snippet has been extended to publish a message to the?my_qs?declared.

# rabbitmqadmin -H rabbitmq0?? -u guest -p guest?? publish? exchange='ex_qs'? routing_key='' payload="A Streaming? Message"

You should observe a message in the earlier define stream.

While consuming Messages from a Stream it requires setting the?QoS prefetch and optionally an?offset?to start reading/consuming from any point in the log stream. If unspecified, the consumer grabs the most recent message written to the log after it starts.

Determine the port number from the web console and replace in your API.

Open java IDE of your choice and write the following code to consume the message from the above Queue.

--- Code Begins Here ----

package com.hp.streamming;

?

import java.io.IOException;

import java.util.Collections;

import java.util.concurrent.TimeoutException;

?

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

?

public class StreamConsumer {

??? private final static String QUEUE_HOST = "localhost";

??? private final static int PORT = 17673;

??? public static void main(String[] args) {

??????

?????? try {

?????????? consume();

?????? } catch (IOException e) {

?????????? // TODO Auto-generated catch block

?????????? e.printStackTrace();

?????? } catch (TimeoutException e) {

?????????? // TODO Auto-generated catch block

?????????? e.printStackTrace();

?????? }

?

??? }

?

??? public static void consume() throws IOException, TimeoutException {

?????? ConnectionFactory factory = new ConnectionFactory();

?????? factory.setHost(QUEUE_HOST);

?????? factory.setPort(PORT);

?????? factory.setUsername("guest");

?????? factory.setPassword("guest");

?????? Connection connection = factory.newConnection();

?????? Channel channel = connection.createChannel();

?????? channel.basicQos(100); // QoS must be specified

?????? channel.basicConsume(

????????????? "my_qs",

????????????? false,

????????????? Collections.singletonMap("x-stream-offset", "first"), // "first" offset specification

????????????? (consumerTag, message) -> {

????????????????? System.out.println(" Received : " + newString(message.getBody()));

????????????? ??? channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required

????????????? },

????????????? consumerTag -> { });

??? }

}

---- Code Ends Here -----

Execute the program.

Result:

It prints out the first message which was sent through CLI.

Send another message from the console, this time we will connect through the another node.

#rabbitmqadmin -H rabbitmq1?? -u guest -p guest?? publish? exchange='ex_qs'? routing_key='' payload="Another Streaming? Message"

You should be able to consume the message and vied in the java console.

In this lab, we have configure stream and consume message using the java client.

I have some questions about RabbitMQ. How I can contact you?

回复

要查看或添加评论,请登录

Henry Potsangbam的更多文章

社区洞察

其他会员也浏览了