博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot 中使用 RocketMQ
阅读量:5899 次
发布时间:2019-06-19

本文共 5168 字,大约阅读时间需要 17 分钟。

hot3.png

本文快速入门,RocketMQ消息系统的安装部署,发送,和接收消息,监控消息,的详细说明。

环境需要

64位操作系统,建议使用Linux / Unix /

  • CentOs7.3
  • 64bit JDK 1.8+
  • Maven 3.2.x
  • Git 1.8.3.1

环境安装

请参考我的另一篇文章

新加项目

新建一个 maven 项目,这里就不详细操作了,大家都会的

不过也可以下载我的示例源码,下载地址如下

GitHub 源码:

添加依赖

在POM 中添加如下依赖

org.apache.rocketmq
rocketmq-client
4.1.0-incubating
org.apache.rocketmq
rocketmq-common
4.1.0-incubating

配置文件

在配置文件 application.properties 添加一下内容

# 消费者的组名apache.rocketmq.consumer.PushConsumer=PushConsumer# 生产者的组名apache.rocketmq.producer.producerGroup=Producer# NameServer地址apache.rocketmq.namesrvAddr=192.168.252.121:9876

消息生产者

@Componentpublic class Producer {    /**     * 生产者的组名     */    @Value("${apache.rocketmq.producer.producerGroup}")    private String producerGroup;    /**     * NameServer 地址     */    @Value("${apache.rocketmq.namesrvAddr}")    private String namesrvAddr;    @PostConstruct    public void defaultMQProducer() {        //生产者的组名        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);        //指定NameServer地址,多个地址以 ; 隔开        producer.setNamesrvAddr(namesrvAddr);        try {            /**             * Producer对象在使用之前必须要调用start初始化,初始化一次即可             * 注意:切记不可以在每次发送消息时,都调用start方法             */            producer.start();            for (int i = 0; i < 100; i++) {                String messageBody = "我是消息内容:" + i;                String message = new String(messageBody.getBytes(), "utf-8");                //构建消息                Message msg = new Message("PushTopic" /* PushTopic */, "push"/* Tag  */, "key_" + i /* Keys */, message.getBytes());                //发送消息                SendResult result = producer.send(msg);                System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());            }        } catch (Exception e) {            e.printStackTrace();        } finally {            producer.shutdown();        }    }}

消息消费者

@Componentpublic class Consumer {    /**     * 消费者的组名     */    @Value("${apache.rocketmq.consumer.PushConsumer}")    private String consumerGroup;    /**     * NameServer地址     */    @Value("${apache.rocketmq.namesrvAddr}")    private String namesrvAddr;    @PostConstruct    public void defaultMQPushConsumer() {        //消费者的组名        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);        //指定NameServer地址,多个地址以 ; 隔开        consumer.setNamesrvAddr(namesrvAddr);        try {            //订阅PushTopic下Tag为push的消息            consumer.subscribe("PushTopic", "push");            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费            //如果非第一次启动,那么按照上次消费的位置继续消费            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);            consumer.registerMessageListener(new MessageListenerConcurrently() {                @Override                public ConsumeConcurrentlyStatus consumeMessage(List
list, ConsumeConcurrentlyContext context) { try { for (MessageExt messageExt : list) { System.out.println("messageExt: " + messageExt);//输出消息内容 String messageBody = new String(messageExt.getBody(), "utf-8"); System.out.println("消费响应:Msg: " + messageExt.getMsgId() + ",msgBody: " + messageBody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 } }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } }}

启动服务

@SpringBootApplicationpublic class SpringBootRocketmqApplication {	public static void main(String[] args) {		SpringApplication.run(SpringBootRocketmqApplication.class, args);	}}

控制台会有响应

发送响应:MsgId:0AFF015E556818B4AAC208A0504F0063,发送状态:SEND_OKmessageExt: MessageExt [queueId=0, storeSize=195, queueOffset=113824, sysFlag=0, bornTimestamp=1517559124047, bornHost=/192.168.252.1:62165, storeTimestamp=1517559135052, storeHost=/192.168.252.121:10911, msgId=C0A8FC7900002A9F00000000056F499C, commitLogOffset=91179420, bodyCRC=1687852546, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=113825, KEYS=key_99, CONSUME_START_TIME=1517559124049, UNIQ_KEY=0AFF015E556818B4AAC208A0504F0063, WAIT=true, TAGS=push}, body=21]]消费响应:Msg: 0AFF015E556818B4AAC208A0504F0063,msgBody: 我是消息内容:99...

监控服务

RocketMQ web界面监控RocketMQ-Console-Ng部署

下载并且编译

下载并且 maven 编译

git clone https://github.com/apache/rocketmq-externals.gitrocketmq-externals/rocketmq-console/mvn clean package -Dmaven.test.skip=true

启动监控服务

rocketmq.config.namesrvAddr NameServer 地址,默认启动端口8080

nohup java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=127.0.0.1:9876

访问监控服务

GitHub 源码:

Gitee 源码:

Contact

  • 作者:鹏磊
  • 出处:
  • Email:
  • 版权归作者所有,转载请注明出处
  • Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享

关注公众号-搜云库

转载于:https://my.oschina.net/yanpenglei/blog/1617468

你可能感兴趣的文章
论文笔记之:MatchNet: Unifying Feature and Metric Learning for Patch-Based Matching
查看>>
自己编写的一个数据结构,类似优先级队列
查看>>
2.10. Spring boot with Session share
查看>>
How to Configure the Gradient Boosting Algorithm
查看>>
首场微信小论坛上他们都聊了哪些小程序的议题
查看>>
【Redis】Redis 初探
查看>>
关于大数据和数据库的讨论
查看>>
[20150126]datadump的非文档参数.txt
查看>>
18.8. sudo, sudoedit - execute a command as another user
查看>>
SAP MM取消采购订单审批--- BAPI_PO_RESET_RELEASE
查看>>
[20150924]SYS_CONTEXT函数的使用.txt
查看>>
PHPEXCEL 不能输出中文内容,只显示空白
查看>>
HDOJ/HDU 1113 Word Amalgamation(字典顺序~Map)
查看>>
Scanner类及正则表达式
查看>>
Javascript之旅——第八站:说说instanceof踩了一个坑
查看>>
C# 如何获取某个类型或类型实例对象的大小
查看>>
Xamarin for android:为button设置click事件的几种方法
查看>>
Dispatcher.BeginInvoke()方法使用不当导致UI界面卡死的原因分析
查看>>
安装SQLserver 2014(For AlwaysOn)
查看>>
Linux部署ASP.NET 5 (vNext)
查看>>