Apache Puslar:百万topic消息队列

Pulsar官方提供了对Spark Streaming的支持。我看了下Stream Native对Pulsar流处理的支持情况,Pulsar对Flink的扩展做得更全面些。我截下github的支持情况。

  • Apache Pulsar与架构
    • 产品定位
    • Pulsar关键特性
    • Pulsar架构
    • Pulsar客户端
    • Topic Compaction
    • Pulsar schema
    • Pulsar Functions
    • Pular IO Connector
    • Pulsar SQL
    • 关于Pulsar的读音
  • 本地安装Pulsar
    • 创建pulsar用户
    • 下载并解压pulsar
  • 使用Pulsar
    • CLI工具
    • API
    • Routing Mode(路由模式)
    • Pulsar SQL
  • 关于流式引擎的选择

Apache Pulsar与架构

产品定位

我们Pulsar的产品定位主要是云原生的、分布式消息平台、以及流计算平台。它是来自于Yahoo!的开源项目。Pulsar对于云原生的支持直接体现就在于多租户,我们在后续Pulsar的元数据中总是能看到租户的身影。所以,它更容易和云平台整合在一起。

百万topic消息队列——Apache Puslar

image-20210724153751586

而Kafka也是一个开源的分布式事件流处理平台。

百万topic消息队列——Apache Puslar

image-20210724153914400

可以说,它们在很大程度上,会有不少的重叠。这两个项目都是Apache的顶级项目。Kafka流行的时间比较长,2011年 LinkedIn就将它开源了,2012年就已经在Apache孵化器中孵化。而Pulsar要比Kafka晚一些,它是Yahoo!在2016年开源的,并且2018年称为Apache的顶级项目。它们虽然很像,但它们一定会存在不同的特点。我们先来了解下Pulsar。

Pulsar关键特性

部署方式

Pulsar官方宣称是非常容易部署的。我们也可以在官方文档中看到有三种方式的部署:

  1. Standalone,在做一些开发、测试、示例,可以使用Standalone模式来部署Pulsar,当然,如果我们想要独立部署Pulsar集群,作为企业的消息中间件,也可以将Pulsar单独部署。
  2. Docker,Docker部署的方式非常简单,只需要拉取指定的镜像启动即可。
  3. Kubernetes,Pulsar也提供了易于部署在K8s上的部署方式,它包含了众多相关的组件自动部署。

关键特征(key feature)

  1. 原生支持多个Pulsar集群,在不同Pulsar集群中要进行跨集群复制也是原生支持的。
  2. 可以支持低延迟的publish和端对端延迟。
  3. 无缝扩展到上百万个topic。
  4. 非常简单的客户端API,支持Java、Go、Python、C++语言。
  5. 支持多种订阅模式,包括:独占、共享、故障转移。
  6. 通过BookKeeper提供消息持久存储支持,保障消息传递。
  7. 通过无服务器,轻量级的计算框架,Puslar Functions提供流原生数据处理能力。
  8. 基于Pulsar Funstions构建了无服务器的connector,可以比较容易地进行数据导入、导出。
  9. 支持数据冷热分层。

Pulsar架构

百万topic消息队列——Apache Puslar

image-20210724162341629

我们前面提到了Pulsar集群可以原生支持多个Pulsar集群。所以,在架构的High Level层面,一个Pulsar实例对应的是多个Pulsar集群,不同的Pulsar集群之间可以相互复制数据。

broker

Kafka中也有broker的概念,broker是一个无状态的组件,状态存储在ZooKeeper中,它其实就是Kafka的服务器。而Pulsar集群也是由一个或者多个Broker组成的,Broker也是无状态的。Pulsar broker主要运行着两个组件:

  • 一个HTTP Server,它以REST API方式,对生产者、消费者提供了管理、查找主题的功能。生产者连接到Borker,然后发送消息,消费者连接到Broker然后消费消息。
  • 一个Dispatcher(分发器),它是一个异步的TCP服务器,负责将所有数据以二进制协议分发。

Managed Ledger

为了保证Pulsar性能,在生产者发送的消息积压不超过了Managed Ledger Cache(管理账簿缓存,或者我们把它称为写缓存),消息是基于Managed Ledger Cache来分发的。如果消息积压远远大于写缓存,将会从BookKeeper中读取。

