RocketMQ集群搭建1namesrv + 1broker
直接用docker-compose创建
使用的windows系统,底层用的WSL,首先在桌面创建docker-compose.yml
里面的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 version : '3.8' services : namesrv : image : apache/rocketmq:5.2.0 container_name : rmqnamesrv ports : - 9876:9876 networks : - rocketmq command : sh mqnamesrv broker : image : apache/rocketmq:5.2.0 container_name : rmqbroker ports : - 10909:10909 - 10911:10911 - 10912:10912 environment : - NAMESRV_ADDR=rmqnamesrv:9876 depends_on : - namesrv networks : - rocketmq command : sh mqbroker proxy : image : apache/rocketmq:5.2.0 container_name : rmqproxy networks : - rocketmq depends_on : - broker - namesrv ports : - 8080:8080 - 8081:8081 restart : on-failure environment : - NAMESRV_ADDR=rmqnamesrv:9876 command : sh mqproxy networks : rocketmq : driver : bridge
然后启动集群
1 docker-compose -p rockermq_project up -d
注意windows启动docker-compose集群需要指定项目名
集群成功启动
完成RocketMQ Topic创建集群
进入broker容器并创建Topic
1 2 docker exec -it rmqbroker bash sh mqadmin updatetopic -t TestTopic -c DefaultCluster
消息发送者demo编写:支持普通消息
在IDEA中进行普通消息的编写
首先需要添加dependency,这里我添加的版本是5.0.6
1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client-java</artifactId > <version > $ {rocketmq-client-java-version} </version > </dependency >
然后编写普通消息代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class ProducerExample { public static void main (String[] args) throws ClientException { String endpoint = "localhost:8081" ; String topic = "TestTopic" ; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint); ClientConfiguration configuration = builder.build(); Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); Message message = provider.newMessageBuilder() .setTopic(topic) .setKeys("messageKey" ) .setTag("messageTag" ) .setBody("messageBody" .getBytes()) .build(); try { SendReceipt sendReceipt = producer.send(message); System.out.println("Send message successfully, messageId=" +sendReceipt.getMessageId()); } catch (ClientException e) { System.out.println("Failed to send message" ); } } }
消息发送者demo编写,支持顺序消息
创建FIFO主题
1 sh mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -a +message.type = FIFO
和普通消息发送相比,顺序消息发送必须要设置消息组,创建FIFO订阅消费组
1 sh mqadmin updateSubGroup -c DefaultCluster -g FIFOGroup -o true
代码编写如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class ProducerExample { public static void main(String[] args) throws ClientException { String endpoint = "localhost:8081" String topic = "FIFOTopic" ClientServiceProvider provider = ClientServiceProvider.loadService() ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint) ClientConfiguration configuration = builder.build() Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build() Message message = provider.newMessageBuilder() .setTopic(topic) .setKeys("messageKey" ) .setTag("messageTag" ) .setMessageGroup("fifoGroup001" ) .setBody("messageBody" .getBytes()) .build() try { SendReceipt sendReceipt = producer.send(message) System.out.println(sendReceipt.getMessageId()) } catch (ClientException e) { e.printStackTrace() } } }
消息发送者demo编写,支持延时消息、事务消息
延时消息
创建延时主题
1 sh mqadmin updateTopic -c DefaultCluster -t DelayTopic -a +message.type =DELAY
代码主要区别是这两行
消息发送成功
事务消息
创建事务主题
1 sh mqadmin updatetopic -t TransactionTopic -c DefaultCluster -a +message .type =TRANSACTION
代码编写如下
(因为我没有下载rocketmq-client的源码,所以就把源码里的一些代码复制了过来)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public class ProducerExample { private static volatile Producer TRANSACTIONAL_PRODUCER; private static final String ACCESS_KEY = "yourAccessKey" ; private static final String SECRET_KEY = "yourSecretKey" ; private static final String ENDPOINTS = "localhost:8080" ; private ProducerExample () { } private static Producer buildTransactionalProducer (TransactionChecker checker, String... topics) throws ClientException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider (ACCESS_KEY, SECRET_KEY); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(ENDPOINTS) .setCredentialProvider(sessionCredentialsProvider) .build(); final ProducerBuilder builder = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topics); if (checker != null ) { builder.setTransactionChecker(checker); } return builder.build(); } public static Producer getTransactionalInstance (TransactionChecker checker, String... topics) throws ClientException { if (null == TRANSACTIONAL_PRODUCER) { synchronized (ProducerExample.class) { if (null == TRANSACTIONAL_PRODUCER) { TRANSACTIONAL_PRODUCER = buildTransactionalProducer(checker, topics); } } } return TRANSACTIONAL_PRODUCER; } public static void main (String[] args) throws ClientException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "TransactionTopic" ; TransactionChecker checker = messageView -> { System.out.println("Receive transactional message check, message=" + messageView); return TransactionResolution.COMMIT; }; final Producer producer = getTransactionalInstance(checker, topic); final Transaction transaction = producer.beginTransaction(); byte [] body = "This is a transaction message for Apache RocketMQ" .getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA" ; final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey-565ef26f5727" ) .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message, transaction); System.out.println("Send transaction message successfully, messageId=" + sendReceipt.getMessageId()); } catch (Throwable t) { System.out.println("Failed to send message" ); return ; } transaction.commit(); } }
消息发送者demo编写
同步模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public static void main (String[] args) throws ClientException, InterruptedException { String endpoint = "localhost:8081" ; String topic = "TestTopic" ; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint); ClientConfiguration configuration = builder.build(); Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); byte [] body = "This is a normal message for Apache RocketMQ" .getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA" ; final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey-0e094a5f9d85" ) .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); System.out.println("Send message successfully, messageId=" +sendReceipt.getMessageId()); } catch (Throwable t) { System.out.println("Failed to send message" ); } }
异步模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class AsyncProducerExample { private AsyncProducerExample () { } public static void main (String[] args) throws ClientException, InterruptedException { String endpoint = "localhost:8081" ; String topic = "TestTopic" ; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint); ClientConfiguration configuration = builder.build(); Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); byte [] body = "This is a normal message for Apache RocketMQ" .getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA" ; final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey-0e094a5f9d85" ) .setBody(body) .build(); final CompletableFuture<SendReceipt> future = producer.sendAsync(message); ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool(); future.whenCompleteAsync((sendReceipt, throwable) -> { if (null != throwable) { System.out.println("Failed to send message" +throwable); return ; } System.out.println("Send message successfully, messageId=" +sendReceipt.getMessageId()); }, sendCallbackExecutor); Thread.sleep(Long.MAX_VALUE); } }
OneWay模式
5.0版本在rocketmq-client-java并没有找到oneway模式,因此转为使用rocket-clinet库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class OnewayProducerExampke { public static void main (String[] args) throws Exception{ DefaultMQProducer producer = new DefaultMQProducer ("GroupName" ); producer.setNamesrvAddr("localhost:9875" ); producer.start(); for (int i = 0 ; i < 100 ; i++) { Message msg = new Message ("TestTopic" "TagA" ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); producer.sendOneway(msg); } producer.shutdown(); } }
oneway
发送模式时,RocketMQ 不提供发送回执或消息 ID,因为 oneway
模式设计为不等待服务器响应,只是单方面发送消息,所以无法直接获知消息是否发送成功。
消息消费者demo编写,完成push模式消费
编写Push消费代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class PushConsumerExample { private PushConsumerExample () { } public static void main (String[] args) throws ClientException, IOException, InterruptedException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String endpoints = "localhost:8081" ; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .build(); String tag = "*" ; FilterExpression filterExpression = new FilterExpression (tag, FilterExpressionType.TAG); String consumerGroup = "YourConsumerGroup" ; String topic = "TestTopic" ; PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { System.out.println("Consume message successfully, messageId=" + messageView.getMessageId()); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); } }
消息消费者demo编写:完成tag过滤、消息幂等
tag过滤
在发送消息的时候设定特定的tag进行过滤
1 2 3 4 5 Message message = messageBuilder.set Topic("topic" ) .set Keys("messageKey" ) .set Tag("TagA" ) .set Body("messageBody" .getBytes()) .build();
消费者接收特定的tag
1 2 3 4 String topic = "Your Topic" ;FilterExpression filterExpression = new FilterExpression ("TagA" , FilterExpressionType.TAG); pushConsumer.subscribe(topic, filterExpression);
消息幂
为了实现消息幂,需要在生产者端配置key
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class ProducerExample { public static void main (String[] args) throws ClientException { String endpoint = "localhost:8081" ; String topic = "TestTopic" ; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint); ClientConfiguration configuration = builder.build(); Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); Message message = provider.newMessageBuilder() .setTopic(topic) .setKeys("MessageKey" ) .setTag("TagA" ) .setBody("messageBody" .getBytes()) .build(); try { for (int i=0 ;i<5 ;i++) { SendReceipt sendReceipt = producer.send(message); System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId()); } } catch (ClientException e) { System.out.println("Failed to send message" ); } } }
可见成功生产了五个消息
在消费者端利用集合维护已消费的key
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class PushConsumerExample { private static final Set<String > consumedMessageKeys = ConcurrentHashMap.new KeySet (); private PushConsumerExample() { } public static void main(String [] args) throws ClientException, IOException, InterruptedException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String endpoints = "localhost:8081" ; ClientConfiguration clientConfiguration = ClientConfiguration.new Builder () .setEndpoints(endpoints) .build(); String tag = "TagA" ; FilterExpression filterExpression = new FilterExpression (tag, FilterExpressionType.TAG); String consumerGroup = "YourConsumerGroup" ; String topic = "TestTopic" ; PushConsumer pushConsumer = provider.new PushConsumerBuilder () .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { String messageKey = String .valueOf(messageView.getKeys()); if (consumedMessageKeys.contains(messageKey)) { System.out.println("Duplicate message detected, skipping. MessageKey=" + messageKey); return ConsumeResult.SUCCESS; } else { consumedMessageKeys.add(messageKey); System.out.println("Consume message successfully, messageTag=" + messageView.getTag() + ", messageKey=" + messageKey); return ConsumeResult.SUCCESS; } }) .build(); Thread.sleep(Long.MAX_VALUE); } }
可以看到只有一个消息成功消费
消息链路追踪开启与验证
本示例采用的rocketmq-clinet库,也就是Remoting协议
首先需要在broker.conf里添加
生产者开启消息链路追踪
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ProducerExample { public static void main (String[] args) throws MQClientException { String nameServerAddress = "localhost:9876" ; String topic = "TestTopic" ; DefaultMQProducer producer = new DefaultMQProducer ("ProducerGroup" ,true ); producer.setNamesrvAddr(nameServerAddress); producer.setInstanceName("ProducerInstance" ); producer.start(); try { Message message = new Message (topic, "TagA" , "MessageKey" , "Hello RocketMQ with message trace" .getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message); System.out.println("Send message successfully, messageId=" + sendResult.getMsgId()); } catch (Exception e) { System.out.println("Failed to send message" ); } finally { producer.shutdown(); } } }
运行结果
消费者开启消息链路追踪
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class ConsumerExample { public static void main (String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("ConsumerGroup" ); consumer.setNamesrvAddr("localhost:9876" ); consumer.subscribe("TestTopic" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n" , Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n" ); } }
运行结果
查询轨迹
1 mqadmin QueryMsgTraceById -n rmqnamesev:9876 -i "240CC001101480E48B14D937E169E6F53D441F89AB832C82B3D40000"
Broker ACL开启与验证
需要在broker.conf里添加如下配置
更新ACL配置文件中“account”的属性值
1 sh mqadmin updateAclConfig -n rmqnamesrv:9876 -b rmqbroker:10911 -a RocketMQ -s 1234567809123 -t TestTopic=SUB -g groupA=DENY
执行情况如下
RocketMQ console控制台部署
在docker中执行如下代码
1 docker run -d --name rocketmq-dashboard --net rocketmq -e "JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876" -p 8080 :8080 -t apacherocketmq/rocketmq-dashboard:latest
然后在浏览器输入localhost:8080即可
查看集群信息
尝试添加topic
成功
通过mqadmin 命令行完成操作:重置消费位点、Topic队列扩容
重置消费位点
1 mqadmin resetOffsetByTime -n [namesrvAddr] -b [brokerAddr] -t [topic] -g [consumerGroup] -s [timestamp]
执行结果如下
Topic队列扩容
1 mqadmin updateTopic -n [namesrvAddr] -b [brokerAddr] -t [topic] -r [readQueueNum] -w [writeQueueNum]
执行结果如下