本文快速入门,RocketMQ消息系统的安装部署,发送,和接收消息,监控消息,的详细说明。
环境需要
64位操作系统,建议使用Linux / Unix /
- CentOs7.3
- 64bit JDK 1.8+
- Maven 3.2.x
- Git 1.8.3.1
环境安装
请参考我的另一篇文章
新加项目
新建一个 maven 项目,这里就不详细操作了,大家都会的
不过也可以下载我的示例源码,下载地址如下
GitHub 源码:
添加依赖
在POM 中添加如下依赖
org.apache.rocketmq rocketmq-client 4.1.0-incubating org.apache.rocketmq rocketmq-common 4.1.0-incubating
配置文件
在配置文件 application.properties
添加一下内容
# 消费者的组名apache.rocketmq.consumer.PushConsumer=PushConsumer# 生产者的组名apache.rocketmq.producer.producerGroup=Producer# NameServer地址apache.rocketmq.namesrvAddr=192.168.252.121:9876
消息生产者
@Componentpublic class Producer { /** * 生产者的组名 */ @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; /** * NameServer 地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQProducer() { //生产者的组名 DefaultMQProducer producer = new DefaultMQProducer(producerGroup); //指定NameServer地址,多个地址以 ; 隔开 producer.setNamesrvAddr(namesrvAddr); try { /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可 * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); for (int i = 0; i < 100; i++) { String messageBody = "我是消息内容:" + i; String message = new String(messageBody.getBytes(), "utf-8"); //构建消息 Message msg = new Message("PushTopic" /* PushTopic */, "push"/* Tag */, "key_" + i /* Keys */, message.getBytes()); //发送消息 SendResult result = producer.send(msg); System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); } } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } }}
消息消费者
@Componentpublic class Consumer { /** * 消费者的组名 */ @Value("${apache.rocketmq.consumer.PushConsumer}") private String consumerGroup; /** * NameServer地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQPushConsumer() { //消费者的组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); //指定NameServer地址,多个地址以 ; 隔开 consumer.setNamesrvAddr(namesrvAddr); try { //订阅PushTopic下Tag为push的消息 consumer.subscribe("PushTopic", "push"); //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 //如果非第一次启动,那么按照上次消费的位置继续消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listlist, ConsumeConcurrentlyContext context) { try { for (MessageExt messageExt : list) { System.out.println("messageExt: " + messageExt);//输出消息内容 String messageBody = new String(messageExt.getBody(), "utf-8"); System.out.println("消费响应:Msg: " + messageExt.getMsgId() + ",msgBody: " + messageBody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 } }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } }}
启动服务
@SpringBootApplicationpublic class SpringBootRocketmqApplication { public static void main(String[] args) { SpringApplication.run(SpringBootRocketmqApplication.class, args); }}
控制台会有响应
发送响应:MsgId:0AFF015E556818B4AAC208A0504F0063,发送状态:SEND_OKmessageExt: MessageExt [queueId=0, storeSize=195, queueOffset=113824, sysFlag=0, bornTimestamp=1517559124047, bornHost=/192.168.252.1:62165, storeTimestamp=1517559135052, storeHost=/192.168.252.121:10911, msgId=C0A8FC7900002A9F00000000056F499C, commitLogOffset=91179420, bodyCRC=1687852546, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=113825, KEYS=key_99, CONSUME_START_TIME=1517559124049, UNIQ_KEY=0AFF015E556818B4AAC208A0504F0063, WAIT=true, TAGS=push}, body=21]]消费响应:Msg: 0AFF015E556818B4AAC208A0504F0063,msgBody: 我是消息内容:99...
监控服务
RocketMQ web界面监控RocketMQ-Console-Ng部署
下载并且编译
下载并且 maven 编译
git clone https://github.com/apache/rocketmq-externals.gitrocketmq-externals/rocketmq-console/mvn clean package -Dmaven.test.skip=true
启动监控服务
rocketmq.config.namesrvAddr
NameServer
地址,默认启动端口8080
nohup java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=127.0.0.1:9876
访问监控服务
GitHub 源码:
Gitee 源码:
Contact
- 作者:鹏磊
- 出处:
- Email:
- 版权归作者所有,转载请注明出处
- Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享