Global Replicators

Global Replicators翻译过来为全局复制器,顾名思义,它是用来复制数据的——将一个Topic的数据,复制到另一个Topic。它可以用于将本地的broker中的数据通过Pulsar的Java客户端,发送给远程服务器。

Cluster

一个Pulsar由多个集群组成。而一个Pulsar集群,包含:

  • 一个、或者多个Broker
  • 一个Cluster-Level的ZooKeeper,用于集群的配置和协调服务。
  • 一组用于存储消息的Bookie。

Meta Store

Pulsar的MetaStore负责存储Pulsar集群所有的元数据、schema、以及代理负载。Pulsar使用的是ZooKeeper进行元数据存储、集群配置和协调。

Configuration Store

顾名思义,Configuration Store存储Pulsar实例的所有配置。例如:集群、租户、命名空间、主题、分区等配置。配置存储可以在多个集群间共享。配置存储可以部署在单独的ZooKeeper集群,也可以部署在现有的ZooKeeper集群。

Persistent Storage

Pulsar保证应用程序传递的消息不丢失。如果消息到达Broker,会将消息存储到指定的可靠位置。Pulsar会将未被消费的消息以持久化的方式存储,这种模式也称为持久消息传递。简单来说,不管是否有消费者,都会先将消息持久化。我们之前所了解的Kafka,也会将消息持久化。Pulsar中,所有的消息的副本都会存储下来,并且同步到磁盘上。

Apache BookKeeper

百万topic消息队列——Apache Puslar

image-20210725171207117

BookKeeper是一个分布式预写日志(WAL)系统。它有一些优势:

  • Pulsar可以利用许多独立的日志,称为Ledger(分类账簿)。随着时间推移,可以为主题创建多个Ledger。(没看出来这有什么优势啊?)
  • BookKeeper为顺序的数据提供高效的存储。
  • 能够保证在系统出现故障时,读取Ledger的数据是一致的。
  • BooKeeper可以确保在bookies之间I/O均匀分布。
  • BookKeeper的扩展性好,可以在容量、和吞吐量上进行水平扩展。就像HDFS一样,可以往集群中添加更多的bookies来增加容量。
  • Bookies可以支持并发读取、和写入处理数千个Legder。通过使用多个磁盘设备,一个用于日志操作,另一个用于一般存储。Bookies能够将读取操作的影响和正在进行的写入操作的延迟隔离出来。
  • Pulsar中,消费者订阅消费消息的位置称为cursor(在Kafka中称为offset),cursor也存储在BookKeeper中。

以下这张图,说明了Pulsar和BookKeeper的交互方式。

百万topic消息队列——Apache Puslar

image-20210724224326622

Ledger

Ledger是Append-only的数据结构,它具备一个写入器,并分配给BookKeeper存储节点或者bookies。Ledger中的条目(Entries)被复制到多个Broker。Ledger具备有非常简单的语义:

  • Broker可以创建Ledger,将新的Entries添加到Ledger,还可以关闭Ledger。
  • 在Ledger关闭后(显示关闭或者crash),Ledger只能以只读的方式打开。
  • 当不再需要Ledger中的Entries时,可以从系统中删除整个Ledger。

读一致性保障

BookKeeper可以在出现故障时,保证Ledger读取的数据是一致的(Kafka是通过水位线来保证数据的一致性)。由于Ledger中只有一个写入器,是单进程写入,该进程可以非常高效地写入追加Entries,而不需要考虑并发写入锁的问题。一旦出现故障,Ledger将会执行一个Recovery进程,该进程会确定Ledger的最终状态,将状态设置为预写日志的最新Entries。然后,所有的消费者都能够看到完全一样的数据。

Managed Ledger

Pulsar中提供了一个基于BookKeeper Ledger开发的库,称为Managed Ledger。它代表的是单个主题的存储层。Managed Ledger是对消息流的抽象,其中一个写入器不断在流末尾追加数据,同时,可以由多个游标来消费流中的数据,每个游标对应都有自己的位置。

