मैं 2 अलग-अलग विषयों की सदस्यता लेने वाले 2 उपभोक्ताओं को चलाने की कोशिश कर रहा हूं। एक समय में एक को चलाने पर दोनों उपभोक्ता कार्यक्रम ठीक से चलते हैं, लेकिन उन्हें एक ही समय में चलाने पर, एक उपभोक्ता हमेशा अपवाद प्रदर्शित करता है:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

मैंने सुझावों का पालन किया और max.pool.size को 2, और session.timeout.ms को 30000, heartbeat.interval.ms को 1000 पर सेट किया है

नीचे मेरा उपभोक्ता कार्य है, यह फ़ंक्शन दोनों फाइलों के लिए समान है, केवल विषय का नाम Test2 में बदल जाता है, और मैं इन दोनों कार्यों को एक ही समय में चल रहे 2 अलग-अलग वर्गों में चला रहा हूं।

    public void consume()
    {
        //Kafka consumer configuration settings
        List<String> topicNames = new ArrayList<String>();
        topicNames.add("Test1");
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("heartbeat.interval.ms", "1000");
        props.put("max.poll.records", "2");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topicNames);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
            System.out.println("Record: "+record.value());
                String responseString = "successfull";
                if (responseString.equals("successfull")) {
                    consumer.commitSync();
                }
            }
        }
    }
        catch (Exception e) {
            LOG.error("Exception: ", e);
        }
        finally {
            consumer.close();
        }
    }

इस त्रुटि के कारण, रिकॉर्ड Kafka विषय में प्रतिबद्ध नहीं हो रहे हैं। मैं इस त्रुटि को कैसे दूर करूं?

1
astudent 9 सितंबर 2019, 15:31

1 उत्तर

सबसे बढ़िया उत्तर

आपके मामले में आपको उपभोक्ता को अलग-अलग समूह आईडी असाइन करने की आवश्यकता है। आप एक ही समूह आईडी के साथ दो उपभोक्ता बना रहे हैं (यह ठीक है), लेकिन सदस्यता को दो बार कॉल करना ठीक नहीं है

आप एक बार में एक उपभोक्ता को चलाने में सक्षम हैं क्योंकि आप केवल एक बार सदस्यता लेने के लिए कॉल कर रहे हैं।

अगर आपको और मदद चाहिए तो मुझे बताएं। मदद करने के लिए खुश।

1
Ashish Bhosle 9 सितंबर 2019, 14:00