当前位置: 首页 > java >正文

Using Spring for Apache Pulsar:Publishing and Consuming Partitioned Topics

在下面的示例中,我们发布了一个名为hello pulser participated的主题。这是一个被分区的主题,对于这个示例,我们假设该主题已经创建了三个分区。

@SpringBootApplication
public class PulsarBootPartitioned {public static void main(String[] args) {SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");}@Beanpublic ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");return args -> {for (int i = 0; i < 10; i++) {pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());}};}@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")public void listen(String message) {System.out.println("Message Received: " + message);}static class FooRouter implements MessageRouter {@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {return 0;}}static class BarRouter implements MessageRouter {@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {return 1;}}static class BuzzRouter implements MessageRouter {@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {return 2;}}}

在前面的示例中,我们发布到一个分区的主题,我们想将一些数据段发布到特定的分区。如果您将其保留为Pulsar的默认值,它将遵循分区分配的轮转模式,我们希望覆盖该模式。为此,我们提供了一个带有send方法的消息路由器对象。考虑实现的三个消息路由器。FooRouter始终将数据发送到分区0,BarRouter发送到分区1,BuzzRouter发送给分区2。还要注意,我们现在使用PulsarTemplate的sendAsync方法,该方法返回CompletableFuture。运行应用程序时,我们还需要将生产者上的messageRoutingMode设置为CustomPartition(spring.pulsinger.producer.message路由模式)。

在消费者端,我们使用具有独占订阅类型的PulsarListener。这意味着来自所有分区的数据最终都在同一个消费者中,并且没有订购保证。

如果我们希望每个分区由一个不同的消费者使用,我们该怎么办?我们可以切换到故障转移订阅模式,并添加三个单独的消费者:

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {System.out.println("Message Received 1: " + foo);
}@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {System.out.println("Message Received 2: " + foo);
}@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {System.out.println("Message Received 3: " + foo);
}

当你遵循这种方法时,一个分区总是被一个专用的消费者占用。

同样,如果你想使用Pulsar的共享消费者类型,你可以使用共享订阅类型。但是,当您使用共享模式时,您将失去任何排序保证,因为单个消费者可能会在另一个消费者有机会之前收到来自所有分区的消息。

考虑以下示例:

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {System.out.println("Message Received 1: " + foo);
}@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {System.out.println("Message Received 2: " + foo);
}

http://www.xdnf.cn/news/14982.html

相关文章:

  • swiglu 激活函数学习笔记
  • Rust与Cypress应用
  • 技术支持丨解决 ServBay 在 Windows 启动时反复提示安装 .NET 的问题
  • Flask3.1打造极简CMS系统
  • leetcode11.盛最多水的容器
  • 微信小程序91~100
  • STM32-待机唤醒实验
  • 搭建一款结合传统黄历功能的日历小程序
  • S7-200 SMART :通过以太网下载程序详细步骤
  • ServBay Windows 1.2.0 更新!新增 PHP 设置与 Ollama 支持
  • Docker 高级管理 -- 容器通信技术与数据持久化
  • 人工智能-基础篇-27-模型上下文协议--MCP到底怎么理解?对比HTTP的区别?
  • 如何卸载本机的node.js
  • 【视频观看系统】- 需求分析
  • 沃丰科技海外客服系统综合解决方案
  • 【DB2】load报错SQL3501W、SQL3109N、SQL2036N
  • 持续更新!国内免费使用 claude code 方案
  • LLaMA-Omni 深度解析:打开通往无缝人机语音交互的大门
  • C++学习笔记三
  • 使用 Docker Compose 简化 INFINI Console 与 Easysearch 环境搭建
  • 跨部门协作难以对齐项目进度,如何促进协同
  • 【动手学深度学习】4.10 实战Kaggle比赛:预测房价
  • S7-1500——(一)从入门到精通1、基于TIA 博途解析PLC程序结构(一)
  • 【04】MFC入门到精通——MFC 自己手动新添加对话框模板 并 创建对话框类
  • 从零开始学前端html篇2
  • React 编译器与性能优化:告别手动 Memoization
  • 网关助力航天喷涂:Devicenet与Modbus TCP的“跨界对话“
  • windows指定某node及npm版本下载
  • Linux入门篇学习——Linux 编写第一个自己的命令
  • 【TCP/IP】3. IP 地址