而在内部,一个Managed Ledger使用多个BookKeeper Ledger来存储数据。使用多个Ledger的原因为:

  • 当出现故障时,Ledger不再可写,需要重新创建一个新的Ledger。前面介绍BookKeeper提到过,一旦Ledger closed,不管是正常关闭,或者应用crash,都会创建一个新的Ledger。
  • 多个文件可以允许Ledger滚动,方便数据的清理。例如:当一个cursor已经完成了Ledger的消费,那么就可以删除该Ledger了。

Journal Storage

前面,我们提到了BookKeeper是一种基于WAL的存储系统。这些WAL日志就包含了BookKeeper的写入事务日志。在将数据更新到Ledger之前,Broker必须要先确保数据能够将事务日志写入到持久化存储。一旦Bookies启动,或者是就的日志大小到达阈值,就会创建一个新的WAL日志文件。

Pulsar代理

Pulsar代理是一种客户端连接到Pulsar的方式。在某些情况下,例如:生产环境的网络不能被外部访问,只能通过一个跳板机访问。此时,客户端是无法直接连接到Broker的。此时,就可以通过Pulsar代理来访问。它相当于是一个用于访问集群中所有Broker的单一网关。所有的客户端都会与Pulsar Proxy连接,而不是与Broker直连。

服务发现

Pulsar提供了一个内置的服务发现机制,客户端可以通过连接单一的URL来与整个Pulsar示例通信。当然,我们也可以配置自己的服务发现系统。要实现自己的服务发现,只需要保证:当客户端访问http://host:8080执行HTTP请求时,客户端需要重定向到活动的broker即可,不管是通过DNS、HTTP或者IP重定向。

Pulsar客户端

Pulsar的客户端支持多种语言,Client实现的背后,还支持透明的重新连接、failover、消息排队、以及试探法,例如:带有退避的连接重试。

Pulsar客户端连接步骤

在应用程序创建Producer或者是Consumer之前,Pulsar客户端需要进行两个步骤:

  1. 客户端会尝试向Proxy发送HTTP请求,以确定topic的对应的broker。该请求最终会被一个broker响应,并通过查看zookeeper,已确定谁为topic提供服务,如果没有broker为topic提供服务,它会尝试将请求分配给负载最少的broker。
  2. 一旦客户端拥有了Broker的有效地址,就会创建一个TCP连接(或者重用池中的连接),并进行身份验证。

而每当TCP连接终端时,客户端会立即重新启动连接,并尝试重新创建生产者或者消费者,直到操作成功。

读取接口

在Pulsar中,标准的消费者会监听主题、处理传入的消息,并在处理消息后确认消息。而每当创建新的消费者时,默认都会从Topic的尾部开始消费。而当消费者使用之前的订阅信息消费topic的数据时,会从未被确认的最早消息开始读取。通过消费者接口,订阅游标由Pulsar自己管理。

当然,读取接口也允许应用程序自己管理Cursor。当使用Reader读取Topic中的数据时,我们可以指定Reader从何处开始读取。例如:我们可以指定从earliest、latest开始消费,或者我们可以直接指定Message ID开始消费。通过Reader接口,可以实现一次性语义。

与Subscription/Consumer不同,Reader是非持久的,也不会组织topic中的数据被删除,官方推荐我们配置数据保留。如果topic的数据保留时间没有配置足够长的时间,那么Reader未读取的消息可能会被删除。这就有可能会导致数据丢失。而配置了保留一定的时间,就可以保证Reader有足够的时间来处理消息。

在Pulsar中,有两套读取topic数据的接口。一套是Consumer Interface、一套是Reader Interface。

百万topic消息队列——Apache Puslar

image-20210725002851482

这两套接口非常类似于Kafka的High-level API和Low-level API。其中Consumer API会自动管理Topic的cursor(游标),而Reader API是手动控制Topic的cursor。

百万topic消息队列——Apache Puslar

image-20210725003110587

大家看到了吗?Reader是由应用程序指定,从哪个位置读取数据。

Topic Compaction

Pulsar主题可以根据需要持久化尽可能多的未确认消息,同时保留消息的顺序。默认情况,Pulsar会存储一个主题上产生的所有未确认、未处理的消息。这和Kafka不一样,Kafka一旦到达配置的阈值(时间、或者大小)就会开始清理数据。如果消费者数据存在严重积压,就会出现数据丢失,造成应用程序退出。Pulsar也提供了topic compation功能,会减少相同key存储的最新消息。这个操作和Kafka类似。

