मेरे पास एक goroutine है जो मूल रूप से Kafka consumer के रूप में काम करता है: यह एक topic से संदेश पढ़ता है और प्राप्त होने वाले प्रत्येक संदेश के लिए एक नया goroutine शुरू करता है। यह consumer goroutine तब बंद होना चाहिए जब एप्लिकेशन, यानी main goroutine, बंद हो। लेकिन मुझे इसे ठीक से बंद करने में कठिनाई हो रही है। नीचे Kafka consumer की परिभाषा दी गई है:

    package svc    

import (
    "event-service/pkg/pb"
    "fmt"
    "github.com/gogo/protobuf/proto"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
    "log"
    "os"
    "sync"
)    

// Type declarations for the Kafka-backed event consumer.
type (
    // EventConsumer is a callback invoked with one decoded event.
    EventConsumer func(event eventService.Event)

    // KafkaConsumer groups the Kafka client, the handler table and the
    // primitives used to coordinate shutdown.
    KafkaConsumer struct {
        // consumer is the underlying Kafka client.
        consumer *kafka.Consumer
        // consumerMapping is looked up by an event's EntityType to find
        // the handler for that event.
        consumerMapping map[string]EventConsumer
        // eventChannels holds the topic names given to NewKafkaConsumer.
        eventChannels []string
        // done carries the stop request sent by Close and polled by the
        // goroutine started in Consume.
        done chan bool
        // wg counts the per-message handler goroutines still running.
        wg *sync.WaitGroup
    }
)

// getKafkaConsumerConfigMap builds a kafka.ConfigMap from a plain Go map.
// Each entry is applied with SetKey; when SetKey reports an error it is
// logged and the loop moves on to the next entry.
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
    result := &kafka.ConfigMap{}
    for name, setting := range config {
        if setErr := result.SetKey(name, setting); setErr != nil {
            log.Println(fmt.Sprintf("An error %v occurred while setting %v: %v", setErr, name, setting))
        }
    }
    return result
}

// NewKafkaConsumer creates a Kafka client from config, subscribes it to
// channels and returns a KafkaConsumer that dispatches through
// consumerMapping. If the client cannot be created or the subscription
// fails, the process is terminated via log.Fatalf.
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
    client, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
    if err != nil {
        log.Fatalf("An error %v occurred while starting kafka consumer.", err)
    }
    if err = client.SubscribeTopics(channels, nil); err != nil {
        log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
    }
    return &KafkaConsumer{
        eventChannels:   channels,
        consumer:        client,
        consumerMapping: consumerMapping,
        // Capacity 1 lets Close send its stop request without blocking.
        done: make(chan bool, 1),
        wg:   new(sync.WaitGroup),
    }
}

// getEvent decodes a protobuf-encoded payload into an eventService.Event.
// A decoding failure is only logged; the event is returned regardless, so
// the result is never nil.
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
    decoded := new(eventService.Event)
    if err := proto.Unmarshal(eventData, decoded); err != nil {
        log.Printf("An error %v occurred while un marshalling data from kafka.", err)
    }
    return decoded
}

// Consume starts a background goroutine that reads from the Kafka client's
// events channel and, for every message, spawns a handler goroutine that calls
// the EventConsumer registered for the event's EntityType. Consume itself
// returns as soon as the goroutine has been started.
//
// NOTE(review): kc.done is only polled, without blocking, at the top of each
// iteration. The receive from kc.consumer.Events() that follows is a plain
// blocking receive, so a value sent on kc.done is not observed until another
// event has been received.
func (kc *KafkaConsumer) Consume() {
    go func() {
        run := true
        for run == true {
            // Non-blocking check for a stop request; falls through via
            // default when nothing is pending on kc.done.
            select {
            case sig := <-kc.done:
                log.Println(fmt.Sprintf("Caught signal %v: terminating \n", sig))
                run = false
                return
            default:
            }
            // Blocks until the client delivers the next event.
            e := <-kc.consumer.Events()
            switch event := e.(type) {
            case kafka.AssignedPartitions:
                // Rebalance: take over the partitions handed to this consumer.
                _, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
                err := kc.consumer.Assign(event.Partitions)
                if err != nil {
                    log.Println(fmt.Sprintf("An error %v occurred while assigning partitions.", err))
                }
            case kafka.RevokedPartitions:
                // Rebalance: give up the current assignment.
                _, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
                err := kc.consumer.Unassign()
                if err != nil {
                    log.Println(fmt.Sprintf("An error %v occurred while unassigning partitions.", err))
                }
            case *kafka.Message:
                domainEvent := kc.getEvent(event.Value)
                // Count the handler goroutine so Close can wait for it.
                kc.wg.Add(1)
                // The closure reads the captured domainEvent; its own
                // parameter (event) is not referenced in the body.
                go func(event *eventService.Event) {
                    defer kc.wg.Done()
                    if eventConsumer := kc.consumerMapping[domainEvent.EntityType]; eventConsumer != nil {
                        eventConsumer(*domainEvent)
                    } else {
                        log.Println(fmt.Sprintf("Event consumer not found for %v event type", domainEvent.EntityType))
                    }
                }(domainEvent)
            case kafka.PartitionEOF:
                fmt.Printf("%% Reached %v\n", e)
            case kafka.Error:
                // The error is only printed; the loop keeps running.
                _, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
            }
        }
    }()
}    

