本文共 3231 字,大约阅读时间需要 10 分钟。
step1:pom文件中引入依赖
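<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.0.0-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-all</artifactId>
    <version>4.0.0-incubating</version>
</dependency>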
step2 : 创建生产者生产消息
package com.example.demo.rocketmq.simpledemo;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("testProducer"); producer.setNamesrvAddr("ip:9876"); //多个nameserver用;隔开 producer.setVipChannelEnabled(false);//假设你brokerIP 10911的话VIP通道端口为10911-2(阿里云部署的特别注意) //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); }}
step3 : 创建消费者消费消息
package com.example.demo.rocketmq.simpledemo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * Minimal RocketMQ push consumer: subscribes to topic {@code TopicTest} and
 * prints every batch of messages it receives.
 *
 * <p>This is only a hello-world-level demo. RocketMQ provides much richer
 * mechanisms for producing and consuming messages, which later chapters
 * cover in full.
 */
public class Consumer {

    /**
     * Configures and starts the push consumer.
     *
     * @param args unused
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        // DefaultMQPushConsumer consumes through a registered listener;
        // the caller has no direct control over consumption.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer");
        consumer.setVipChannelEnabled(false);
        consumer.setNamesrvAddr("ip:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Use "*" instead of a tag expression to subscribe to all tags.
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // Pass the data as printf arguments rather than concatenating it into
                // the format string: a '%' inside a message would otherwise be parsed
                // as a format specifier and make printf throw.
                System.out.printf("%s Receive New Messages: %s%n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}
转载地址:http://meazb.baihongyu.com/