Pulsar schema

Schema注册表

我们之前接触的大部分消息队列都是无Schema的。例如:很多时候,我们会将消息以JSON文本的方式写入到Kafka分区中,然后我们通过流处理程序解析schema。这个过程中,如果数据中出现问题,我们只有到流程序中才能发现,所以Kafka的schema非常简单,就是key-value。Pulsar将类型安全引入了进来。在Pulsar中,可以基于客户端、或者服务端来注册schema。

客户端方法

生产者和消费者不仅仅是负责序列化和反序列化消息,还需要指定哪个主题需要传输什么样类型的消息。客户端方法就是将schema注册交给应用程序。应用程序在创建消息的时候,就将类型信息直接写入到消息中。这样,当消费消息时,就可以根据消息中的schema来解析数据了。这种方式是比较灵活的,是约定式的,而不是强制的。

服务端方法

服务端方法就是schema是在服务器端定义的。生产者和消费者可以告诉Pulsar,某个主题应该使用什么样的类型。这种方式,Pulsar会强制执行schema检查,可以确保生产者和消费者的schema是保持同步的。Pulsar有个内置的模式注册表。客户端在主题的基础上上传数据的schema,这些schema决定了,哪些数据类型对于Pulsar是有效的。

Pulsar Functions

我们知道Kafka有一个不太流行的Kafka Streaming。而,Pulsar也推出了自己的流处理疫情——Pulsar Functions,它是一个轻量级的计算框架。它做的事情非常简单:

  1. 从一个或者多个Pulsar topic中消费消息。
  2. 对于每条消息执行用户自定义的逻辑。
  3. 将执行结果发布到另一个主题。

其实,现在对于我们来说,一般开发一些流应用,一般会选择像Strucutred Streaming、Flink这样的流计算平台。基于这些框架开发出来的应用,我们也会单独部署。而和Kafka Streaming一样,Pulsar Functions是不需要额外部署的。

Pular IO Connector

Pulsar IO Connector可以让我们很容易实现与外部系统的导入导出。Pulsar主要有两种连接器,一种是source,另一种是sink。

百万topic消息队列——Apache Puslar

image-20210725104017682

Pulsar SQL

前面,我们看到了Pulsar可以指定topic的schema。我们可以在Pulsar中存储有schema的数据,然后使用Trino来查询数据(通过Presto SQL)来实现。

百万topic消息队列——Apache Puslar

image-20210725104337409

Presto SQL连接器可以直接从BookKeeper中读取数据。

百万topic消息队列——Apache Puslar

image-20210725104532756

关于Pulsar的读音

国内有不少人读音是:帕萨。但看标准读音不应该发这个音,因为 / l / 这个字母是要发音的。所以,正确的发音应该是:剖萨。大家也可以听一下国外的伙伴是怎么读的。
https://www.youtube.com/watch?v=JzExXJYiGvY&t=15s

百万topic消息队列——Apache Puslar

image-20210724121834121

本地安装Pulsar

如果要进行本地开发、测试,我们可以在我们本机上以Standalone方式来运行Pulsar。Standalone模式的Pulsar需要部署以下几个组件:

  • Pulsar Broker
  • ZooKeeper
  • BookKeeper

这几个组件都需要运行在JVM之上。

创建pulsar用户

useradd pulsar
usermod pulsar -G hadoop

下载并解压pulsar

当前,我们基于Pulsar的2.8版本。

wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz

[pulsar@ha-node1 ~]$ ll
总用量 323052
-rw-r--r-- 1 root root 330804999 7月  25 11:15 apache-pulsar-2.8.0-bin.tar.gz

[pulsar@ha-node1 ~]$ tar -xvzf apache-pulsar-2.8.0-bin.tar.gz -C /opt/

[pulsar@ha-node1 ~]$ ln -s /opt/apache-pulsar-2.8.0/ /opt/pulsar

安装目录结构

