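RocketMQ cluster setup: 1 namesrv + 1 broker

Create the cluster directly with docker-compose.

I am on Windows, with WSL as the Docker backend. First, create docker-compose.yml on the desktop (in PowerShell, ni is an alias for New-Item):

ni docker-compose.yml

Its contents: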

version: '3.8'
services:
  namesrv:
    image: apache/rocketmq:5.2.0
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    networks:
      - rocketmq
    command: sh mqnamesrv
  broker:
    image: apache/rocketmq:5.2.0
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    depends_on:
      - namesrv
    networks:
      - rocketmq
    command: sh mqbroker
  proxy:
    image: apache/rocketmq:5.2.0
    container_name: rmqproxy
    networks:
      - rocketmq
    depends_on:
      - broker
      - namesrv
    ports:
      - 8080:8080
      - 8081:8081
    restart: on-failure
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    command: sh mqproxy
networks:
  rocketmq:
    driver: bridge

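Then start the cluster:

docker-compose -p rockermq_project up -d

Note: on Windows I had to specify a project name (-p) to start the docker-compose cluster.

The cluster starts successfully.

Creating a RocketMQ Topic

Enter the broker container and create the Topic:

docker exec -it rmqbroker bash
sh mqadmin updatetopic -t TestTopic -c DefaultCluster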

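Message producer demo: normal messages

Write the normal-message producer in IDEA.

First add the dependency. The version I used is 5.0.6 (the value of the rocketmq-client-java-version property):

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>${rocketmq-client-java-version}</version>
</dependency>

Then write the normal-message code: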

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        // gRPC endpoint of the proxy container (host port 8081 in docker-compose.yml).
        String endpoint = "localhost:8081";
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // Set the message index key, which can be used to accurately find a specific message.
                .setKeys("messageKey")
                // Set the message Tag, used by the consumer to filter messages by specified Tag.
                .setTag("messageTag")
                // Message body.
                .setBody("messageBody".getBytes())
                .build();
        try {
            SendReceipt sendReceipt = producer.send(message);
            System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
        } catch (ClientException e) {
            System.out.println("Failed to send message: " + e);
        }
        // producer.close();
    }
}

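Message producer demo: ordered (FIFO) messages

Create a FIFO topic:

sh mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -a +message.type=FIFO

Unlike a normal message, an ordered message must set a message group. Also create a FIFO consumer group for subscribers:

sh mqadmin updateSubGroup -c DefaultCluster -g FIFOGroup -o true

The code is as follows: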

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        String endpoint = "localhost:8081";
        String topic = "FIFOTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();

        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();

        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setKeys("messageKey")
                .setTag("messageTag")
                // Messages that share a message group are kept in send order.
                .setMessageGroup("fifoGroup001")
                .setBody("messageBody".getBytes())
                .build();
        try {
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}
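Since ordering is scoped to the message group, a common approach is to derive the group from a business key so that all messages for the same entity share one group. The following is only a sketch of that idea: the class name, the helper groupFor, the "fifoGroup" prefix and the bucket count are hypothetical and not part of the RocketMQ API.

```java
public final class MessageGroups {
    private MessageGroups() {
    }

    // Hypothetical helper: maps a business key (for example an order ID) to one of
    // `buckets` message groups, so every message for the same key shares a group.
    public static String groupFor(String businessKey, int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive");
        }
        // floorMod keeps the bucket index non-negative even for negative hash codes.
        int bucket = Math.floorMod(businessKey.hashCode(), buckets);
        return "fifoGroup" + bucket;
    }

    public static void main(String[] args) {
        // The returned value would be passed to setMessageGroup(...) on the message builder.
        System.out.println(groupFor("order-1001", 8));
        System.out.println(groupFor("order-1001", 8)); // same key, same group
    }
}
```

Using a bounded number of groups is a design choice: one group per key gives the finest-grained ordering, while buckets keep the number of distinct groups small at the cost of ordering unrelated keys together.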

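Message producer demo: delay messages and transaction messages

Delay messages

Create a delay topic:

sh mqadmin updateTopic -c DefaultCluster -t DelayTopic -a +message.type=DELAY

The code differs from the normal-message producer mainly in two lines.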
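The two lines themselves are not reproduced here. In rocketmq-client-java a delay message is normally marked by an absolute delivery timestamp passed to setDeliveryTimestamp on the message builder, so the difference is presumably the topic name and that call. As a sketch under that assumption, the timestamp can be computed as below; the class and method names are illustrative only.

```java
import java.time.Duration;

public final class DelayTimestamps {
    private DelayTimestamps() {
    }

    // Returns the absolute epoch-millisecond time at which the message should become
    // visible to consumers: the current time plus the desired delay.
    public static long deliverAt(long nowMillis, Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        return Math.addExact(nowMillis, delay.toMillis());
    }

    public static void main(String[] args) {
        long deliveryTimestamp = deliverAt(System.currentTimeMillis(), Duration.ofSeconds(10));
        // With rocketmq-client-java this value would be passed to
        // setDeliveryTimestamp(deliveryTimestamp) on the message builder,
        // with the topic set to "DelayTopic".
        System.out.println(deliveryTimestamp);
    }
}
```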

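The message is sent successfully.

Transaction messages

Create a transaction topic:

sh mqadmin updatetopic -t TransactionTopic -c DefaultCluster -a +message.type=TRANSACTION

The code is as follows.

(I had not downloaded the rocketmq-client source code, so I copied some code over from the source.)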

import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;

public class ProducerExample {
    private static volatile Producer TRANSACTIONAL_PRODUCER;
    private static final String ACCESS_KEY = "yourAccessKey";
    private static final String SECRET_KEY = "yourSecretKey";
    // gRPC endpoint of the proxy, the same one used by the other rocketmq-client-java examples.
    private static final String ENDPOINTS = "localhost:8081";

    private ProducerExample() {
    }

    private static Producer buildTransactionalProducer(TransactionChecker checker, String... topics) throws ClientException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(ENDPOINTS)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();
        final ProducerBuilder builder = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topics);

        if (checker != null) {
            builder.setTransactionChecker(checker);
        }

        return builder.build();
    }

    // Lazily creates a single shared producer (double-checked locking on a volatile field).
    public static Producer getTransactionalInstance(TransactionChecker checker, String... topics) throws ClientException {
        if (null == TRANSACTIONAL_PRODUCER) {
            synchronized (ProducerExample.class) {
                if (null == TRANSACTIONAL_PRODUCER) {
                    TRANSACTIONAL_PRODUCER = buildTransactionalProducer(checker, topics);
                }
            }
        }
        return TRANSACTIONAL_PRODUCER;
    }

    public static void main(String[] args) throws ClientException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "TransactionTopic";
        // Called by the server to resolve transactions that were neither committed nor rolled back.
        TransactionChecker checker = messageView -> {
            System.out.println("Receive transactional message check, message=" + messageView);
            return TransactionResolution.COMMIT;
        };

        final Producer producer = getTransactionalInstance(checker, topic);
        final Transaction transaction = producer.beginTransaction();

        byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag(tag)
                .setKeys("yourMessageKey-565ef26f5727")
                .setBody(body)
                .build();

        try {
            final SendReceipt sendReceipt = producer.send(message, transaction);
            System.out.println("Send transaction message successfully, messageId=" + sendReceipt.getMessageId());
        } catch (Throwable t) {
            System.out.println("Failed to send message: " + t);
            return;
        }

        transaction.commit(); // Commit the transaction
        // transaction.rollback(); // Or rollback the transaction

        // producer.close(); // Close the producer when not needed anymore
    }
}

Message producer demo: send modes

Synchronous mode

import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class SyncProducerExample {
    public static void main(String[] args) throws ClientException {
        String endpoint = "localhost:8081";
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();

        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();

        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag(tag)
                .setKeys("yourMessageKey-0e094a5f9d85")
                .setBody(body)
                .build();

        try {
            // send() blocks until the server acknowledges the message or the call fails.
            final SendReceipt sendReceipt = producer.send(message);
            System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
        } catch (Throwable t) {
            System.out.println("Failed to send message: " + t);
        }
    }
}

Asynchronous mode

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class AsyncProducerExample {
    private AsyncProducerExample() {
    }

    public static void main(String[] args) throws ClientException, InterruptedException {
        String endpoint = "localhost:8081";
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();

        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();

        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag(tag)
                .setKeys("yourMessageKey-0e094a5f9d85")
                .setBody(body)
                .build();

        // sendAsync() returns immediately; the result arrives through the future.
        final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
        ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
        future.whenCompleteAsync((sendReceipt, throwable) -> {
            if (null != throwable) {
                System.out.println("Failed to send message: " + throwable);
                return;
            }
            System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
        }, sendCallbackExecutor);
        // Keep the main thread alive so the callback has a chance to run.
        Thread.sleep(Long.MAX_VALUE);
        // producer.close();
    }
}
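The asynchronous example keeps the JVM alive with Thread.sleep(Long.MAX_VALUE). An alternative is to wait for the callback stage itself and then shut the callback executor down. The sketch below shows that pattern with the standard library only: fakeSendAsync is a stand-in for producer.sendAsync(message) and returns a made-up ID, so nothing here talks to RocketMQ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class AsyncCallbackSketch {
    private AsyncCallbackSketch() {
    }

    // Stand-in for producer.sendAsync(message): completes with a fake message ID.
    public static CompletableFuture<String> fakeSendAsync(String body) {
        return CompletableFuture.supplyAsync(() -> "id-" + body.length());
    }

    // Runs the completion callback on a dedicated executor, waits for it to finish,
    // and always releases the executor afterwards.
    public static String sendAndWait(String body) {
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> outcome = fakeSendAsync(body).handleAsync(
                    (messageId, throwable) -> throwable != null
                            ? "failed: " + throwable
                            : "sent: " + messageId,
                    callbackExecutor);
            // join() blocks until the callback has produced its result.
            return outcome.join();
        } finally {
            callbackExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("hello")); // prints "sent: id-5"
    }
}
```

Waiting on the callback stage lets the program exit as soon as the result is known, which is usually more convenient for a demo than sleeping forever.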

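One-way mode

I could not find a one-way mode in the 5.0 rocketmq-client-java library, so this example switches to the rocketmq-client library.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("GroupName");
        // NameServer port published in docker-compose.yml.
        producer.setNamesrvAddr("localhost:9876");

        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TestTopic",
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // Fire and forget: no result is returned.
            producer.sendOneway(msg);
        }
        producer.shutdown();
    }
}

In one-way mode RocketMQ returns no send receipt or message ID. The mode is designed not to wait for the server's response and simply sends the message, so the sender cannot directly tell whether the send succeeded.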

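Message consumer demo: push-mode consumption

Write the push consumer code:

import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

public class PushConsumerExample {
    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        String endpoints = "localhost:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        // "*" subscribes to every tag of the topic.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "YourConsumerGroup";
        String topic = "TestTopic";
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .setMessageListener(messageView -> {
                    System.out.println("Consume message successfully, messageId=" + messageView.getMessageId());
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // Keep the process alive so the listener keeps receiving messages.
        Thread.sleep(Long.MAX_VALUE);
        // pushConsumer.close();
    }
}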

Message consumer demo: tag filtering and message idempotency

Tag filtering

Set a specific tag when sending the message so that consumers can filter on it:

Message message = messageBuilder.setTopic("topic")
        .setKeys("messageKey")
        .setTag("TagA")
        .setBody("messageBody".getBytes())
        .build();

The consumer subscribes to that tag only:

String topic = "Your Topic";
// Subscribe only to messages whose tag is "TagA".
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
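A TAG expression is either "*" (all tags), a single tag, or several tags joined with "||". To make those semantics concrete, here is a simplified standalone model of the matching rule. It is an illustration only, not the broker's actual implementation, and the class and method names are made up for this sketch.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public final class TagFilterModel {
    private TagFilterModel() {
    }

    // Simplified model: "*" accepts every message; otherwise the message tag
    // must equal one of the tags listed in the expression, separated by "||".
    public static boolean matches(String expression, String messageTag) {
        if (expression.trim().equals("*")) {
            return true;
        }
        if (messageTag == null) {
            return false;
        }
        Set<String> accepted = Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return accepted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA", "TagA"));          // true
        System.out.println(matches("TagA", "TagB"));          // false
        System.out.println(matches("TagA || TagB", "TagB"));  // true
        System.out.println(matches("*", "AnyTag"));           // true
    }
}
```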

Message idempotency

To make consumption idempotent, the producer has to set a key on each message:

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        String endpoint = "localhost:8081";
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // Set the message index key, which can be used to accurately find a specific message.
                .setKeys("MessageKey")
                // Set the message Tag, used by the consumer to filter messages by specified Tag.
                .setTag("TagA")
                // Message body.
                .setBody("messageBody".getBytes())
                .build();
        try {
            // Send the same message (same key) five times to simulate duplicates.
            for (int i = 0; i < 5; i++) {
                SendReceipt sendReceipt = producer.send(message);
                System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
            }
        } catch (ClientException e) {
            System.out.println("Failed to send message: " + e);
        }
        // producer.close();
    }
}

The output shows that five messages were produced successfully.

On the consumer side, keep a set of the keys that have already been consumed:

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

public class PushConsumerExample {
    private static final Set<String> consumedMessageKeys = ConcurrentHashMap.newKeySet();

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        String endpoints = "localhost:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "TagA";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "YourConsumerGroup";
        String topic = "TestTopic";
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Set the consumer group.
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // Set the message listener.
                .setMessageListener(messageView -> {
                    // getKeys() returns a collection; its string form serves as the dedup key.
                    String messageKey = String.valueOf(messageView.getKeys());
                    // add() returns false if the key is already present, so the check and the
                    // insert happen in one atomic step even when listener threads run concurrently.
                    if (!consumedMessageKeys.add(messageKey)) {
                        System.out.println("Duplicate message detected, skipping. MessageKey=" + messageKey);
                        return ConsumeResult.SUCCESS;
                    }
                    System.out.println("Consume message successfully, messageTag=" + messageView.getTag() + ", messageKey=" + messageKey);
                    // Add your message processing logic here
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
    }
}

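The output shows that only one message is actually processed; the other four carry the same key and are skipped as duplicates.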
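Because a push consumer may invoke the listener from several threads, the "have I seen this key?" check and the "remember this key" step should be one atomic operation. The sketch below isolates that idea with the standard library only. IdempotentGuard and its methods are hypothetical names, and the in-memory set is an assumption suitable for a demo: it is unbounded and lost on restart, so a real system would more likely use a persistent store such as a database unique constraint.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public final class IdempotentGuard {
    private final Set<String> seenKeys = ConcurrentHashMap.newKeySet();

    // Returns true only for the first caller that presents a given key.
    // Set.add is atomic here, so two threads can never both get true for one key.
    public boolean firstTime(String key) {
        return seenKeys.add(key);
    }

    // Simulates `deliveries` concurrent deliveries of the same key and
    // returns how many of them were actually processed.
    public static int processedCount(int deliveries) {
        IdempotentGuard guard = new IdempotentGuard();
        AtomicInteger processed = new AtomicInteger();
        Thread[] threads = new Thread[deliveries];
        for (int i = 0; i < deliveries; i++) {
            threads[i] = new Thread(() -> {
                if (guard.firstTime("MessageKey")) {
                    processed.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processedCount(5)); // prints 1
    }
}
```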

Enabling and verifying message trace

This example uses the rocketmq-client library, i.e. the Remoting protocol.

First, add the following to broker.conf:

traceTopicEnable=true

Enable message trace in the producer:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class ProducerExample {
    public static void main(String[] args) throws MQClientException {
        String nameServerAddress = "localhost:9876"; // Replace with your NameServer address
        String topic = "TestTopic";

        // Create the producer with a producer group name; the second argument enables message trace.
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup", true);
        producer.setNamesrvAddr(nameServerAddress);
        producer.setInstanceName("ProducerInstance");

        producer.start();

        try {
            // Build the message
            Message message = new Message(topic, "TagA", "MessageKey",
                    "Hello RocketMQ with message trace".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send the message
            SendResult sendResult = producer.send(message);
            System.out.println("Send message successfully, messageId=" + sendResult.getMsgId());
        } catch (Exception e) {
            System.out.println("Failed to send message: " + e);
        } finally {
            // Shut down the producer
            producer.shutdown();
        }
    }
}

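Run result (screenshot not included).

Enable message trace in the consumer:

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerExample {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Initialize the consumer with a consumer group name; the second argument enables message trace.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup", true);

        // Set the NameServer address
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to one or more topics with a tag filter; "*" receives messages of every tag
        consumer.subscribe("TestTopic", "*");
        // Register a callback to handle messages received from the broker
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // Return the consumption status; CONSUME_SUCCESS means the messages were consumed successfully
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}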

Run result (screenshot not included).

Query the trace:

mqadmin QueryMsgTraceById -n rmqnamesrv:9876 -i "240CC001101480E48B14D937E169E6F53D441F89AB832C82B3D40000"

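Enabling and verifying Broker ACL

Add the following setting to broker.conf:

aclEnable=true

Update the "account" attribute values in the ACL configuration file:

sh mqadmin updateAclConfig -n rmqnamesrv:9876 -b rmqbroker:10911 -a RocketMQ -s 1234567809123 -t TestTopic=SUB -g groupA=DENY

Execution result (screenshot not included).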

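Deploying the RocketMQ console (dashboard)

Run the following command in Docker:

docker run -d --name rocketmq-dashboard --net rocketmq -e "JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876" -p 8080:8080 -t apacherocketmq/rocketmq-dashboard:latest

Then open localhost:8080 in a browser. Note that the docker-compose.yml above already publishes host port 8080 for rmqproxy, so if that container is running the dashboard needs a different host port, for example -p 8082:8080 (and then localhost:8082). Compose also usually prefixes network names with the project name, so the --net value may need to be rockermq_project_rocketmq; docker network ls shows the actual name.

View the cluster information.

Try adding a topic.

It succeeds.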

Operations with the mqadmin command line: resetting consumer offsets and expanding Topic queues

Resetting the consumer offset

mqadmin resetOffsetByTime -n [namesrvAddr] -b [brokerAddr] -t [topic] -g [consumerGroup] -s [timestamp]
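As far as I know, the help text of resetOffsetByTime describes the -s value as now, a millisecond epoch timestamp, or a date string of the form yyyy-MM-dd#HH:mm:ss:SSS; check sh mqadmin help resetOffsetByTime for your version. Assuming those formats, the following sketch produces both forms for a chosen point in time. The class and method names are illustrative, and how the tool interprets the time zone of the string form is not covered here.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public final class ResetOffsetTimestamps {
    // '#' is a reserved character in DateTimeFormatter patterns, so it must be quoted.
    private static final DateTimeFormatter MQADMIN_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'#'HH:mm:ss:SSS");

    private ResetOffsetTimestamps() {
    }

    // Millisecond epoch timestamp of a local date-time in the given zone.
    public static long toEpochMillis(LocalDateTime dateTime, ZoneId zone) {
        return dateTime.atZone(zone).toInstant().toEpochMilli();
    }

    // Date-string form, e.g. 2024-01-01#08:30:00:000.
    public static String toMqadminString(LocalDateTime dateTime) {
        return dateTime.format(MQADMIN_FORMAT);
    }

    public static void main(String[] args) {
        LocalDateTime target = LocalDateTime.of(2024, 1, 1, 0, 0);
        System.out.println(toEpochMillis(target, ZoneOffset.UTC)); // prints 1704067200000
        System.out.println(toMqadminString(target));               // prints 2024-01-01#00:00:00:000
    }
}
```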

Execution result (screenshot not included).

Expanding Topic queues

mqadmin updateTopic -n [namesrvAddr] -b [brokerAddr] -t [topic] -r [readQueueNum] -w [writeQueueNum]

Execution result (screenshot not included).