मैं निम्नलिखित का उपयोग कर अपने स्पार्क स्ट्रीमिंग नौकरी से काफ्का के लिए ऑफसेट करने की कोशिश कर रहा हूँ:

OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            // some time later, after outputs have completed
              ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

जैसा कि मुझे इस प्रश्न से मिला है:

काफ्का से स्पार्क डीस्ट्रीम हमेशा शुरुआत से शुरू होता है

और यह ठीक काम करता है, ऑफसेट किए जा रहे हैं। हालाँकि, समस्या यह है कि यह अतुल्यकालिक है, जिसका अर्थ है कि दो और ऑफ़सेट कमिट्स को लाइन में भेजे जाने के बाद भी, काफ़्का अभी भी ऑफ़सेट दो कमिट्स पर पकड़ बना सकता है। यदि उपभोक्ता उस बिंदु पर दुर्घटनाग्रस्त हो जाता है, और मैं इसे वापस लाता हूं, तो यह उन संदेशों को पढ़ना शुरू कर देता है जिन्हें पहले ही संसाधित किया जा चुका है।

अब, अन्य स्रोतों से, यहाँ टिप्पणी अनुभाग की तरह:

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

मैं समझ गया कि स्पार्क स्ट्रीमिंग जॉब से समकालिक रूप से ऑफसेट करने का कोई तरीका नहीं है, (हालांकि अगर मैं काफ्का स्ट्रीम का उपयोग करता हूं तो एक है)। लोग इसके बजाय ऑफ़सेट को डेटाबेस में रखने का सुझाव देते हैं जहाँ आप स्ट्रीम पर अपनी गणना के अंतिम परिणाम को जारी रख रहे हैं।

अब, मेरा प्रश्न यह है: यदि मैं अपने डेटाबेस में वर्तमान में पढ़ी गई ऑफसेट को संग्रहीत करता हूं, तो मैं अगली बार उस ऑफसेट से स्ट्रीम को पढ़ना कैसे शुरू करूं?

3
abhishek 24 अप्रैल 2019, 13:50

1 उत्तर

सबसे बढ़िया उत्तर

मैंने शोध किया और मेरे प्रश्न का उत्तर पाया, इसलिए मैं इसे यहां किसी और के लिए पोस्ट कर रहा हूं जो एक ही समस्या का सामना कर सकता है:

  • org.apache.kafka.common.TopicPartition कुंजी के रूप में और मान के रूप में लंबे समय के साथ एक मानचित्र ऑब्जेक्ट बनाएं। TopicPartition कंस्ट्रक्टर दो तर्क लेता है, विषय का नाम और वह विभाजन जिससे आप पढ़ रहे होंगे। मैप ऑब्जेक्ट का मान उस ऑफ़सेट का लंबा प्रतिनिधित्व है जिससे आप स्ट्रीम पढ़ना चाहते हैं।

    मानचित्र प्रारंभऑफ़सेट = नया हैश मैप<>(); startOffset.put (नया TopicPartition ("topic_name", 0), 3332980L);

  • स्ट्रीम सामग्री को उपयुक्त JavaInputStream में पढ़ें, और पहले बनाए गए Map ऑब्जेक्ट को ConsumerStrategies.Subscribe() विधि के तर्क के रूप में प्रदान करें।

    अंतिम JavaInputDStream> स्ट्रीम = KafkaUtils.createDirectStream (jssc, LocationStrategies.PreferConsistent (), ConsumerStrategies.Subscribe (विषय, kafkaParams, startOffset));

4
supriyo_basak 6 मई 2019, 10:44