1.介绍

Kafka 是⼀种分布式流处理平台,最初由 LinkedIn 公司开发,现在由 Apache 软件基⾦会维护。它被设计⽤于⾼吞吐量、低延迟的数据传输,并具有可扩展性和容错性。 Kafka 的核⼼概念是消息传递系统,它通过将数据以消息的形式进⾏发布和订阅来实现数据流的处理。以下是 Kafka 的⼀些关键特点和概念:

  • 消息发布和订阅:Kafka 使⽤发布-订阅模型,消息发布者将消息发送到⼀个或多个主题(Topics),⽽消息订阅者可以从这些主题中获取消息。
  • 分布式架构:Kafka 是⼀个分布式系统,可以在多个节点上部署,实现数据的分区和复制,以实现⾼可⽤性和可扩展性。
  • 消息持久化:Kafka 使⽤⽇志(Log)的⽅式持久化消息,每个主题的消息以追加的⽅式写⼊分区中,并且消息会在⼀段时间后根据配置进⾏保留。
  • 分区和复制:Kafka 将每个主题分成⼀个或多个分区(Partitions),每个分区可以在多个节点上进⾏复制,以实现负载均衡和容错性。
  • ⾼吞吐量:Kafka 具有⾮常⾼的吞吐量和低延迟,能够处理⼤量的消息流,并保持较低的延迟。
  • 实时数据处理:Kafka 可以与流处理框架(如 Apache Spark、Apache Flink)集成,⽀持实时的数据处理和流式分析。

2.常⽤场景

  • 数据流集成和数据管道:Kafka 可以作为数据流的中间件,⽤于连接不同的数据系统和应⽤程序。它可以集成多个数据源和数据⽬的地,实现可靠的数据传输和流⽔线处理。
  • ⽇志收集和集中式⽇志管理:Kafka 可以⽤作⽇志收集系统,收集和存储分布在多个应⽤程序和服务器上的⽇志数据。它提供⾼吞吐量和持久化存储,⽀持实时的⽇志数据处理和分析。
  • 事件驱动架构:Kafka 的发布-订阅模型使其成为构建事件驱动架构的理想选择。应⽤程序可以将事件发布到 Kafka 主题,然后其他应⽤程序可以订阅并处理这些事件,实现松耦合、可扩展和实时的事件驱动架构。
  • 流式处理和实时分析:Kafka 可以与流处理框架(如 Apache Spark、Apache Flink)集成,⽀持实时的流式数据处理和复杂的实时分析。通过将数据流导⼊Kafka 主题,应⽤程序可以实时处理和分析数据,实现实时的洞察和决策。
  • 异步消息队列:Kafka 可以⽤作⾼吞吐量的异步消息队列,实现应⽤程序之间的解耦和异步通信。应⽤程序可以将消息发送到 Kafka 主题,并异步处理这些消息,提⾼系统的可伸缩性和弹性。
  • 数据备份和容错性:Kafka ⽀持数据的分区和复制,可以在多个节点上复制和存储数据,以实现数据的冗余备份和容错性。

3.Go三⽅库

  • github.com/IBM/sarama: 是⼀个 Go 语⾔编写的 Apache Kafka 客户端库,⽤于与 Kafka 集群进⾏交互。它提供了丰富的功能和易于使⽤的 API,使开发⼈员能够⽅便地使⽤ Go 语⾔来⽣产和消费 Kafka 消息。
  • Github链接:https://github.com/IBM/sarama

4.Kafka使⽤

4.1 安装依赖包

go get github.com/IBM/sarama

4.2 初始化Client

package main

import (
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化sarama客户端报错%s", err.Error())
		return
	}
	defer client.Close()
	
	//其他操作,如创建生产者或消费者
	fmt.Println("Client initialized successfully!")
}
  • 生产者
	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		//处理生产者创建错误
		return
	}

-消费者

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		//处理消费者创建错误
		return
	}

4.3初始化参数生产者