// Close shuts the consumer down in three steps: it waits for the handler
// goroutines counted by kc.wg, sends a stop request on kc.done, and closes
// the Kafka client, logging any error from that call. It does not wait for
// the goroutine started by Consume to pick up the stop request.
func (kc *KafkaConsumer) Close() {
    log.Println("Waiting")
    kc.wg.Wait()
    kc.done <- true
    log.Println("Done waiting")
    if closeErr := kc.consumer.Close(); closeErr != nil {
        log.Printf("An error %v occurred while closing kafka consumer.", closeErr)
    }
}

और नीचे main goroutine का कोड है:

    package main    

import (
    "event-service/pkg/pb"
    "event-service/pkg/svc"
    "fmt"
    "log"
)    

func main() {
    eventConsumerMapping := map[string]svc.EventConsumer{"doctor-created": func(event eventService.Event) {
        log.Println(fmt.Sprintf("Got event %v from kafka", event))
    }}
    consumerConfig := map[string]interface{}{
        "bootstrap.servers":               "localhost:9092",
        "group.id":                        "catalog",
        "go.events.channel.enable":        true,
        "go.application.rebalance.enable": true,
        "enable.partition.eof":            true,
        "auto.offset.reset":               "earliest",
    }
    kafkaConsumer := svc.NewKafkaConsumer([]string{"doctor-created"}, consumerConfig, eventConsumerMapping)
    kafkaConsumer.Consume()
    kafkaConsumer.Close()
}

समस्या यह है कि एप्लिकेशन कभी-कभी समाप्त ही नहीं होता, और कुछ रनों में Consume फ़ंक्शन निष्पादित नहीं होता। मुझसे यहाँ क्या छूट रहा है?

-1
kumarD 14 अप्रैल 2020, 18:02

1 उत्तर

सबसे बढ़िया उत्तर

ठीक है, समाधान यह रहा। चूँकि consumer goroutine को तब तक चलते रहना चाहिए जब तक main goroutine चल रहा है, और main goroutine स्वयं भी अंतहीन रूप से चलता है, इसलिए main goroutine के चलते हुए consumer goroutine को बंद करना सही तरीका नहीं है।

इसलिए निम्नलिखित समाधान काम करता है:

package main    

import (
    "event-service/pkg/pb"
    "event-service/pkg/svc"
    "fmt"
    "log"
    "sync"
)    

// main builds a Kafka consumer for the "doctor-created" topic, starts its
// event loop, blocks until the wait group passed to Consume is released, and
// finally closes the consumer.
func main() {
    handlers := map[string]svc.EventConsumer{
        "doctor-created": func(received eventService.Event) {
            log.Println(fmt.Sprintf("Got event %v from kafka", received))
        },
    }
    settings := map[string]interface{}{
        "bootstrap.servers":               "localhost:9092",
        "group.id":                        "catalog-2",
        "session.timeout.ms":              6000,
        "go.events.channel.enable":        true,
        "go.application.rebalance.enable": true,
        "enable.partition.eof":            true,
        "auto.offset.reset":               "earliest",
    }
    topics := []string{"doctor-created"}

    consumer := svc.NewKafkaConsumer(topics, settings, handlers)

    // pending is shared with Consume, which registers its goroutines on it.
    pending := new(sync.WaitGroup)
    consumer.Consume(pending)
    pending.Wait()

    consumer.Close()
}

सेवा (svc पैकेज) की परिभाषा:

package svc    

import (
    "event-service/pkg/pb"
    "fmt"
    "github.com/gogo/protobuf/proto"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
)    

// Type declarations for the signal-aware Kafka event consumer.
type (
    // EventConsumer is a callback invoked with one decoded event.
    EventConsumer func(event eventService.Event)

    // KafkaConsumer groups the Kafka client, the handler table and the
    // channel on which termination signals are delivered.
    KafkaConsumer struct {
        // consumer is the underlying Kafka client.
        consumer *kafka.Consumer
        // consumerMapping is looked up by an event's EntityType to find
        // the handler for that event.
        consumerMapping map[string]EventConsumer
        // channels holds the topic names given to NewKafkaConsumer.
        channels []string
        // sigChan is registered with signal.Notify in Consume and ends
        // the event loop when a signal arrives.
        sigChan chan os.Signal
        // done is allocated by NewKafkaConsumer.
        done chan bool
    }
)

