On this page
article
8.Go-Kafka
Go-Kafka使用
1.介绍
Kafka 是⼀种分布式流处理平台,最初由 LinkedIn 公司开发,现在由 Apache 软件基⾦会维护。它被设计⽤于⾼吞吐量、低延迟的数据传输,并具有可扩展性和容错性。 Kafka 的核⼼概念是消息传递系统,它通过将数据以消息的形式进⾏发布和订阅来实现数据流的处理。以下是 Kafka 的⼀些关键特点和概念:
- 消息发布和订阅:Kafka 使⽤发布-订阅模型,消息发布者将消息发送到⼀个或多个主题(Topics),⽽消息订阅者可以从这些主题中获取消息。
- 分布式架构:Kafka 是⼀个分布式系统,可以在多个节点上部署,实现数据的分区和复制,以实现⾼可⽤性和可扩展性。
- 消息持久化:Kafka 使⽤⽇志(Log)的⽅式持久化消息,每个主题的消息以追加的⽅式写⼊分区中,并且消息会在⼀段时间后根据配置进⾏保留。
- 分区和复制:Kafka 将每个主题分成⼀个或多个分区(Partitions),每个分区可以在多个节点上进⾏复制,以实现负载均衡和容错性。
- ⾼吞吐量:Kafka 具有⾮常⾼的吞吐量和低延迟,能够处理⼤量的消息流,并保持较低的延迟。
- 实时数据处理:Kafka 可以与流处理框架(如 Apache Spark、Apache Flink)集成,⽀持实时的数据处理和流式分析。
2.常⽤场景
- 数据流集成和数据管道:Kafka 可以作为数据流的中间件,⽤于连接不同的数据系统和应⽤程序。它可以集成多个数据源和数据⽬的地,实现可靠的数据传输和流⽔线处理。
- ⽇志收集和集中式⽇志管理:Kafka 可以⽤作⽇志收集系统,收集和存储分布在多个应⽤程序和服务器上的⽇志数据。它提供⾼吞吐量和持久化存储,⽀持实时的⽇志数据处理和分析。
- 事件驱动架构:Kafka 的发布-订阅模型使其成为构建事件驱动架构的理想选择。应⽤程序可以将事件发布到 Kafka 主题,然后其他应⽤程序可以订阅并处理这些事件,实现松耦合、可扩展和实时的事件驱动架构。
- 流式处理和实时分析:Kafka 可以与流处理框架(如 Apache Spark、Apache Flink)集成,⽀持实时的流式数据处理和复杂的实时分析。通过将数据流导⼊Kafka 主题,应⽤程序可以实时处理和分析数据,实现实时的洞察和决策。
- 异步消息队列:Kafka 可以⽤作⾼吞吐量的异步消息队列,实现应⽤程序之间的解耦和异步通信。应⽤程序可以将消息发送到 Kafka 主题,并异步处理这些消息,提⾼系统的可伸缩性和弹性。
- 数据备份和容错性:Kafka ⽀持数据的分区和复制,可以在多个节点上复制和存储数据,以实现数据的冗余备份和容错性。
3.Go三⽅库
github.com/IBM/sarama
: 是⼀个 Go 语⾔编写的 Apache Kafka 客户端库,⽤于与 Kafka 集群进⾏交互。它提供了丰富的功能和易于使⽤的 API,使开发⼈员能够⽅便地使⽤ Go 语⾔来⽣产和消费 Kafka 消息。- Github链接:https://github.com/IBM/sarama
4.Kafka使⽤
4.1 安装依赖包
go get github.com/IBM/sarama
4.2 初始化Client
package main
import (
"fmt"
"log"
"strconv"
"time"
"github.com/IBM/sarama"
)
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化sarama客户端报错%s", err.Error())
return
}
defer client.Close()
//其他操作,如创建生产者或消费者
fmt.Println("Client initialized successfully!")
}
- 生产者
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
//处理生产者创建错误
return
}
-消费者
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
//处理消费者创建错误
return
}
4.3初始化参数生产者
参数 | 示例 | 描述 |
---|---|---|
sarama.MaxRequestSize | 100 * 1024 * 1024 | 请求最大大小,默认100MB,可以调整,写入大于100MB的消息会直接报错 |
sarama.MaxResponseSize | 100 * 1024 * 1024 | 响应最大大小,默认100MB,可以调整,获取大于100MB的消息会直接报错 |
config.Producer.RequiredAcks | sarama.WaitForLocal(1) | 默认值为sarama.WaitForLocal(1) |
config.Producer.Retry.Max | 3 | 生产者重试的最大次数,默认为3 |
config.Producer.Retry.Backoff | 100 * time.Millisecond | 生产者重试之间的等待时间,默认为100毫秒 |
config.Producer.Return.Successes | false | 是否返回成功的消息,默认为false |
config.Producer.Return.Errors | true | 返回失败的消息,默认值为true |
config.Producer.Compression | CompressionNone | 对消息是否压缩后发送,默认CompressionNone不压缩 |
config.Producer.CompressionLevel | CompressionLevelDefault | 指定压缩等级,在配置了压缩算法后生效 |
config.Producer.Flush.Frequency | 0 | producer缓存消息的时间, 默认缓存0毫秒 |
config.Producer.Flush.Bytes | 0 | 达到多少字节时,触发一次broker请求,默认为0,直接发送,存在天然上限值MaxRequestSize |
config.Producer.Flush.Messages | 0 | 达到多少条消息时,强制,触发一次broker请求,这个是上限值,MaxMessages < Messages |
config.Producer.Flush.MaxMessages | 0 | 最大缓存多少消息,默认为0,有消息立刻发送,MaxMessages设置大于0时,必须设置 Messages,且需要保证:MaxMessages > Messages |
config.Producer.Timeout | 5 * time.Second | 超时时间 |
config.Producer.Idempotent | false | 是否需要幂等,默认false |
config.Producer.Transaction.Timeout | 1 * time.Minute | 事务超时时间默认1分钟 |
config.Producer.Transaction.Retry.Max | 50 | 事务重试时间 |
config.Producer.Transaction.Retry.Backoff | 100 * time.Millisecond | 事务重试间隔时间 |
config.Net.MaxOpenRequests | 5 | 默认值5,一次发送请求的数量 |
config.Producer.Transaction.ID | “test” | 事务ID |
config.ClientID | “your-client-id” | 客户端ID |
4.4初始化参数消费者
配置项 | 示例 | 描述 |
---|---|---|
config.Consumer.Group.Rebalance.Strategy | sarama.NewBalanceStrategyRange | 消费者分配分区的默认方式 |
config.Consumer.Offsets.Initial | sarama.OffsetNewest | 在没有提交位点情况下,使用最新的位点还是最老的位点,默认是最新的消息位点 |
config.Consumer.Offsets.AutoCommit.Enable | true | 是否支持自动提交位点,默认支持 |
config.Consumer.Offsets.AutoCommit.Interval | 1 * time.Second | 自动提交位点时间间隔,默认1s |
config.Consumer.MaxWaitTime | 250 * time.Millisecond | 在没有最新消费消息时候,客户端等待的时间,默认250ms |
config.Consumer.MaxProcessingTime | 100 * time.Millisecond | |
config.Consumer.Fetch.Min | 1 | 消费请求中获取的最小消息字节数,Broker将等待至少这么多字节的消息然后返回。默认值为1,不能设置0,因为0会导致在没有消息可用时消费者空转 |
config.Consumer.Fetch.Max | 0 | 消费请求最大的字节数。默认为0,表示不限制 |
config.Consumer.Fetch.Default | 1024 * 1024 | 消费请求的默认消息字节数(默认为1MB),需要大于实例的大部分消息,否则Broker会花费大量时间计算消费数据是否达到这个值的条件 |
config.Consumer.Return.Errors | true | |
config.Consumer.Group.Rebalance.Timeout | 60 * time.Second | 设置rebalance操作的超时时间,默认60s |
config.Consumer.Group.Session.Timeout | 10 * time.Second | 设置消费者组会话的超时时间为,默认为10s |
config.Consumer.Group.Heartbeat.Interval | 3 * time.Second | 心跳超时时间,默认为3s |
config.Consumer.MaxProcessingTime | 100 * time.Millisecond | 消息处理的超时时间,默认100ms |
5.常用的Topic 的增删查
5.1创建Topic
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化sarama客户端报错%s", err.Error())
return
}
defer client.Close()
//创建Topic,⾸先⽣成admin client
admin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
fmt.Printf("初始化saramaAdmin客户端报错%s", err.Error())
return
}
defer admin.Close()
//设置分区以及副本
topicDetail := &sarama.TopicDetail{
NumPartitions: 2,
ReplicationFactor: 1,
}
//创建Topic
err = admin.CreateTopic("fei-topic", topicDetail, false)
if err != nil {
fmt.Printf("创建Topic报错%s", err.Error())
return
}
}
5.2查看所有Topic
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化sarama客户端报错%s", err.Error())
return
}
defer client.Close()
//创建Topic,⾸先⽣成admin client
admin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
fmt.Printf("初始化saramaAdmin客户端报错%s", err.Error())
return
}
defer admin.Close()
topics, err := admin.ListTopics()
if err != nil {
fmt.Printf("查看Topic报错%s", err.Error())
return
}
fmt.Println(topics)
}
fei@feideMacBook-Pro admin % go run admin.go
map[fei-topic:{2 1 map[0:[1001] 1:[1001]] map[segment.bytes:0xc00021c250]}]
5.3查看Topic详情
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化sarama客户端报错%s", err.Error())
return
}
defer client.Close()
//创建Topic,⾸先⽣成admin client
admin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
fmt.Printf("初始化saramaAdmin客户端报错%s", err.Error())
return
}
defer admin.Close()
metadata, err := admin.DescribeTopics([]string{"fei-topic"})
if err != nil {
fmt.Printf("查看Topic详细报错%s", err.Error())
}
for _, partition := range metadata {
fmt.Println(partition)
}
}
fei@feideMacBook-Pro admin % go run admin.go
&{7 kafka server: Not an error, why are you printing me? fei-topic AAAAAAAAAAAAAAAAAAAAAA false [0xc000180660 0xc0001806c0] 0}
5.4查找指定主题和分区的偏移量
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
kaClient, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化客户端报错%s", err.Error())
}
//查看最老和最新offset
offsetOld0, _ := kaClient.GetOffset("fei-topic", 0, sarama.OffsetOldest)
OffsetNew0, err := kaClient.GetOffset("fei-topic", 0, sarama.OffsetNewest)
offsetOld1, _ := kaClient.GetOffset("fei-topic", 1, sarama.OffsetOldest)
OffsetNew1, _ := kaClient.GetOffset("fei-topic", 1, sarama.OffsetNewest)
fmt.Printf("分区%v,OffsetOldest%d\n", 0, offsetOld0)
fmt.Printf("分区%v,OffsetNewest%d\n", 0, OffsetNew0)
fmt.Printf("分区%v,OffsetOldest%d\n", 1, offsetOld1)
fmt.Printf("分区%v,OffsetNewest%d\n", 1, OffsetNew1)
}
fei@feideMacBook-Pro admin % go run admin.go
分区0,OffsetOldest0
分区0,OffsetNewest0
分区1,OffsetOldest0
分区1,OffsetNewest0
5.5获取单分区的消息偏移量
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化客户端报错%s", err.Error())
}
//查找指定分区的消费位移
consumer, _ := sarama.NewConsumerFromClient(client)
defer consumer.Close()
//获取分区消费者
partitionConsumer, _ := consumer.ConsumePartition("fei-topic", 0, 0)
defer partitionConsumer.Close()
//获取消费者最新消费位移
offset := partitionConsumer.HighWaterMarkOffset()
fmt.Printf("Partition 0 Offset: %d\n", offset)
}
fei@feideMacBook-Pro admin % go run admin.go
Partition 0 Offset: 16
//这里由于向分区中生产了16条消息 所以这里显示offset为16。和5.4效果差不多,只是调用的方式和对象略有不同。
5.6获取消费者组消费情况
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
kaClient, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化客户端报错%s", err.Error())
}
consumer_groups := []string{"fei-group"}
partitions := []int32{0, 1}
for _, consumer_group := range consumer_groups {
// 获取指定消费组在指定 topic 和 partition 的 offset
offsetManager, err := sarama.NewOffsetManagerFromClient(consumer_group, kaClient)
if err != nil {
fmt.Println("Error creating offset manager:", err)
return
}
defer offsetManager.Close()
// 获取指定 topic 和 partition 的 offset
for _, partition := range partitions {
partitionOffset, _ := offsetManager.ManagePartition("fei-topic", partition)
offset, _ := partitionOffset.NextOffset()
fmt.Printf("Consumer Group offset for topic '%s', partition %v:%v\n", consumer_group, partition, offset)
}
}
}
fei@feideMacBook-Pro meta % go run query.go
Consumer Group offset for topic 'fei-group', partition 0:23
Consumer Group offset for topic 'fei-group', partition 1:27
6.生产者
6.1同步生产消息
package main
import (
"fmt"
"github.com/IBM/sarama"
"log"
)
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化sarama客户端报错%s", err.Error())
return
}
defer client.Close()
SyncProducer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
log.Fatal("Failed to cretae sync producer:", err)
}
defer SyncProducer.Close()
// 指定要发送的主题和消息内容
topic := "fei-topic"
massage := "Hello fei"
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(massage),
}
//发送消息
partition, offset, err := SyncProducer.SendMessage(msg)
if err != nil {
log.Println("Failed to send message: ", err)
} else {
log.Printf("Message sent successfully! Partition=%d,Offset=%d\n", partition, offset)
}
}
fei@feideMacBook-Pro producer % go run main.go
2024/11/10 03:17:34 Message sent successfully! Partition=0,Offset=20
fei@feideMacBook-Pro producer % go run main.go
2024/11/10 03:17:50 Message sent successfully! Partition=1,Offset=24
6.2异步生产消息
package main
import (
"fmt"
"log"
"strconv"
"time"
"github.com/IBM/sarama"
)
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
prClient, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化异步生产者客户端报错%s", err.Error())
}
defer prClient.Close()
for i := 0; i <= 10; i++ {
//指定要发送的主题和消息
topic := "fei-topic"
message := "Hello,Kafka" + strconv.Itoa(i)
//创建消息对象
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(message),
}
prClient.Input() <- msg
select {
case success := <-prClient.Successes():
log.Printf("Msg sent successfully!,Partition:%d,Offset:%d", success.Partition, success.Offset)
case err := <-prClient.Errors():
log.Printf("Failed to send Msg:%s", err.Error())
case <-time.After(5 * time.Second):
log.Printf("Timeout reached. Failed to send Msg")
}
}
}
fei@feideMacBook-Pro producer % go run producer.go
2024/11/10 03:16:49 Msg sent successfully!,Partition:0,Offset:16
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:17
2024/11/10 03:16:49 Msg sent successfully!,Partition:0,Offset:17
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:18
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:19
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:20
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:21
2024/11/10 03:16:49 Msg sent successfully!,Partition:0,Offset:18
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:22
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:23
2024/11/10 03:16:49 Msg sent successfully!,Partition:0,Offset:19
6.3消息设置
// 设置消息的键(Key)
msg.Key = sarama.StringEncoder("my-key")
// 设置消息的分区(Partition)
msg.Partition = 0
// 设置消息的时间戳(Timestamp)
msg.Timestamp = time.Now()
// 设置消息的头部(Headers)
msg.Headers = []sarama.RecordHeader{
{Key: []byte("header-key"), Value: []byte("header-value")},
}
7.消费者
7.1单分区消费者
package main
import (
"fmt"
"github.com/IBM/sarama"
"sync"
)
var wg = sync.WaitGroup{}
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化客户端报错%s", err.Error())
}
defer client.Close()
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
fmt.Printf("初始化消费者客户端报错%s", err.Error())
}
defer consumer.Close()
topic := "fei-topic"
partition := int32(0) //指定消费的分区
offset := int64(0) //消费的起始偏移量
//根据指定分区和偏移量创建分区消费者
consumePartition, err := consumer.ConsumePartition(topic, partition, offset)
if err != nil {
fmt.Printf("创建分区消费者报错%s", err.Error())
}
defer consumePartition.Close()
//处理收到的消息
wg.Add(1)
go func() {
for msg := range consumePartition.Messages() {
fmt.Printf("Received msg: Topic=%s,Partition%d,Offsed=%d,Value%s\n",
msg.Topic, msg.Partition, msg.Offset, msg.Value)
}
wg.Done()
}()
wg.Wait()
}
fei@feideMacBook-Pro consumer % go run main.go
Received msg: Topic=fei-topic,Partition0,Offsed=0,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=1,ValueHello,Kafka5
Received msg: Topic=fei-topic,Partition0,Offsed=2,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=3,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition0,Offsed=4,ValueHello,Kafka1
Received msg: Topic=fei-topic,Partition0,Offsed=5,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition0,Offsed=6,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=7,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition0,Offsed=8,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=9,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition0,Offsed=10,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition0,Offsed=11,ValueHello,Kafka3
Received msg: Topic=fei-topic,Partition0,Offsed=12,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=13,ValueHello,Kafka6
Received msg: Topic=fei-topic,Partition0,Offsed=14,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=15,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition0,Offsed=16,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition0,Offsed=17,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition0,Offsed=18,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition0,Offsed=19,ValueHello,Kafka10
Received msg: Topic=fei-topic,Partition0,Offsed=20,ValueHello fei
7.2多分区消费者
package main
import (
"fmt"
"github.com/IBM/sarama"
"sync"
)
var wg = sync.WaitGroup{}
func main() {
//初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("初始化客户端报错%s", err.Error())
}
defer client.Close()
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
fmt.Printf("初始化消费者客户端报错%s", err.Error())
}
defer consumer.Close()
topic := "fei-topic"
partitions := []int32{0, 1} //指定消费的分区
offset := int64(0) //消费的起始偏移量
//处理收到的消息
wg.Add(len(partitions))
for _, partition := range partitions {
go func(p int32) {
defer wg.Done()
//根据指定分区和偏移量创建分区消费者
consumePartition, err := consumer.ConsumePartition(topic, p, offset)
if err != nil {
fmt.Printf("创建分区消费者报错%s", err.Error())
}
defer consumePartition.Close()
for msg := range consumePartition.Messages() {
fmt.Printf("Received msg: Topic=%s,Partition%d,Offsed=%d,Value%s\n",
msg.Topic, msg.Partition, msg.Offset, msg.Value)
}
}(partition)
}
wg.Wait()
}
fei@feideMacBook-Pro consumer % go run main.go
Received msg: Topic=fei-topic,Partition0,Offsed=0,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=1,ValueHello,Kafka5
Received msg: Topic=fei-topic,Partition0,Offsed=2,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=3,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition0,Offsed=4,ValueHello,Kafka1
Received msg: Topic=fei-topic,Partition0,Offsed=5,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition0,Offsed=6,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=7,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition0,Offsed=8,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=9,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition0,Offsed=10,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition0,Offsed=11,ValueHello,Kafka3
Received msg: Topic=fei-topic,Partition0,Offsed=12,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=13,ValueHello,Kafka6
Received msg: Topic=fei-topic,Partition0,Offsed=14,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=15,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition0,Offsed=16,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition1,Offsed=0,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition1,Offsed=1,ValueHello,Kafka1
Received msg: Topic=fei-topic,Partition1,Offsed=2,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition1,Offsed=3,ValueHello,Kafka3
Received msg: Topic=fei-topic,Partition1,Offsed=4,ValueHello,Kafka6
Received msg: Topic=fei-topic,Partition1,Offsed=5,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition0,Offsed=17,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition1,Offsed=6,ValueHello,Kafka10
Received msg: Topic=fei-topic,Partition0,Offsed=18,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition0,Offsed=19,ValueHello,Kafka10
Received msg: Topic=fei-topic,Partition0,Offsed=20,ValueHello fei
Received msg: Topic=fei-topic,Partition1,Offsed=7,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition1,Offsed=8,ValueHello,Kafka3
Received msg: Topic=fei-topic,Partition1,Offsed=9,ValueHello,Kafka5
Received msg: Topic=fei-topic,Partition1,Offsed=10,ValueHello,Kafka6
Received msg: Topic=fei-topic,Partition1,Offsed=11,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition1,Offsed=12,ValueHello,Kafka10
Received msg: Topic=fei-topic,Partition1,Offsed=13,ValueHello,Kafka1
Received msg: Topic=fei-topic,Partition1,Offsed=14,ValueHello,Kafka5
Received msg: Topic=fei-topic,Partition1,Offsed=15,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition1,Offsed=16,ValueHello,Kafka10
Received msg: Topic=fei-topic,Partition1,Offsed=17,ValueHello,Kafka1
Received msg: Topic=fei-topic,Partition1,Offsed=18,ValueHello,Kafka3
Received msg: Topic=fei-topic,Partition1,Offsed=19,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition1,Offsed=20,ValueHello,Kafka5
Received msg: Topic=fei-topic,Partition1,Offsed=21,ValueHello,Kafka6
Received msg: Topic=fei-topic,Partition1,Offsed=22,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition1,Offsed=23,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition1,Offsed=24,ValueHello fei
Received msg: Topic=fei-topic,Partition1,Offsed=25,ValueHello fei
Received msg: Topic=fei-topic,Partition0,Offsed=21,ValueHello fei
7.3消费者组⭐️
- kafka会记录不同消费者组已经消费的offset信息。只需在代码中将消息标记为已消费
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/IBM/sarama"
)
var wg = sync.WaitGroup{}
func main() {
//消费者组初始化客户端
config := sarama.NewConfig()
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config.Consumer.Offsets.AutoCommit.Enable = true //是否支持自动提交位点,默认支持
config.Consumer.Offsets.AutoCommit.Interval = 10 * time.Second //自动提交位点时间间隔,默认1s
ConsumerGroup, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "fei-group", config)
if err != nil {
log.Fatal("Failed to create consumer group:", err)
}
defer ConsumerGroup.Close()
//指定订阅的主题
topics := []string{"fei-topic"}
//创建一个消费者组处理器
handler := ConsumerGroupHandler{}
ctx := context.Background()
wg.Add(1)
go func() {
for {
err := ConsumerGroup.Consume(ctx, topics, handler)
if err != nil {
log.Fatal("Error form consumer group:", err)
}
}
wg.Done()
}()
wg.Wait()
}
// 自定义消费者组处理器
type ConsumerGroupHandler struct{}
// 实现sarama.ConsumerGroupHandler 接口方法
// Setup方法在消费者组启动时调用,用于进行一些初始化操作
func (h ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
fmt.Println("Consumer group setup")
return nil
}
// Cleanup方法在消费者组结束时调用,用于进行清理操作
func (h ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
fmt.Println("Consumer group cleanup")
return nil
}
// ConsumeClaim方法在消费者组从 Kafka 中获取到新的消息分区时被执行,用于处理当前消费者负责消费的消息分区。
func (h ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("消息内容%s,对应Offset%d\n", string(msg.Value), msg.Offset)
session.MarkMessage(msg, "") //将消息标记为已消费,不加此行会导致重启消费者组之后重复消费
}
return nil
}
/*
ConsumerGroupHandler struct 实现了 sarama.ConsumerGroupHandler 接口方法:
Setup:在消费者组启动时进行初始化操作。
Cleanup:在消费者组结束时进行清理操作。
ConsumeClaim:消费消息,打印消息内容和 offset,并标记消息为已消费。
在37行将结构体传入 err := ConsumerGroup.Consume(ctx, topics, handler)。即用到了多态的特性,因为结构体实现了方法。那么在sarama中执行方法时会执行我们重写的这部分方法中的内容
*/
fei@feideMacBook-Pro consumer % go run consumer.go
Consumer group setup
消息内容Hello fei,对应Offset26
消息内容Hello fei,对应Offset22
Last updated 10 Nov 2024, 04:21 +0800 .