[pulsar@ha-node1 pulsar]$ ll
总用量 72
drwxr-xr-x 3 pulsar pulsar   225 1月  22 2020 bin
drwxr-xr-x 5 pulsar pulsar  4096 1月  22 2020 conf
drwxrwxr-x 3 pulsar pulsar   132 7月  25 11:16 examples
drwxrwxr-x 4 pulsar pulsar    66 7月  25 11:16 instances
drwxrwxr-x 3 pulsar pulsar 16384 7月  25 11:16 lib
-rw-r--r-- 1 pulsar pulsar 31639 1月  22 2020 LICENSE
drwxr-xr-x 2 pulsar pulsar  4096 1月  22 2020 licenses
-rw-r--r-- 1 pulsar pulsar  6612 1月  22 2020 NOTICE
-rw-r--r-- 1 pulsar pulsar  1269 1月  22 2020 README

解压完后,我们可以在Pulsar的安装目录中看到以下结构:

  • bin:包含了Pulsar的命令行工具。
  • conf:包含了Pulsar的配置文件,包括了broker配置、zk配置以及其他。
  • examples:包含了Pulsar Functions的示例。
  • lib:Pulsar使用的JAR包。

而我们一旦启动了Pulsar,会自动创建以下几个目录:

  • data:用于存储bk和zk的数据。
  • instances:供Pulsar Functions使用。
  • logs:日志文件。

启动与关闭

以应用程序方式启动

# 启动standalone pulsar
bin/pulsar standalone

当我们在控制台中看到以下输出,表示启动成功。

1:18:57.475 [main] INFO  org.apache.pulsar.websocket.WebSocketService - Pulsar WebSocket Service started

如果要关闭pulsar standalone,直接ctrl + c即可。

以守护进程方式启动

# 启动
bin/pulsar-daemon start standalone

# 关闭
bin/pulsar-daemon stop standalone

生产和消费消息

Pulsar提供了一个CLI工具,让我们生产和消费Pulsar集群中的消息。

消费消息

# -s表示指定消费者的名字
bin/pulsar-client consume test-topic -s "consumer1"

如果Pulsar中没有test-topic,会自动创建一个。

生产消息

# 往pulsar中的test-topic主题,发送一条消息
bin/pulsar-client produce test-topic --messages "hello-pulsar"

当我们生产数据后,可以看到消费者中打印了以下消息。

----- got message -----
key:[null], properties:[], content:hello-pulsar
11:28:59.965 [main] INFO  org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
11:28:59.997 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test-topic] [consumer1] Closed consumer
11:29:00.003 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0x79b909a6, L:/127.0.0.1:22369 ! R:localhost/127.0.0.1:6650] Disconnected
11:29:02.017 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully consumed

好了,我们看到了,这个Standalone模式,我们没有配置任何组件,它自动帮助我们启动了内置的ZooKeeper、以及BookKeeper。然后我们就可以使用它了。这种方式显然不能用于生产环境。在生产环境中,我们一般都会提供独立的ZooKeeper集群、以及BookKeeper集群。

使用Pulsar

CLI工具

CLI工具是最基本的工具,我们经常会使用到。

pulsar命令

我们前面启动、关闭pulsar,都是使用pulsar命令。它就是专门用来在前台启动pulsar的各种组件的。例如:bookies和zookeeper,如果要启动在后台,则可以使用pulsar-daemon。更多命令请参考:
https://pulsar.apache.org/docs/en/reference-cli-tools/

pulsar-client

前面我们就是用pulsar-client来启动生产者和消费者的。

pulsar-admin

我们可以通过pulsar-admin来管理pulsar集群。

查看pulsar集群列表

bin/pulsar-admin clusters list

[pulsar@ha-node1 pulsar]$ bin/pulsar-admin clusters list
"standalone"

可以看到,当前就启动了一个名为standalone的集群。通过clusters子命令,我们可以创建、删除pulsar集群。

查看租户

bin/pulsar-admin tenants list

[pulsar@ha-node1 pulsar]$ bin/pulsar-admin tenants list
"public"
"pulsar"
"sample"

当前有三个租户。我们也自己来创建、删除租户。

查看brokers

# 查看当前活动的broker
[pulsar@ha-node1 pulsar]$ bin/pulsar-admin brokers list use
"localhost:8080"

# 查看当前的Leader broker
[pulsar@ha-node1 pulsar]$ bin/pulsar-admin brokers leader-broker
{
  "serviceUrl" : "http://localhost:8080"
}

