博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketMQ实战(四): hello world
阅读量:2169 次
发布时间:2019-05-01

本文共 3231 字,大约阅读时间需要 10 分钟。

step1:pom文件中引入依赖

org.apache.rocketmq
rocketmq-client
4.0.0-incubating
org.apache.rocketmq
rocketmq-all
4.0.0-incubating

step2 : 创建生产者生产消息

package com.example.demo.rocketmq.simpledemo;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;public class SyncProducer {    public static void main(String[] args) throws Exception {        //Instantiate with a producer group name.        DefaultMQProducer producer = new DefaultMQProducer("testProducer");        producer.setNamesrvAddr("ip:9876"); //多个nameserver用;隔开        producer.setVipChannelEnabled(false);//假设你brokerIP  10911的话VIP通道端口为10911-2(阿里云部署的特别注意)        //Launch the instance.        producer.start();        for (int i = 0; i < 100; i++) {            //Create a message instance, specifying topic, tag and message body.            Message msg = new Message("TopicTest" /* Topic */,                    "TagA" /* Tag */,                    ("Hello RocketMQ " +                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */            );            //Call send message to deliver message to one of brokers.            SendResult sendResult = producer.send(msg);            System.out.printf("%s%n", sendResult);        }        //Shut down once the producer instance is not longer in use.        producer.shutdown();    }}

创建生产者生产消息:

package com.example.demo.rocketmq.simpledemo;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;public class Consumer {    public static void main(String[] args) throws Exception {        //DefaultMQPushConsumer注册监听器消费,没有控制权。        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer");        consumer.setVipChannelEnabled(false);        consumer.setNamesrvAddr("ip:9876");        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);        consumer.subscribe("TopicTest", "TagA || TagC || TagD");//*订阅全部tag        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); }}
这个只是hello world级别的demo,rocketmq提供了丰富的生产消息机制和消费消息机制,后续章节会进行全面介绍。

转载地址:http://meazb.baihongyu.com/

你可能感兴趣的文章
用学习曲线 learning curve 来判别过拟合问题
查看>>
用验证曲线 validation curve 选择超参数
查看>>
用 Grid Search 对 SVM 进行调参
查看>>
用 Pipeline 将训练集参数重复应用到测试集
查看>>
PCA 的数学原理和可视化效果
查看>>
机器学习中常用评估指标汇总
查看>>
什么是 ROC AUC
查看>>
Bagging 简述
查看>>
详解 Stacking 的 python 实现
查看>>
简述极大似然估计
查看>>
用线性判别分析 LDA 降维
查看>>
用 Doc2Vec 得到文档/段落/句子的向量表达
查看>>
使聊天机器人具有个性
查看>>
使聊天机器人的对话更有营养
查看>>
一个 tflearn 情感分析小例子
查看>>
attention 机制入门
查看>>
手把手用 IntelliJ IDEA 和 SBT 创建 scala 项目
查看>>
双向 LSTM
查看>>
GAN 的 keras 实现
查看>>
AI 在 marketing 上的应用
查看>>