参数 示例 描述
sarama.MaxRequestSize 100 * 1024 * 1024 请求最大大小,默认100MB,可以调整,写入大于100MB的消息会直接报错
sarama.MaxResponseSize 100 * 1024 * 1024 响应最大大小,默认100MB,可以调整,获取大于100MB的消息会直接报错
config.Producer.RequiredAcks sarama.WaitForLocal(1) 默认值为sarama.WaitForLocal(1)
config.Producer.Retry.Max 3 生产者重试的最大次数,默认为3
config.Producer.Retry.Backoff 100 * time.Millisecond 生产者重试之间的等待时间,默认为100毫秒
config.Producer.Return.Successes false 是否返回成功的消息,默认为false
config.Producer.Return.Errors true 返回失败的消息,默认值为true
config.Producer.Compression CompressionNone 对消息是否压缩后发送,默认CompressionNone不压缩
config.Producer.CompressionLevel CompressionLevelDefault 指定压缩等级,在配置了压缩算法后生效
config.Producer.Flush.Frequency 0 producer缓存消息的时间, 默认缓存0毫秒
config.Producer.Flush.Bytes 0 达到多少字节时,触发一次broker请求,默认为0,直接发送,存在天然上限值MaxRequestSize
config.Producer.Flush.Messages 0 达到多少条消息时,强制,触发一次broker请求,这个是上限值,MaxMessages < Messages
config.Producer.Flush.MaxMessages 0 最大缓存多少消息,默认为0,有消息立刻发送,MaxMessages设置大于0时,必须设置 Messages,且需要保证:MaxMessages > Messages
config.Producer.Timeout 5 * time.Second 超时时间
config.Producer.Idempotent false 是否需要幂等,默认false
config.Producer.Transaction.Timeout 1 * time.Minute 事务超时时间默认1分钟
config.Producer.Transaction.Retry.Max 50 事务重试时间
config.Producer.Transaction.Retry.Backoff 100 * time.Millisecond 事务重试间隔时间
config.Net.MaxOpenRequests 5 默认值5,一次发送请求的数量
config.Producer.Transaction.ID “test” 事务ID
config.ClientID “your-client-id” 客户端ID

4.4初始化参数消费者

配置项 示例 描述
config.Consumer.Group.Rebalance.Strategy sarama.NewBalanceStrategyRange 消费者分配分区的默认方式
config.Consumer.Offsets.Initial sarama.OffsetNewest 在没有提交位点情况下,使用最新的位点还是最老的位点,默认是最新的消息位点
config.Consumer.Offsets.AutoCommit.Enable true 是否支持自动提交位点,默认支持
config.Consumer.Offsets.AutoCommit.Interval 1 * time.Second 自动提交位点时间间隔,默认1s
config.Consumer.MaxWaitTime 250 * time.Millisecond 在没有最新消费消息时候,客户端等待的时间,默认250ms
config.Consumer.MaxProcessingTime 100 * time.Millisecond
config.Consumer.Fetch.Min 1 消费请求中获取的最小消息字节数,Broker将等待至少这么多字节的消息然后返回。默认值为1,不能设置0,因为0会导致在没有消息可用时消费者空转
config.Consumer.Fetch.Max 0 消费请求最大的字节数。默认为0,表示不限制
config.Consumer.Fetch.Default 1024 * 1024 消费请求的默认消息字节数(默认为1MB),需要大于实例的大部分消息,否则Broker会花费大量时间计算消费数据是否达到这个值的条件
config.Consumer.Return.Errors true
config.Consumer.Group.Rebalance.Timeout 60 * time.Second 设置rebalance操作的超时时间,默认60s
config.Consumer.Group.Session.Timeout 10 * time.Second 设置消费者组会话的超时时间为,默认为10s
config.Consumer.Group.Heartbeat.Interval 3 * time.Second 心跳超时时间,默认为3s
config.Consumer.MaxProcessingTime 100 * time.Millisecond 消息处理的超时时间,默认100ms

5.常用的Topic 的增删查

5.1创建Topic

package main

import (
	"fmt"
	"github.com/IBM/sarama"
)

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化sarama客户端报错%s", err.Error())
		return
	}
	defer client.Close()

	//创建Topic,⾸先⽣成admin client
	admin, err := sarama.NewClusterAdminFromClient(client)
	if err != nil {
		fmt.Printf("初始化saramaAdmin客户端报错%s", err.Error())
		return
	}
	defer admin.Close()

	//设置分区以及副本
	topicDetail := &sarama.TopicDetail{
		NumPartitions:     2,
		ReplicationFactor: 1,
	}

	//创建Topic
	err = admin.CreateTopic("fei-topic", topicDetail, false)
	if err != nil {
		fmt.Printf("创建Topic报错%s", err.Error())
		return
	}
}

