ऐसा लगता है कि जब मैं AggregatingReplyingKafkaTemplate का उपयोग template.setReturnPartialOnTimeout(true) के साथ करता हूं, तो यह टाइमआउट अपवाद देता है, भले ही उपभोक्ताओं से आंशिक परिणाम उपलब्ध हों।

नीचे दिए गए उदाहरण में, मेरे पास अनुरोध विषय का उत्तर देने के लिए 3 उपभोक्ता हैं और मैंने 10 सेकंड में उत्तर समयबाह्य सेट किया है। मैंने उपभोक्ता की प्रतिक्रिया में स्पष्ट रूप से 3 से 11 सेकंड की देरी की है, हालांकि, मैं उपभोक्ता 1 और 2 से प्रतिक्रिया की उम्मीद करता हूं, इसलिए, मैं आंशिक परिणाम वापस कर सकता हूं। हालांकि, मुझे KafkaReplyTimeoutException मिल रहा है। अपने इनपुट की सराहना करें। धन्यवाद।

मैं नीचे दिए गए यूनिट टेस्ट के आधार पर कोड का पालन करता हूं। [काफ्काटेम्पलेट टेस्ट का जवाब देना][1]

मैंने नीचे वास्तविक कोड प्रदान किया है:


@RestController
public class SumController {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    public static final String D_REPLY = "dReply";

    public static final String D_REQUEST = "dRequest";

    @ResponseBody
    @PostMapping(value="/sum")
    public String sum(@RequestParam("message") String message) throws InterruptedException, ExecutionException {

        AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
                new TopicPartitionOffset(D_REPLY, 0), 3, new AtomicInteger());
        String resultValue ="";
        String currentValue ="";

        try {
            template.setDefaultReplyTimeout(Duration.ofSeconds(10));
            template.setReturnPartialOnTimeout(true);

            ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, message);

            RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
                    template.sendAndReceive(record);

            future.getSendFuture().get(5, TimeUnit.SECONDS); // send ok
            System.out.println("Send Completed Successfully");

            ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord = future.get(10, TimeUnit.SECONDS);
            System.out.println("Consumer record size "+consumerRecord.value().size());

            Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecord.value().iterator();

            while (iterator.hasNext()) {
                currentValue = iterator.next().value();
                System.out.println("response " + currentValue);
                System.out.println("Record header " + consumerRecord.headers().toString());
                resultValue = resultValue + currentValue + "\r\n";
            }


        } catch (Exception e) {
            System.out.println("Error Message is "+e.getMessage());
        } 

        return resultValue;

    }

    public AggregatingReplyingKafkaTemplate<Integer, String, String> aggregatingTemplate(
            TopicPartitionOffset topic, int releaseSize, AtomicInteger releaseCount) {
        //Create Container Properties
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        //Set the consumer Config
        //Create Consumer Factory with Consumer Config
        DefaultKafkaConsumerFactory<Integer, Collection<ConsumerRecord<Integer, String>>> cf =
                new DefaultKafkaConsumerFactory<>(consumerConfigs());

        //Create Listener Container with Consumer Factory and Container Property
        KafkaMessageListenerContainer<Integer, Collection<ConsumerRecord<Integer, String>>> container =
                new KafkaMessageListenerContainer<>(cf, containerProperties);
        //  container.setBeanName(this.testName);
        AggregatingReplyingKafkaTemplate<Integer, String, String> template =
                new AggregatingReplyingKafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()), container,
                        (list, timeout) -> {
                            releaseCount.incrementAndGet();
                            return list.size() == releaseSize;
                        });
        template.setSharedReplyTopic(true);
        template.start();
        return template;
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_id");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
        return props;
    }

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
        return props;
    }

    public ProducerFactory<Integer,String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @KafkaListener(id = "def1", topics = { D_REQUEST}, groupId = "D_REQUEST1")
    @SendTo  // default REPLY_TOPIC header
    public String dListener1(String in) throws InterruptedException {
        return "First Consumer : "+ in.toUpperCase();
    }

    @KafkaListener(id = "def2", topics = { D_REQUEST}, groupId = "D_REQUEST2")
    @SendTo  // default REPLY_TOPIC header
    public String dListener2(String in) throws InterruptedException {
        return "Second Consumer : "+ in.toLowerCase();
    }

    @KafkaListener(id = "def3", topics = { D_REQUEST}, groupId = "D_REQUEST3")
    @SendTo  // default REPLY_TOPIC header
    public String dListener3(String in) throws InterruptedException {
        Thread.sleep(11000);
        return "Third Consumer : "+ in;
    }

}
'''


  [1]: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java
0
Murali S 31 मार्च 2020, 13:22

1 उत्तर

सबसे बढ़िया उत्तर

template.setReturnPartialOnTimeout(true) का सीधा सा मतलब है कि टेम्प्लेट टाइमआउट पर रिलीज़ रणनीति से परामर्श करेगा (timeout तर्क = true के साथ, रणनीति को यह बताने के लिए कि यह डिलीवरी कॉल के बजाय टाइमआउट है)।

आंशिक परिणाम जारी करने के लिए इसे सही होना चाहिए।

यह आपको यह तय करने के लिए सूची को देखने (और संभवतः संशोधित) करने की अनुमति देता है कि आप जारी करना चाहते हैं या त्यागना चाहते हैं।

आपकी रणनीति timeout पैरामीटर पर ध्यान नहीं देती:

   (list, timeout) -> {
        releaseCount.incrementAndGet();
        return list.size() == releaseSize;
    });

आपको return timeout ? true : { ... } चाहिए।

0
Gary Russell 31 मार्च 2020, 14:00