mirror of
https://github.com/iLoveElysia/openbilibili.git
synced 2026-03-14 05:46:26 -05:00
515 lines
14 KiB
Go
515 lines
14 KiB
Go
package consumergroup
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Shopify/sarama"
|
|
"github.com/wvanbergen/kazoo-go"
|
|
)
|
|
|
|
var (
|
|
AlreadyClosing = errors.New("The consumer group is already shutting down.")
|
|
)
|
|
|
|
type Config struct {
|
|
*sarama.Config
|
|
|
|
Zookeeper *kazoo.Config
|
|
|
|
Offsets struct {
|
|
Initial int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
|
|
ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
|
|
CommitInterval time.Duration // The interval between which the processed offsets are commited.
|
|
ResetOffsets bool // Resets the offsets for the consumergroup so that it won't resume from where it left off previously.
|
|
}
|
|
}
|
|
|
|
func NewConfig() *Config {
|
|
config := &Config{}
|
|
config.Config = sarama.NewConfig()
|
|
config.Zookeeper = kazoo.NewConfig()
|
|
config.Offsets.Initial = sarama.OffsetOldest
|
|
config.Offsets.ProcessingTimeout = 60 * time.Second
|
|
config.Offsets.CommitInterval = 10 * time.Second
|
|
|
|
return config
|
|
}
|
|
|
|
func (cgc *Config) Validate() error {
|
|
if cgc.Zookeeper.Timeout <= 0 {
|
|
return sarama.ConfigurationError("ZookeeperTimeout should have a duration > 0")
|
|
}
|
|
|
|
if cgc.Offsets.CommitInterval < 0 {
|
|
return sarama.ConfigurationError("CommitInterval should have a duration >= 0")
|
|
}
|
|
|
|
if cgc.Offsets.Initial != sarama.OffsetOldest && cgc.Offsets.Initial != sarama.OffsetNewest {
|
|
return errors.New("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest.")
|
|
}
|
|
|
|
if cgc.Config != nil {
|
|
if err := cgc.Config.Validate(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// The ConsumerGroup type holds all the information for a consumer that is part
|
|
// of a consumer group. Call JoinConsumerGroup to start a consumer.
|
|
type ConsumerGroup struct {
|
|
config *Config
|
|
|
|
consumer sarama.Consumer
|
|
kazoo *kazoo.Kazoo
|
|
group *kazoo.Consumergroup
|
|
instance *kazoo.ConsumergroupInstance
|
|
|
|
wg sync.WaitGroup
|
|
singleShutdown sync.Once
|
|
|
|
messages chan *sarama.ConsumerMessage
|
|
errors chan error
|
|
stopper chan struct{}
|
|
|
|
consumers kazoo.ConsumergroupInstanceList
|
|
|
|
offsetManager OffsetManager
|
|
}
|
|
|
|
// Connects to a consumer group, using Zookeeper for auto-discovery
|
|
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) {
|
|
|
|
if name == "" {
|
|
return nil, sarama.ConfigurationError("Empty consumergroup name")
|
|
}
|
|
|
|
if len(topics) == 0 {
|
|
return nil, sarama.ConfigurationError("No topics provided")
|
|
}
|
|
|
|
if len(zookeeper) == 0 {
|
|
return nil, errors.New("You need to provide at least one zookeeper node address!")
|
|
}
|
|
|
|
if config == nil {
|
|
config = NewConfig()
|
|
}
|
|
config.ClientID = name
|
|
|
|
// Validate configuration
|
|
if err = config.Validate(); err != nil {
|
|
return
|
|
}
|
|
|
|
var kz *kazoo.Kazoo
|
|
if kz, err = kazoo.NewKazoo(zookeeper, config.Zookeeper); err != nil {
|
|
return
|
|
}
|
|
|
|
brokers, err := kz.BrokerList()
|
|
if err != nil {
|
|
kz.Close()
|
|
return
|
|
}
|
|
|
|
group := kz.Consumergroup(name)
|
|
|
|
if config.Offsets.ResetOffsets {
|
|
err = group.ResetOffsets()
|
|
if err != nil {
|
|
kz.Close()
|
|
return
|
|
}
|
|
}
|
|
|
|
instance := group.NewInstance()
|
|
|
|
var consumer sarama.Consumer
|
|
if consumer, err = sarama.NewConsumer(brokers, config.Config); err != nil {
|
|
kz.Close()
|
|
return
|
|
}
|
|
|
|
cg = &ConsumerGroup{
|
|
config: config,
|
|
consumer: consumer,
|
|
|
|
kazoo: kz,
|
|
group: group,
|
|
instance: instance,
|
|
|
|
messages: make(chan *sarama.ConsumerMessage, config.ChannelBufferSize),
|
|
errors: make(chan error, config.ChannelBufferSize),
|
|
stopper: make(chan struct{}),
|
|
}
|
|
|
|
// Register consumer group
|
|
if exists, err := cg.group.Exists(); err != nil {
|
|
cg.Logf("FAILED to check for existence of consumergroup: %s!\n", err)
|
|
_ = consumer.Close()
|
|
_ = kz.Close()
|
|
return nil, err
|
|
} else if !exists {
|
|
cg.Logf("Consumergroup `%s` does not yet exists, creating...\n", cg.group.Name)
|
|
if err := cg.group.Create(); err != nil {
|
|
cg.Logf("FAILED to create consumergroup in Zookeeper: %s!\n", err)
|
|
_ = consumer.Close()
|
|
_ = kz.Close()
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Register itself with zookeeper
|
|
if err := cg.instance.Register(topics); err != nil {
|
|
cg.Logf("FAILED to register consumer instance: %s!\n", err)
|
|
return nil, err
|
|
} else {
|
|
cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
|
|
}
|
|
|
|
offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
|
|
cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig)
|
|
|
|
go cg.topicListConsumer(topics)
|
|
|
|
return
|
|
}
|
|
|
|
// Returns a channel that you can read to obtain events from Kafka to process.
|
|
func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage {
|
|
return cg.messages
|
|
}
|
|
|
|
// Returns a channel that you can read to obtain events from Kafka to process.
|
|
func (cg *ConsumerGroup) Errors() <-chan error {
|
|
return cg.errors
|
|
}
|
|
|
|
func (cg *ConsumerGroup) Closed() bool {
|
|
return cg.instance == nil
|
|
}
|
|
|
|
func (cg *ConsumerGroup) Close() error {
|
|
shutdownError := AlreadyClosing
|
|
cg.singleShutdown.Do(func() {
|
|
defer cg.kazoo.Close()
|
|
|
|
shutdownError = nil
|
|
|
|
close(cg.stopper)
|
|
cg.wg.Wait()
|
|
|
|
if err := cg.offsetManager.Close(); err != nil {
|
|
cg.Logf("FAILED closing the offset manager: %s!\n", err)
|
|
}
|
|
|
|
if shutdownError = cg.instance.Deregister(); shutdownError != nil {
|
|
cg.Logf("FAILED deregistering consumer instance: %s!\n", shutdownError)
|
|
} else {
|
|
cg.Logf("Deregistered consumer instance %s.\n", cg.instance.ID)
|
|
}
|
|
|
|
if shutdownError = cg.consumer.Close(); shutdownError != nil {
|
|
cg.Logf("FAILED closing the Sarama client: %s\n", shutdownError)
|
|
}
|
|
|
|
close(cg.messages)
|
|
close(cg.errors)
|
|
cg.instance = nil
|
|
})
|
|
|
|
return shutdownError
|
|
}
|
|
|
|
func (cg *ConsumerGroup) Logf(format string, args ...interface{}) {
|
|
var identifier string
|
|
if cg.instance == nil {
|
|
identifier = "(defunct)"
|
|
} else {
|
|
identifier = cg.instance.ID[len(cg.instance.ID)-12:]
|
|
}
|
|
sarama.Logger.Printf("[%s/%s] %s", cg.group.Name, identifier, fmt.Sprintf(format, args...))
|
|
}
|
|
|
|
func (cg *ConsumerGroup) InstanceRegistered() (bool, error) {
|
|
return cg.instance.Registered()
|
|
}
|
|
|
|
func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error {
|
|
cg.offsetManager.MarkAsProcessed(message.Topic, message.Partition, message.Offset)
|
|
return nil
|
|
}
|
|
|
|
func (cg *ConsumerGroup) FlushOffsets() error {
|
|
return cg.offsetManager.Flush()
|
|
}
|
|
|
|
func (cg *ConsumerGroup) topicListConsumer(topics []string) {
|
|
for {
|
|
select {
|
|
case <-cg.stopper:
|
|
return
|
|
default:
|
|
}
|
|
|
|
consumers, consumerChanges, err := cg.group.WatchInstances()
|
|
if err != nil {
|
|
cg.Logf("FAILED to get list of registered consumer instances: %s\n", err)
|
|
return
|
|
}
|
|
|
|
cg.consumers = consumers
|
|
cg.Logf("Currently registered consumers: %d\n", len(cg.consumers))
|
|
|
|
stopper := make(chan struct{})
|
|
|
|
for _, topic := range topics {
|
|
cg.wg.Add(1)
|
|
go cg.topicConsumer(topic, cg.messages, cg.errors, stopper)
|
|
}
|
|
|
|
select {
|
|
case <-cg.stopper:
|
|
close(stopper)
|
|
return
|
|
|
|
case <-consumerChanges:
|
|
registered, err := cg.instance.Registered()
|
|
if err != nil {
|
|
cg.Logf("FAILED to get register status: %s\n", err)
|
|
} else if !registered {
|
|
err = cg.instance.Register(topics)
|
|
if err != nil {
|
|
cg.Logf("FAILED to register consumer instance: %s!\n", err)
|
|
} else {
|
|
cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
|
|
}
|
|
}
|
|
|
|
cg.Logf("Triggering rebalance due to consumer list change\n")
|
|
close(stopper)
|
|
cg.wg.Wait()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error, stopper <-chan struct{}) {
|
|
defer cg.wg.Done()
|
|
|
|
select {
|
|
case <-stopper:
|
|
return
|
|
default:
|
|
}
|
|
|
|
cg.Logf("%s :: Started topic consumer\n", topic)
|
|
|
|
// Fetch a list of partition IDs
|
|
partitions, err := cg.kazoo.Topic(topic).Partitions()
|
|
if err != nil {
|
|
cg.Logf("%s :: FAILED to get list of partitions: %s\n", topic, err)
|
|
cg.errors <- &sarama.ConsumerError{
|
|
Topic: topic,
|
|
Partition: -1,
|
|
Err: err,
|
|
}
|
|
return
|
|
}
|
|
|
|
partitionLeaders, err := retrievePartitionLeaders(partitions)
|
|
if err != nil {
|
|
cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
|
|
cg.errors <- &sarama.ConsumerError{
|
|
Topic: topic,
|
|
Partition: -1,
|
|
Err: err,
|
|
}
|
|
return
|
|
}
|
|
|
|
dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitionLeaders)
|
|
myPartitions := dividedPartitions[cg.instance.ID]
|
|
cg.Logf("%s :: Claiming %d of %d partitions", topic, len(myPartitions), len(partitionLeaders))
|
|
|
|
// Consume all the assigned partitions
|
|
var wg sync.WaitGroup
|
|
for _, pid := range myPartitions {
|
|
|
|
wg.Add(1)
|
|
go cg.partitionConsumer(topic, pid.ID, messages, errors, &wg, stopper)
|
|
}
|
|
|
|
wg.Wait()
|
|
cg.Logf("%s :: Stopped topic consumer\n", topic)
|
|
}
|
|
|
|
func (cg *ConsumerGroup) consumePartition(topic string, partition int32, nextOffset int64) (sarama.PartitionConsumer, error) {
|
|
consumer, err := cg.consumer.ConsumePartition(topic, partition, nextOffset)
|
|
if err == sarama.ErrOffsetOutOfRange {
|
|
cg.Logf("%s/%d :: Partition consumer offset out of Range.\n", topic, partition)
|
|
// if the offset is out of range, simplistically decide whether to use OffsetNewest or OffsetOldest
|
|
// if the configuration specified offsetOldest, then switch to the oldest available offset, else
|
|
// switch to the newest available offset.
|
|
if cg.config.Offsets.Initial == sarama.OffsetOldest {
|
|
nextOffset = sarama.OffsetOldest
|
|
cg.Logf("%s/%d :: Partition consumer offset reset to oldest available offset.\n", topic, partition)
|
|
} else {
|
|
nextOffset = sarama.OffsetNewest
|
|
cg.Logf("%s/%d :: Partition consumer offset reset to newest available offset.\n", topic, partition)
|
|
}
|
|
// retry the consumePartition with the adjusted offset
|
|
consumer, err = cg.consumer.ConsumePartition(topic, partition, nextOffset)
|
|
}
|
|
if err != nil {
|
|
cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
|
|
return nil, err
|
|
}
|
|
return consumer, err
|
|
}
|
|
|
|
// Consumes a partition
|
|
func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- error, wg *sync.WaitGroup, stopper <-chan struct{}) {
|
|
defer wg.Done()
|
|
|
|
select {
|
|
case <-stopper:
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Since ProcessingTimeout is the amount of time we'll wait for the final batch
|
|
// of messages to be processed before releasing a partition, we need to wait slightly
|
|
// longer than that before timing out here to ensure that another consumer has had
|
|
// enough time to release the partition. Hence, +2 seconds.
|
|
maxRetries := int(cg.config.Offsets.ProcessingTimeout/time.Second) + 2
|
|
for tries := 0; tries < maxRetries; tries++ {
|
|
if err := cg.instance.ClaimPartition(topic, partition); err == nil {
|
|
break
|
|
} else if tries+1 < maxRetries {
|
|
if err == kazoo.ErrPartitionClaimedByOther {
|
|
// Another consumer still owns this partition. We should wait longer for it to release it.
|
|
time.Sleep(1 * time.Second)
|
|
} else {
|
|
// An unexpected error occurred. Log it and continue trying until we hit the timeout.
|
|
cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err)
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
} else {
|
|
cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
|
|
cg.errors <- &sarama.ConsumerError{
|
|
Topic: topic,
|
|
Partition: partition,
|
|
Err: err,
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
defer func() {
|
|
err := cg.instance.ReleasePartition(topic, partition)
|
|
if err != nil {
|
|
cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err)
|
|
cg.errors <- &sarama.ConsumerError{
|
|
Topic: topic,
|
|
Partition: partition,
|
|
Err: err,
|
|
}
|
|
}
|
|
}()
|
|
|
|
nextOffset, err := cg.offsetManager.InitializePartition(topic, partition)
|
|
if err != nil {
|
|
cg.Logf("%s/%d :: FAILED to determine initial offset: %s\n", topic, partition, err)
|
|
return
|
|
}
|
|
|
|
if nextOffset >= 0 {
|
|
cg.Logf("%s/%d :: Partition consumer starting at offset %d.\n", topic, partition, nextOffset)
|
|
} else {
|
|
nextOffset = cg.config.Offsets.Initial
|
|
if nextOffset == sarama.OffsetOldest {
|
|
cg.Logf("%s/%d :: Partition consumer starting at the oldest available offset.\n", topic, partition)
|
|
} else if nextOffset == sarama.OffsetNewest {
|
|
cg.Logf("%s/%d :: Partition consumer listening for new messages only.\n", topic, partition)
|
|
}
|
|
}
|
|
|
|
consumer, err := cg.consumePartition(topic, partition, nextOffset)
|
|
|
|
if err != nil {
|
|
cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
|
|
return
|
|
}
|
|
|
|
defer consumer.Close()
|
|
|
|
err = nil
|
|
var lastOffset int64 = -1 // aka unknown
|
|
partitionConsumerLoop:
|
|
for {
|
|
select {
|
|
case <-stopper:
|
|
break partitionConsumerLoop
|
|
|
|
case err := <-consumer.Errors():
|
|
if err == nil {
|
|
cg.Logf("%s/%d :: Consumer encountered an invalid state: re-establishing consumption of partition.\n", topic, partition)
|
|
|
|
// Errors encountered (if any) are logged in the consumerPartition function
|
|
var cErr error
|
|
consumer, cErr = cg.consumePartition(topic, partition, lastOffset)
|
|
if cErr != nil {
|
|
break partitionConsumerLoop
|
|
}
|
|
continue partitionConsumerLoop
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case errors <- err:
|
|
continue partitionConsumerLoop
|
|
|
|
case <-stopper:
|
|
break partitionConsumerLoop
|
|
}
|
|
}
|
|
|
|
case message := <-consumer.Messages():
|
|
if message == nil {
|
|
cg.Logf("%s/%d :: Consumer encountered an invalid state: re-establishing consumption of partition.\n", topic, partition)
|
|
|
|
// Errors encountered (if any) are logged in the consumerPartition function
|
|
var cErr error
|
|
consumer, cErr = cg.consumePartition(topic, partition, lastOffset)
|
|
if cErr != nil {
|
|
break partitionConsumerLoop
|
|
}
|
|
continue partitionConsumerLoop
|
|
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-stopper:
|
|
break partitionConsumerLoop
|
|
|
|
case messages <- message:
|
|
lastOffset = message.Offset
|
|
continue partitionConsumerLoop
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
cg.Logf("%s/%d :: Stopping partition consumer at offset %d\n", topic, partition, lastOffset)
|
|
if err := cg.offsetManager.FinalizePartition(topic, partition, lastOffset, cg.config.Offsets.ProcessingTimeout); err != nil {
|
|
cg.Logf("%s/%d :: %s\n", topic, partition, err)
|
|
}
|
|
}
|