5.2查看所有Topic

package main

import (
	"fmt"
	"github.com/IBM/sarama"
)

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化sarama客户端报错%s", err.Error())
		return
	}
	defer client.Close()

	//创建Topic,⾸先⽣成admin client
	admin, err := sarama.NewClusterAdminFromClient(client)
	if err != nil {
		fmt.Printf("初始化saramaAdmin客户端报错%s", err.Error())
		return
	}
	defer admin.Close()

	topics, err := admin.ListTopics()
	if err != nil {
		fmt.Printf("查看Topic报错%s", err.Error())
		return
	}
	fmt.Println(topics)
}
fei@feideMacBook-Pro admin % go run admin.go
map[fei-topic:{2 1 map[0:[1001] 1:[1001]] map[segment.bytes:0xc00021c250]}]

5.3查看Topic详情

package main

import (
	"fmt"
	"github.com/IBM/sarama"
)

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化sarama客户端报错%s", err.Error())
		return
	}
	defer client.Close()

	//创建Topic,⾸先⽣成admin client
	admin, err := sarama.NewClusterAdminFromClient(client)
	if err != nil {
		fmt.Printf("初始化saramaAdmin客户端报错%s", err.Error())
		return
	}
	defer admin.Close()

	metadata, err := admin.DescribeTopics([]string{"fei-topic"})
	if err != nil {
		fmt.Printf("查看Topic详细报错%s", err.Error())
	}

	for _, partition := range metadata {
		fmt.Println(partition)
	}
}
fei@feideMacBook-Pro admin % go run admin.go
&{7 kafka server: Not an error, why are you printing me? fei-topic AAAAAAAAAAAAAAAAAAAAAA false [0xc000180660 0xc0001806c0] 0}

5.4查找指定主题和分区的偏移量

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
	kaClient, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化客户端报错%s", err.Error())
	}
	//查看最老和最新offset
	offsetOld0, _ := kaClient.GetOffset("fei-topic", 0, sarama.OffsetOldest)
	OffsetNew0, err := kaClient.GetOffset("fei-topic", 0, sarama.OffsetNewest)

	offsetOld1, _ := kaClient.GetOffset("fei-topic", 1, sarama.OffsetOldest)
	OffsetNew1, _ := kaClient.GetOffset("fei-topic", 1, sarama.OffsetNewest)

	fmt.Printf("分区%v,OffsetOldest%d\n", 0, offsetOld0)
	fmt.Printf("分区%v,OffsetNewest%d\n", 0, OffsetNew0)

	fmt.Printf("分区%v,OffsetOldest%d\n", 1, offsetOld1)
	fmt.Printf("分区%v,OffsetNewest%d\n", 1, OffsetNew1)
}
fei@feideMacBook-Pro admin % go run admin.go
分区0,OffsetOldest0
分区0,OffsetNewest0
分区1,OffsetOldest0
分区1,OffsetNewest0

5.5获取单分区的消息偏移量

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化客户端报错%s", err.Error())
	}

	//查找指定分区的消费位移
	consumer, _ := sarama.NewConsumerFromClient(client)
	defer consumer.Close()

	//获取分区消费者
	partitionConsumer, _ := consumer.ConsumePartition("fei-topic", 0, 0)
	defer partitionConsumer.Close()

	//获取消费者最新消费位移
	offset := partitionConsumer.HighWaterMarkOffset()
	fmt.Printf("Partition 0 Offset: %d\n", offset)
}
fei@feideMacBook-Pro admin % go run admin.go
Partition 0 Offset: 16
//这里由于向分区中生产了16条消息 所以这里显示offset为16。和5.4效果差不多,只是调用的方式和对象略有不同。