// getKafkaConsumerConfigMap converts a plain Go map into a kafka.ConfigMap.
//
// Every key/value pair of config is applied with SetKey. A pair that SetKey
// rejects is logged and the remaining pairs are still processed, so the
// function always returns a non-nil map.
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
    configMap := &kafka.ConfigMap{}
    for key, value := range config {
        if err := configMap.SetKey(key, value); err != nil {
            // log.Printf formats directly; no intermediate fmt.Sprintf needed.
            log.Printf("An error %v occurred while setting %v: %v", err, key, value)
        }
    }
    return configMap
}

// NewKafkaConsumer creates a Kafka client from config, subscribes it to
// channels and returns a KafkaConsumer that dispatches through
// consumerMapping. If the client cannot be created or the subscription
// fails, the process is terminated via log.Fatalf.
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
    client, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
    if err != nil {
        log.Fatalf("An error %v occurred while starting kafka consumer.", err)
    }
    if err = client.SubscribeTopics(channels, nil); err != nil {
        log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
    }
    return &KafkaConsumer{
        channels:        channels,
        consumer:        client,
        consumerMapping: consumerMapping,
        // Both channels are buffered with capacity 1.
        sigChan: make(chan os.Signal, 1),
        done:    make(chan bool, 1),
    }
}

// getEvent decodes a protobuf-encoded payload into an eventService.Event.
// When decoding fails the error is logged and the event is still returned,
// so callers always receive a non-nil pointer.
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
    var parsed eventService.Event
    unmarshalErr := proto.Unmarshal(eventData, &parsed)
    if unmarshalErr != nil {
        log.Printf("An error %v occurred while un marshalling data from kafka.", unmarshalErr)
    }
    return &parsed
}

// Consume registers kc.sigChan for SIGINT and SIGTERM and starts a goroutine
// that processes events from the Kafka client until one of those signals is
// received or the events channel is closed. Consume returns immediately.
//
// wg is incremented once for the event loop and once for every message
// handler goroutine, so a caller's wg.Wait() returns only after the loop has
// stopped and every handler that was started has finished.
func (kc *KafkaConsumer) Consume(wg *sync.WaitGroup) {
    signal.Notify(kc.sigChan, syscall.SIGINT, syscall.SIGTERM)
    wg.Add(1)
    go func() {
        defer wg.Done()
        // Undo the Notify registration once the loop is over so signals are
        // no longer relayed to a channel that nobody reads.
        defer signal.Stop(kc.sigChan)

        for {
            select {
            case sig := <-kc.sigChan:
                fmt.Printf("Caught signal %v: terminating\n", sig)
                return
            case ev, ok := <-kc.consumer.Events():
                if !ok {
                    // A closed channel yields nil events forever; stop
                    // instead of spinning on them.
                    return
                }
                switch e := ev.(type) {
                case kafka.AssignedPartitions:
                    _, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
                    if err := kc.consumer.Assign(e.Partitions); err != nil {
                        log.Printf("An error %v occurred while assigning partitions.", err)
                    }
                case kafka.RevokedPartitions:
                    _, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
                    if err := kc.consumer.Unassign(); err != nil {
                        log.Printf("An error %v occurred while unassigning partitions.", err)
                    }
                case *kafka.Message:
                    domainEvent := kc.getEvent(e.Value)
                    // Add runs on the loop goroutine, which still holds its
                    // own count, so the counter cannot reach zero before the
                    // handler is registered.
                    wg.Add(1)
                    go func(event *eventService.Event) {
                        defer wg.Done()
                        handler := kc.consumerMapping[event.EntityType]
                        if handler == nil {
                            log.Printf("Event consumer not found for %v event type", event.EntityType)
                            return
                        }
                        handler(*event)
                    }(domainEvent)
                case kafka.PartitionEOF:
                    fmt.Printf("%% Reached %v\n", e)
                case kafka.Error:
                    // Errors should generally be considered as informational, the client will try to automatically recover
                    _, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                }
            }
        }
    }()
}

// Close closes the underlying Kafka client. A failure is logged and not
// returned to the caller.
func (kc *KafkaConsumer) Close() {
    if closeErr := kc.consumer.Close(); closeErr != nil {
        log.Printf("An error %v occurred while closing kafka consumer.", closeErr)
    }
}
0
kumarD 15 अप्रैल 2020, 13:38