मैं स्प्रिंगबूट 2, काफ्क 2.2.0, स्प्रिंग-काफ्का 2.2.5 . का उपयोग करके प्रोजेक्ट बना रहा हूं

मैंने kafka exactly once वातावरण बनाया और संदेश बनाना और उपभोग करना अच्छा था।

लेकिन kafka-consumer-groups.sh ने ऐसा कहा।

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test_topic      0          23              24              1   
test_topic      1          25              26              1   
test_topic      2          21              22              1   

मैं काफ्का को केवल एक संदेश भेजता हूं, लेकिन LOG-END-OFFSET दुगना हो जाता है और 1 अंतराल हमेशा बना रहता है। (मेरे जावा ऐप में, उत्पादन और उपभोग इरादे के अनुसार काम करता है)

मुझे नहीं पता कि लॉग-एंड-ऑफसेट क्यों दोगुना हो गया।

यदि exactly once कॉन्‍फ़िगर को हटा रहे हैं, तो LOG-END-OFFSET और CURRENT-OFFSET की गणना में कोई समस्‍या नहीं है।

यह मेरे kafkaTemplate सेटअप कोड हैं।

@Bean
    @Primary
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> producerProperties = new HashMap<>();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092";
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // exactly once producer setup
        producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>(KafkaStaticOptions.OBJECT_MAPPER));
        factory.setTransactionIdPrefix("my.transaction.");
        return factory;
    }

    @Bean
    @Primary
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
        ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    @Primary
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

मेरा निर्माता कोड।

kafkaTemplate.executeInTransaction(kt -> kt.send("test_topic", "test data hahaha"));

जब लॉग-एंड-ऑफ़सेट दोगुना हो गया, तो मैंने जाँच की, और यह produce transaction commit का समय है।

मैंने गलत कॉन्फ़िगरेशन क्या किया?

3
LKC 17 मई 2019, 11:33

1 उत्तर

सबसे बढ़िया उत्तर

लेन-देन का उपयोग करते समय, काफ्का लॉग में "कंट्रोल बैच" डालें ताकि यह इंगित किया जा सके कि संदेश भाग थे या नहीं एक लेन-देन का।

इन बैचों को ऑफ़सेट भी असाइन किया जाता है इसलिए यही कारण है कि आप ऑफ़सेट को 2 से बढ़ते हुए देखते हैं, भले ही आपने केवल एक रिकॉर्ड भेजा हो।

यदि आप अपने लिए जाँच करना चाहते हैं, तो आप अपने लॉग की सामग्री प्रदर्शित करने और नियंत्रण बैच देखने के लिए DumpLogSegments टूल का उपयोग कर सकते हैं:

./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Dumping /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1558083247264 size: 10315 magic: 2 compresscodec: NONE crc: 3531536908 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 10315 CreateTime: 1558083247414 size: 78 magic: 2 compresscodec: NONE crc: 576574952 isvalid: true

मैंने एकल रिकॉर्ड भेजने के लिए लेन-देन निर्माता का उपयोग किया और आप देख सकते हैं कि दूसरी प्रविष्टि में isControl: true है।

4
Mickael Maison 17 मई 2019, 09:07