5.6获取消费者组消费情况

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
	kaClient, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化客户端报错%s", err.Error())
	}

	consumer_groups := []string{"fei-group"}
	partitions := []int32{0, 1}
	for _, consumer_group := range consumer_groups {
		// 获取指定消费组在指定 topic 和 partition 的 offset
		offsetManager, err := sarama.NewOffsetManagerFromClient(consumer_group, kaClient)
		if err != nil {
			fmt.Println("Error creating offset manager:", err)
			return
		}
		defer offsetManager.Close()

		// 获取指定 topic 和 partition 的 offset
		for _, partition := range partitions {
			partitionOffset, _ := offsetManager.ManagePartition("fei-topic", partition)
			offset, _ := partitionOffset.NextOffset()
			fmt.Printf("Consumer Group offset for topic '%s', partition %v:%v\n", consumer_group, partition, offset)
		}
	}
}
fei@feideMacBook-Pro meta % go run query.go
Consumer Group offset for topic 'fei-group', partition 0:23
Consumer Group offset for topic 'fei-group', partition 1:27

6.生产者

6.1同步生产消息

package main

import (
	"fmt"
	"github.com/IBM/sarama"
	"log"
)

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化sarama客户端报错%s", err.Error())
		return
	}
	defer client.Close()

	SyncProducer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		log.Fatal("Failed to cretae sync producer:", err)
	}
	defer SyncProducer.Close()

	// 指定要发送的主题和消息内容
	topic := "fei-topic"
	massage := "Hello fei"

	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(massage),
	}

	//发送消息
	partition, offset, err := SyncProducer.SendMessage(msg)
	if err != nil {
		log.Println("Failed to send message: ", err)
	} else {
		log.Printf("Message sent successfully! Partition=%d,Offset=%d\n", partition, offset)
	}

}
fei@feideMacBook-Pro producer % go run main.go 
2024/11/10 03:17:34 Message sent successfully! Partition=0,Offset=20
fei@feideMacBook-Pro producer % go run main.go
2024/11/10 03:17:50 Message sent successfully! Partition=1,Offset=24

6.2异步生产消息

package main

import (
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。

	prClient, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化异步生产者客户端报错%s", err.Error())
	}
	defer prClient.Close()

	for i := 0; i <= 10; i++ {
		//指定要发送的主题和消息
		topic := "fei-topic"
		message := "Hello,Kafka" + strconv.Itoa(i)

		//创建消息对象
		msg := &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(message),
		}
		prClient.Input() <- msg
		select {
		case success := <-prClient.Successes():
			log.Printf("Msg sent successfully!,Partition:%d,Offset:%d", success.Partition, success.Offset)
		case err := <-prClient.Errors():
			log.Printf("Failed to send Msg:%s", err.Error())
		case <-time.After(5 * time.Second):
			log.Printf("Timeout reached. Failed to send Msg")
		}
	}

}
fei@feideMacBook-Pro producer % go run producer.go
2024/11/10 03:16:49 Msg sent successfully!,Partition:0,Offset:16
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:17
2024/11/10 03:16:49 Msg sent successfully!,Partition:0,Offset:17
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:18
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:19
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:20
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:21
2024/11/10 03:16:49 Msg sent successfully!,Partition:0,Offset:18
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:22
2024/11/10 03:16:49 Msg sent successfully!,Partition:1,Offset:23
2024/11/10 03:16:49 Msg sent successfully!,Partition:0,Offset:19

6.3消息设置

 // 设置消息的键(Key)
 msg.Key = sarama.StringEncoder("my-key")

 // 设置消息的分区(Partition)
 msg.Partition = 0

 // 设置消息的时间戳(Timestamp)
 msg.Timestamp = time.Now()

 // 设置消息的头部(Headers)
 msg.Headers = []sarama.RecordHeader{
 	{Key: []byte("header-key"), Value: []byte("header-value")},
 }

7.消费者

7.1单分区消费者

package main

import (
	"fmt"
	"github.com/IBM/sarama"
	"sync"
)

