मैं एक स्पार्क एप्लिकेशन लिख रहा हूं जो काफ्का विषय से संदेश पढ़ता है, डीबी में रिकॉर्ड देखता है, नए संदेश बनाता है और उन्हें किसी अन्य काफ्का विषय पर प्रकाशित करता है। यहाँ मेरा कोड कैसा दिखता है -

val inputMessagesDataSet: DataSet[InputMessage] = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("subscribe", "input-kafka-topic1")
  .load()
  .select($"value")
  .mapPartitions{r =>
     val messages: Iterator[InputMessage] = parseMessages(r)
  }

inputMessagesDataSet
  .writeStream
  .foreachBatch(processMessages _)
  .trigger(trigger)
  .start
  .awaitTermination

def processMessages(inputMessageDataSet: Dataset[InputMessage]) = {
   // fetch stuff from DB and build a DataSet[OutputMessage]
   val outputMessagesDataSet: DataSet[OutputMessage] = ...
   // now queue to another kafka topic
  outputMessagesDataSet
      .writeStream
      .trigger(trigger)
      .format("kafka")
      .option("kafka.bootstrap.servers", "server1")
      .option("topic", "output-kafka-topic")
      .option("checkpointLocation", loc)
      .start
      .awaitTermination
}

लेकिन मुझे यह कहते हुए एक त्रुटि मिलती है

org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame; ऑनलाइन outputMessagesDataSet.writeStream

ऐसा इसलिए लगता है क्योंकि outputMessagesDataSet को readStream का उपयोग करके नहीं बनाया गया है। मेरे द्वारा मूल mapPartitions() में DataSet[OutputMessage] का निर्माण नहीं करने का कारण यह है कि DB रिकॉर्ड आदि लाने के लिए जिन वर्गों की आवश्यकता होती है, वे क्रमबद्ध नहीं होते हैं, इसलिए यह NotSerializableException फेंकता है।

मैं काफ्का के लिए एक नया डेटासेट और कतार कैसे बना सकता हूं?

0
codewarrior 9 सितंबर 2020, 00:18

1 उत्तर

सबसे बढ़िया उत्तर

foreachBatch एक स्थिर डेटासेट स्वीकार करता है, इसलिए आपको write का उपयोग करने की आवश्यकता है, न कि writeStream

वैकल्पिक रूप से, आप writeStream.format("kafka") forEachBatch का उपयोग किए बिना कर सकते हैं

2
OneCricketeer 11 सितंबर 2020, 20:24