stateStore.get(), transform() से KStream को उपयोग किए जाने पर असंगत परिणाम देता है। यह शून्य लौटाता है, भले ही संबंधित कुंजी-मान स्टोर में put() रहा हो।

क्या कोई KeyValueStore<> के इस व्यवहार की व्याख्या कर सकता है?

@Component
public class StreamProcessor {

    @StreamListener
    public void process(@Input(KStreamBindings.INPUT_STREAM) KStream<String, JsonNode> inputStream) {
        KStream<String, JsonNode> joinedEvents = inputStream
           .selectKey((key, value) -> computeKey(value))
           .transform(
               () -> new SelfJoinTransformer((v1, v2) -> join(v1, v2), "join_store"),
               "join_store"
            );
        
        joinedEvents
               .foreach((key, value) -> System.out.format("%s,joined=%b\n",key, value.has("right")));
    }

    private JsonNode join(JsonNode left, JsonNode right) {
        ((ObjectNode) left).set("right", right);
        return left;
    }
}

public class SelfJoinTransformer implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {
  private KeyValueStore<String, JsonNode> stateStore;
  private ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner;
  private String storeName;

  public SelfJoinTransformer(ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner, String storeName) {
    this.storeName = storeName;
    this.valueJoiner = valueJoiner;
  }

  @Override
  public void init(ProcessorContext context) {
     this.stateStore = (KeyValueStore<String, JsonNode>) context.getStateStore(storeName);
  }

  @Override
  public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
    JsonNode oldValue = stateStore.get(key);
    if (oldValue != null) { //this condition rarely holds true
        stateStore.delete(key);
        System.out.format("%s,joined\n", key);
        return KeyValue.pair(key, valueJoiner.apply(oldValue, value));
    }
    stateStore.put(key, value);
    return null;
  }
}
0
Nilesh 2 अक्टूबर 2020, 20:31

1 उत्तर

सबसे बढ़िया उत्तर

कारण, ऐसा लगता है कि संदेश गायब हो रहे हैं (यह मानते हुए, कि विराम चिह्न उन्हें नहीं हटाता है) यह है कि आप KStream :: selectKey (...) का उपयोग करते हैं, यह कुंजी बदलता है, लेकिन पुनर्विभाजन नहीं करता है और आप देख सकते हैं गलत विभाजन में कुंजी।

निम्नलिखित परिदृश्य को देखें:

  • संदेश1: k1, v1 (partition0)
  • संदेश2: k2, v2 (partition1)

धारणा संदेश अलग-अलग विभाजन में रखे जाते हैं (कुंजी के कारण) चयन के बाद कुंजी: k1 -> k, k2 -> k

  • संदेश1: k, v1
  • संदेश2: k, v2

ऑपरेशन selectKey स्टेटलेस है इसलिए संदेश डाउनस्ट्रीम (विषय) पर नहीं भेजे जाते हैं और पुनर्विभाजन नहीं होता है। पहले संदेश के लिए: स्टोर में कुंजी - k के लिए मान रखा गया है (विभाजन 0) जब दूसरा संदेश आता है: कुंजी-के के लिए कोई संदेश नहीं है, क्योंकि यह अलग विभाजन है (विभाजन 1)

3
Bartosz Wardziński 5 अक्टूबर 2020, 11:49