var wg = sync.WaitGroup{}

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化客户端报错%s", err.Error())
	}
	defer client.Close()

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		fmt.Printf("初始化消费者客户端报错%s", err.Error())
	}
	defer consumer.Close()

	topic := "fei-topic"
	partition := int32(0) //指定消费的分区
	offset := int64(0)    //消费的起始偏移量

	//根据指定分区和偏移量创建分区消费者
	consumePartition, err := consumer.ConsumePartition(topic, partition, offset)
	if err != nil {
		fmt.Printf("创建分区消费者报错%s", err.Error())
	}

	defer consumePartition.Close()

	//处理收到的消息
	wg.Add(1)
	go func() {
		for msg := range consumePartition.Messages() {
			fmt.Printf("Received msg: Topic=%s,Partition%d,Offsed=%d,Value%s\n",
				msg.Topic, msg.Partition, msg.Offset, msg.Value)
		}
		wg.Done()
	}()

	wg.Wait()
}
fei@feideMacBook-Pro consumer % go run main.go 
Received msg: Topic=fei-topic,Partition0,Offsed=0,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=1,ValueHello,Kafka5
Received msg: Topic=fei-topic,Partition0,Offsed=2,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=3,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition0,Offsed=4,ValueHello,Kafka1
Received msg: Topic=fei-topic,Partition0,Offsed=5,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition0,Offsed=6,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=7,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition0,Offsed=8,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=9,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition0,Offsed=10,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition0,Offsed=11,ValueHello,Kafka3
Received msg: Topic=fei-topic,Partition0,Offsed=12,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=13,ValueHello,Kafka6
Received msg: Topic=fei-topic,Partition0,Offsed=14,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=15,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition0,Offsed=16,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition0,Offsed=17,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition0,Offsed=18,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition0,Offsed=19,ValueHello,Kafka10
Received msg: Topic=fei-topic,Partition0,Offsed=20,ValueHello fei

7.2多分区消费者

package main

import (
	"fmt"
	"github.com/IBM/sarama"
	"sync"
)

var wg = sync.WaitGroup{}

func main() {
	//初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true //在成功发送消息后返回成功的消息元数据。
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("初始化客户端报错%s", err.Error())
	}
	defer client.Close()

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		fmt.Printf("初始化消费者客户端报错%s", err.Error())
	}
	defer consumer.Close()

	topic := "fei-topic"
	partitions := []int32{0, 1} //指定消费的分区
	offset := int64(0)          //消费的起始偏移量

	//处理收到的消息
	wg.Add(len(partitions))
	for _, partition := range partitions {
		go func(p int32) {
			defer wg.Done()
			//根据指定分区和偏移量创建分区消费者
			consumePartition, err := consumer.ConsumePartition(topic, p, offset)
			if err != nil {
				fmt.Printf("创建分区消费者报错%s", err.Error())
			}
			defer consumePartition.Close()

			for msg := range consumePartition.Messages() {
				fmt.Printf("Received msg: Topic=%s,Partition%d,Offsed=%d,Value%s\n",
					msg.Topic, msg.Partition, msg.Offset, msg.Value)
			}
		}(partition)
	}

	wg.Wait()
}
fei@feideMacBook-Pro consumer % go run main.go 
Received msg: Topic=fei-topic,Partition0,Offsed=0,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=1,ValueHello,Kafka5
Received msg: Topic=fei-topic,Partition0,Offsed=2,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=3,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition0,Offsed=4,ValueHello,Kafka1
Received msg: Topic=fei-topic,Partition0,Offsed=5,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition0,Offsed=6,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=7,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition0,Offsed=8,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=9,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition0,Offsed=10,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition0,Offsed=11,ValueHello,Kafka3
Received msg: Topic=fei-topic,Partition0,Offsed=12,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition0,Offsed=13,ValueHello,Kafka6
Received msg: Topic=fei-topic,Partition0,Offsed=14,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition0,Offsed=15,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition0,Offsed=16,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition1,Offsed=0,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition1,Offsed=1,ValueHello,Kafka1
Received msg: Topic=fei-topic,Partition1,Offsed=2,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition1,Offsed=3,ValueHello,Kafka3
Received msg: Topic=fei-topic,Partition1,Offsed=4,ValueHello,Kafka6
Received msg: Topic=fei-topic,Partition1,Offsed=5,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition0,Offsed=17,ValueHello,Kafka2
Received msg: Topic=fei-topic,Partition1,Offsed=6,ValueHello,Kafka10
Received msg: Topic=fei-topic,Partition0,Offsed=18,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition0,Offsed=19,ValueHello,Kafka10
Received msg: Topic=fei-topic,Partition0,Offsed=20,ValueHello fei
Received msg: Topic=fei-topic,Partition1,Offsed=7,ValueHello,Kafka0
Received msg: Topic=fei-topic,Partition1,Offsed=8,ValueHello,Kafka3
Received msg: Topic=fei-topic,Partition1,Offsed=9,ValueHello,Kafka5
Received msg: Topic=fei-topic,Partition1,Offsed=10,ValueHello,Kafka6
Received msg: Topic=fei-topic,Partition1,Offsed=11,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition1,Offsed=12,ValueHello,Kafka10
Received msg: Topic=fei-topic,Partition1,Offsed=13,ValueHello,Kafka1
Received msg: Topic=fei-topic,Partition1,Offsed=14,ValueHello,Kafka5
Received msg: Topic=fei-topic,Partition1,Offsed=15,ValueHello,Kafka7
Received msg: Topic=fei-topic,Partition1,Offsed=16,ValueHello,Kafka10
Received msg: Topic=fei-topic,Partition1,Offsed=17,ValueHello,Kafka1
Received msg: Topic=fei-topic,Partition1,Offsed=18,ValueHello,Kafka3
Received msg: Topic=fei-topic,Partition1,Offsed=19,ValueHello,Kafka4
Received msg: Topic=fei-topic,Partition1,Offsed=20,ValueHello,Kafka5
Received msg: Topic=fei-topic,Partition1,Offsed=21,ValueHello,Kafka6
Received msg: Topic=fei-topic,Partition1,Offsed=22,ValueHello,Kafka8
Received msg: Topic=fei-topic,Partition1,Offsed=23,ValueHello,Kafka9
Received msg: Topic=fei-topic,Partition1,Offsed=24,ValueHello fei
Received msg: Topic=fei-topic,Partition1,Offsed=25,ValueHello fei
Received msg: Topic=fei-topic,Partition0,Offsed=21,ValueHello fei

