架构图
组件说明
NameServer
NameServer是RocketMQ Broker的路由信息注册中心。它将各个Broker的Cluster信息、Topic信息、路由信息等存储在内存中,提供查询服务给Producer和Consumer。
Broker
Broker是RocketMQ系统的核心组件,它是负责消息的存储、传输和处理的服务。每个Broker都包含了Message Store、消息队列、消费队列和消息处理线程。
Producer
Producer是消息的发送端,向某个Topic发送消息,消息将由NameServer查询到对应的Broker,然后通过网络传输到Broker。Producer将数据源封装成消息后发送给Broker。
Consumer
Consumer是消息的接收端,向Broker订阅某个Topic,消费消费队列中的消息。消息处理完成后,Consumer同步向Broker发送消息确认信息。
Java代码示例
Producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class RocketMQProducer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start();
try { Message message = new Message("topic", "tag", "Hello RocketMQ".getBytes()); SendResult result = producer.send(message); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
|
Consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class RocketMQConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("topic", "tag"); consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); } }
|