मैं influxdb करने के लिए सेंसर डेटा लिखने की कोशिश कर रहा हूं। डेटा को नैनोसेकंद सटीकता की आवश्यकता होती है। आयातित तिथियां गलत हैं और न कि सभी डेटा पॉइंट प्लॉट किए गए हैं।

पुन: उत्पन्न करने के चरण:

मैं एक डेटाफ्रेम में एक json फ़ाइल पढ़ता हूं जो फिर भी influxdb पर भेजा जाता है। पुन: उत्पन्न करने के लिए कोड:

 dataDF = pd.read_json(dataFile)
  startTime = np.datetime64(dataDF['TimeStamp'][0])
  period = round(1e9 / dataDF['SampleRate'][0])
  timeDF = [startTime]

  for i in range(dataDF.shape[0]):
      ts = timeDF[i] + np.timedelta64(period, 'ns')
      timeDF.append(ts)

  dataDF.index = timeDF[:-1]
  dataDF = dataDF.drop('TimeStamp', 1)
  
  write_api.write(bucket="devBucket", record=dataDF, data_frame_measurement_name=dataDF['DeviceType'][0],
                  data_frame_tag_columns=['SerialNumber', 'SampleRate', 'DeviceType'])

मुझे यकीन नहीं है कि मैं लाइब्रेरी गलत का उपयोग कर रहा हूं लेकिन उदाहरण से इसकी तुलना में ऐसा लगता है कि इसे काम करना चाहिए। सम्मिलित करने से पहले डेटाफ्रेम का एक स्निपेट यहां दिया गया है।

                           SerialNumber   DeviceType  SampleRate   Data
2021-02-12 13:25:35.000000000    1234567890   Pressure       44000  1073741816
2021-02-12 13:25:35.000022727    1234567890   Pressure       44000  1073791012
2021-02-12 13:25:35.000045454    1234567890   Pressure       44000  1073791012
2021-02-12 13:25:35.000068181    1234567890   Pressure       44000  1073745384
2021-02-12 13:25:35.000090908    1234567890   Pressure       44000  1073745384
0
Nooruddin Lakhani 21 अप्रैल 2021, 13:29

1 उत्तर

सबसे बढ़िया उत्तर

मैंने बस पायथन स्क्रिप्ट का पालन करके अपना डेटा आयात करने की कोशिश की और सब कुछ अच्छा लग रहा है

import numpy as np
import pandas as pd

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

"""
Load DataFrame form CSV File
"""
dataDF = pd.read_json("sample.json")
startTime = np.datetime64(dataDF['TimeStamp'][0])
period = round(1e9 / dataDF['SampleRate'][0])
timeDF = [startTime]

for i in range(dataDF.shape[0]):
    ts = timeDF[i] + np.timedelta64(period, 'ns')
    timeDF.append(ts)

dataDF.index = timeDF[:-1]
dataDF = dataDF.drop('TimeStamp', 1)

print(dataDF)

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
write_api.write(bucket="my-bucket", record=dataDF, data_frame_measurement_name=dataDF['DeviceType'][0],
                data_frame_tag_columns=['SerialNumber', 'SampleRate', 'DeviceType'])

"""
Querying ingested data
"""
query = 'from(bucket: "my-bucket")' \
        ' |> range(start: 0, stop: now())' \
        ' |> filter(fn: (r) => r._measurement == "Pressure")' \
        ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
        ' |> count(column: "Data")'
tables = client.query_api().query(query=query)

"""
Processing results
"""
print()
print("=== count of ingested items: ===")
print(tables[0].records[0]["Data"])

"""
Close client
"""
client.__del__()

आपके सभी डेटा में टाइम रेंज में टाइमस्टैम्प है: 2021-02-12T12: 42: 09Z - 2021-02-12T12: 42: 10z। तो आपको उचित क्वेरी बनाने के लिए स्क्रिप्ट संपादक का उपयोग करना चाहिए:

from(bucket: "my-bucket")
  |> range(start: 2021-02-12T12:42:09Z, stop: 2021-02-12T12:42:10Z)
  |> filter(fn: (r) => r["_measurement"] == "Pressure")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")
1
DanOrre 21 अप्रैल 2021, 16:13