查看指定broker的Namespace

bin/pulsar-admin brokers namespaces use 
--url localhost:8080

> --url localhost:8080
"public/default/0x00000000_0x40000000    [broker_assignment=shared is_controlled=false is_active=true]"
"public/functions/0x40000000_0x80000000    [broker_assignment=shared is_controlled=false is_active=true]"
"pulsar/standalone/localhost:8080/0x00000000_0xffffffff    [broker_assignment=shared is_controlled=false is_active=true]"

Pulsar中的namespace是组织topic的逻辑分组。我们看到,上述命令的输出中,我们看到localhost:8080的broker中有三个namespace。注意,namespace是必须包含在租户下的。

namespace

查看pulsar租户下的所有namespace。

bin/pulsar-admin namespaces list pulsar
"pulsar/system"

bin/pulsar-admin namespaces list public
"public/default"
"public/functions"

可以看到,pulsar租户只有一个system命名空间,而public租户有default和functions命名空间。我们还可以通过namepsaces其他操作来创建、删除命名空间,还可以配置命名空间的积压策略、持久化策略、TTL、保留策略、限流策略等。

topics

Pulsar中topic可以分为:持久化的、和非持久化的。我们先来查看下当前pulsar实例下的system名称空间的所有topic:

bin/pulsar-admin topics list public/default
"persistent://public/default/test-topic"

我们看到了,我们之前创建的test-topic是一个持久化topic,并且我们当时没有指定命名空间,所以它在public/default名称空间下。如果

删除topic:

bin/pulsar-admin topics delete "persistent://public/default/test-topic"

创建topic:

bin/pulsar-admin topics create persistent://public/default/my-topic

获取topic信息:

bin/pulsar-admin topics stats persistent://public/default/my-topic

{
  "count" : 0,
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 0,
  "msgInCounter" : 0,
  "bytesOutCounter" : 0,
  "msgOutCounter" : 0,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 0,
  "backlogSize" : 0,
  "offloadedStorageSize" : 0,
  "publishers" : [ ],
  "waitingPublishers" : 0,
  "subscriptions" : { },
  "replication" : { },
  "deduplicationStatus" : "Disabled",
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0
}

当前创建出来的topic是一个单分区的topic。

创建多分区的topic:

# 创建一个带有3个分区的topic
bin/pulsar-admin topics create-partitioned-topic persistent://public/default/partitioned-topic --partitions 3

# 获取topic的分区数量
bin/pulsar-admin topics get-partitioned-topic-metadata persistent://public/default/partitioned-topic
{
  "partitions" : 3
}

# 查看分区状态
bin/pulsar-admin topics partitioned-stats persistent://public/default/partitioned-topic

API

接下来,我通过Scala来编写代码操作Pulsar,其实就是通过代码往Pulsar中生产、和消费消息。API文档,请参考:
https://pulsar.apache.org/docs/en/client-libraries-java/。

Pulsar原生API

导入依赖

    <properties>
        <pulsar.version>2.8.0</pulsar.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>${pulsar.version}</version>
        </dependency>
    </dependencies>

配置对象

object PulsarConf {
    // Pulsar的连接URL,如果有多个broker使用逗号分隔
    val pulsarConnectionUrl = "pulsar://ha-node1:6650"

    // Topic名称
    val topicName = "persistent://public/default/partitioned-topic"
}

消费者

object PulsarConsumerDemo {
    def main(args: Array[String]): Unit = {

        import PulsarConf._

        // 创建客户端对象
        val client = PulsarClient.builder()
            .serviceUrl(pulsarConnectionUrl)
            .build();

        // 创建消费者并订阅主题
        val consumer = client.newConsumer()
            .topic(topicName)
            .subscriptionName("subscription-1")
            .subscribe()

        // 消费消息
        while(true) {
            val message = consumer.receive()
            val key = new String(message.getKey)
            val value = new String(message.getValue)
            // 打印并确认消息
            println(s"收到消息:key = ${key}, value = ${value}")
            consumer.acknowledge(message)
        }
    }
}

生产者

