मैं अपाचे बीम के लिए पायथन एसडीके का उपयोग कर रहा हूं। डेटाटेबल और स्कीमा के मान पीसीओलेक्शन में हैं। यह वह संदेश है जिसे मैंने पबसुब से पढ़ा है:

{"DEVICE":"rms005_m1","DATESTAMP":"2020-05-29 20:54:26.733 UTC","SINUMERIK__x_position":69.54199981689453,"SINUMERIK__y_position":104.31400299072266,"SINUMERIK__z_position":139.0850067138672}

फिर मैं इसे डेटाटेबल के लिए लैम्ब्डा फ़ंक्शन के साथ जेसन संदेश में मानों का उपयोग करके BigQuery को लिखना चाहता हूं और स्कीमा के लिए यह फ़ंक्शन:

def set_schema(data):
    list = []
    for name in data:
        if name == 'STATUS' or name == 'DEVICE':
            type = 'STRING'
        elif name == 'DATESTAMP':
            type = 'TIMESTAMP'
        else:
            type = 'FLOAT'
        list.append(name + ':' + type)
    schema = ",".join(list)
    return schema

data = (p
        | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=topic)
        | "Parse json" >> beam.Map(json_parse)
        | "Write to BQ" >> beam.io.WriteToBigQuery(
            table='project:dataset{datatable}__opdata'.format(datatable = lambda element: element["DEVICE"]),
            schema=set_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
       )

जब मैं इसे निष्पादित करता हूं तो मुझे यह त्रुटि मिलती है:

ValueError: Expected a table reference (PROJECT:DATASET.TABLE or DATASET.TABLE) instead of project:dataset.<function <lambda> at 0x7fa0dc378710>__opdata

मैं पीसीएलेक्शन के मूल्यों को पीआरट्रांसफॉर्म में चर के रूप में कैसे उपयोग कर सकता हूं?

0
Abutreca 1 जून 2020, 16:24

1 उत्तर

सबसे बढ़िया उत्तर

आपको एक फ़ंक्शन को तालिका में पास करना होगा। इसके बजाय इसे आजमाएं:

| "Write to BQ" >> beam.io.WriteToBigQuery(
            table=lambda element: 'project:dataset{datatable}__opdata'.format(datatable = element["DEVICE"]),
            schema=set_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
2
Lak 2 जून 2020, 07:19