7.3消费者组⭐️

  • kafka会记录不同消费者组已经消费的offset信息。只需在代码中将消息标记为已消费

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/IBM/sarama"
)

var wg = sync.WaitGroup{}

func main() {
	//消费者组初始化客户端
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Consumer.Offsets.AutoCommit.Enable = true               //是否支持自动提交位点,默认支持
	config.Consumer.Offsets.AutoCommit.Interval = 10 * time.Second //自动提交位点时间间隔,默认1s

	ConsumerGroup, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "fei-group", config)
	if err != nil {
		log.Fatal("Failed to create consumer group:", err)
	}
	defer ConsumerGroup.Close()

	//指定订阅的主题
	topics := []string{"fei-topic"}
	//创建一个消费者组处理器
	handler := ConsumerGroupHandler{}
	ctx := context.Background()
	wg.Add(1)
	go func() {
		for {
			err := ConsumerGroup.Consume(ctx, topics, handler)
			if err != nil {
				log.Fatal("Error form consumer group:", err)
			}
		}
		wg.Done()
	}()
	wg.Wait()
}

// 自定义消费者组处理器
type ConsumerGroupHandler struct{}

// 实现sarama.ConsumerGroupHandler 接口方法
// Setup方法在消费者组启动时调用,用于进行一些初始化操作
func (h ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	fmt.Println("Consumer group setup")
	return nil
}

// Cleanup方法在消费者组结束时调用,用于进行清理操作
func (h ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	fmt.Println("Consumer group cleanup")
	return nil
}
// ConsumeClaim方法在消费者组从 Kafka 中获取到新的消息分区时被执行,用于处理当前消费者负责消费的消息分区。
func (h ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("消息内容%s,对应Offset%d\n", string(msg.Value), msg.Offset)
		session.MarkMessage(msg, "") //将消息标记为已消费,不加此行会导致重启消费者组之后重复消费
	}
	return nil
}

/*
ConsumerGroupHandler struct 实现了 sarama.ConsumerGroupHandler 接口方法:
	Setup:在消费者组启动时进行初始化操作。
	Cleanup:在消费者组结束时进行清理操作。
	ConsumeClaim:消费消息,打印消息内容和 offset,并标记消息为已消费。

在37行将结构体传入 err := ConsumerGroup.Consume(ctx, topics, handler)。即用到了多态的特性,因为结构体实现了方法。那么在sarama中执行方法时会执行我们重写的这部分方法中的内容
*/
fei@feideMacBook-Pro consumer % go run consumer.go 
Consumer group setup
消息内容Hello fei,对应Offset26
消息内容Hello fei,对应Offset22

Last updated 10 Nov 2024, 04:21 +0800 . history