object PulsarProducerDemo {
    def main(args: Array[String]): Unit = {

        import PulsarConf._

        // 创建客户端对象
        val client = PulsarClient.builder()
            .serviceUrl(pulsarConnectionUrl)
            .build();

        // 创建生产者
        val producer = client.newProducer(Schema.STRING)
            .topic(topicName)
            .create()

        // 发送消息
        (1 to 10).foreach { id =>
            producer.newMessage()
                .key(s"$id")
                .value(s"[${new Date()} MSG:${id}] This is a test!")
                .send()
        }

        // 关闭生产者、客户端连接
        producer.close()
        client.close()
    }
}

先启动消费者,然后再启动生产者,就可以看到打印出来了消费到的消息了。

Kafka API Wrapper

看到这个,大家应该明白啥意思了。Pulsar可以说是很贴心了,为了方便迁移Kafka应用,Pulsar专门设计了这样一套API。也就是说,我们可以使用之前的Kafka API来操作Pulsar。文档在:
https://pulsar.apache.org/docs/en/adaptors-kafka/,大家可自行参考。

Routing Mode(路由模式)

前面我们使用了生产者来发送消息到topic。我们原先在使用Kafka的时候,Kafka发送消息有几种策略,例如:可以使用轮询方式、也可以使用hash方式分区、或者指定到分区等。同样,在Pulsar中也有对应的概念,它叫Routing Mode。一共有三种:

  • RoundRobinPartition:这种是默认的方式,如果没有提供key,那么生产者会轮询发送消息到不同的分区。而如果设置了key,就会按照key的hash值将消费分布到不同的分区中。
  • SinglePartition:如果没有提供key值,生产者会随机选择分区,然后将消息发送到分区中。
  • CustomPartition:自定义分区,实现MessageRouter来进行分区。

代码如下:

Producer<byte[]> producer = pulsarClient.newProducer()
        .topic(topic)
        .messageRoutingMode(MessageRoutingMode.SinglePartition)
        .create();

Pulsar SQL

我们可以直接使用Pulsar中内嵌的Presto直接查询Pulsar中的数据。

启动Pulsar SQL

nohup ./bin/pulsar sql-worker run 1>/dev/null 2>&1 &

./bin/pulsar sql
presto> 

执行SQL查询

# 查看pulsar中的catalog
show catalogs;
 Catalog 
---------
 pulsar  
 system

# 查看catalog中的schema
presto> show schemas in pulsar;
       Schema       
--------------------
 information_schema 
 public/default     
 public/functions   
 pulsar/system      
 sample/ns1         
(5 rows)

# 查看schema中的表
show tables in pulsar."public/default";
presto> show tables in pulsar."public/default";
       Table       
-------------------
 partitioned-topic 
(1 row)

查看数据:

select * from pulsar."public/default"."partitioned-topic" limit 10;
百万topic消息队列——Apache Puslar

image-20210725165447354

按照时间排序:

select * from pulsar."public/default"."partitioned-topic" order by __publish_time__ desc limit 10;

查询指定分区的数据:

select * from pulsar."public/default"."partitioned-topic" where __partition__ = 0 limit 10;

获取分区的数据量:

select __partition__, count(1) from pulsar."public/default"."partitioned-topic" group by __partition__;

 __partition__ | _col1 
---------------+-------
             2 |    11 
             1 |    15 
             0 |    11 
(3 rows)

很方便。

关于流式引擎的选择

Pulsar官方提供了对Spark Streaming的支持。我看了下Stream Native对Pulsar流处理的支持情况,Pulsar对Flink的扩展做得更全面些。我截下github的支持情况。

百万topic消息队列——Apache Puslar

image-20210725182719975

对于Spark的支持,我们需要对scala 2.12、spark 3.x进行重新编译。

百万topic消息队列——Apache Puslar

image-20210725182750135

后续,如果用到了再来测试吧,先mark。

好了,本次是Pulsar的基本体验。我们后续如果要使用Pulsar,不一定会将现有的Kafka替换掉,因为影响面积太大了,我们也很难在短时间内完成如此庞大的工作。但对于大数据应用来说,Pulsar可以作为一个很好的管道来使用。

内容出处:,

声明:本网站所收集的部分公开资料来源于互联网,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。文章链接:http://www.yixao.com/procedure/26303.html

发表评论

登录后才能评论