// Access to influxDB vie HTTP import axios from 'axios' import { DateTime } from 'luxon' // import csvParse from 'csv-parser' import { logit, logerror } from '../utilities/logit.js' import {returnOnError} from "../utilities/reporterror.js"; import {csv2Json} from "../utilities/csv2json.js"; let INFLUXHOST = process.env.INFLUXHOST || "localhost" let INFLUXPORT = process.env.INFLUXPORT || 8086 let INFLUXTOKEN = process.env.INFLUXTOKEN || "" //"rklEClT22KfdXZhA47eyJhbqcvekb8bcKCqlUG7n72uDSmR2xGvif0CmGJe0WQtXB96y29mmt-9BdsgWA5npfg==" //"BNR6cGdb006O1T6hQkGcfB8tgH-UPO6QkOPToeAvrP7LATJbCuWi1wYf3HBpVdZQEBxHxNSrNenZsOSMogX-lg==" let INFLUXDATABUCKET = process.env.INFLUXDATABUCKET || "sensor_data" let INFLUXORG = process.env.INFLUXORG || "citysensor" const INFLUXURL_READ = `http://${INFLUXHOST}:${INFLUXPORT}/api/v2/query?org=${INFLUXORG}` const INFLUXURL_WRITE = `http://${INFLUXHOST}:${INFLUXPORT}/api/v2/write?org=${INFLUXORG}&bucket=${INFLUXDATABUCKET}` const influxRead = async (query) => { let start = DateTime.now() logit(`ReadInflux from ${INFLUXURL_READ}`) let erg = { values: [], err: null} try { let ret = await axios({ method: 'post', url: INFLUXURL_READ, data: query, headers: { Authorization: `Token ${INFLUXTOKEN}`, Accept: 'application/csv', 'Content-type': 'application/vnd.flux' }, timeout: 10000, }) if (ret.status !== 200) { return returnOnError(erg, RESPSTATUS, influxRead.name, ret.status) } erg.values = ret.data } catch (e) { return returnOnError(erg, e, influxRead.name) } // logit(`Influx read time: ${start.diffNow('seconds').toObject().seconds * -1} sec`) return erg } const influxWrite = async (data) => { let start = DateTime.now() let ret try { ret = await axios({ method: 'post', url: INFLUXURL_WRITE, data: data, headers: { Authorization: `Token ${INFLUXTOKEN}`, Accept: 'application/json', 'Content-Type': 'text/plain; charset=utf-8' }, timeout: 10000, }) if (ret.status !== 204) { logerror(`doWrite2API Status: ${ret.status}`) } } catch (e) { logerror(`doWrite2API ${e}`) } logit(`Influx-Write-Time: ${start.diffNow('seconds').toObject().seconds * -1} sec`) return ret } const fetchFromInflux = async (ret, query) => { let { values, err} = await influxRead(query) if(err) { if(err.toString().includes('400')) { return returnOnError(ret, 'SYNTAXURL', fetchFromInflux.name) } else { return returnOnError(ret, err, fetchFromInflux.name) } } if (values.length <= 2) { return returnOnError(ret, 'NODATA', fetchFromInflux.name) } ret.values = csv2Json(values) return ret } export const fetchActData = async (opts) => { let ret = {err: null, values: []} let sorting = '' if(opts.sort) { if (opts.sort === 1) { sorting = '|> sort(columns: ["_time"], desc: false)' } else if (opts.sort === -1) { sorting = '|> sort(columns: ["_time"], desc: true)' } } // build the flux query let query = ` from(bucket: "sensor_data") |> range(${opts.start}, ${opts.stop}) |> filter(fn: (r) => r.sid == "${opts.sensorid}") ${sorting} |> keep(columns: ["_time","_field","_value"]) |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") ` return await fetchFromInflux(ret, query) } export const fetchNoiseAVGData = async (opts) => { let ret = {err: null, values: []} let small = '|> keep(columns: ["_time", "peakcount", "n_AVG"])' if (opts.long) { small = '' } let queryAVG = ` import "math" threshold = ${opts.peak} data = from(bucket: "sensor_data") |> range(${opts.start}, ${opts.stop}) |> filter(fn: (r) => r["sid"] == "${opts.sensorid}") e10 = data |> filter(fn: (r) => r._field == "E10tel_eq") |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) |> map(fn: (r) => ({r with _value: (10.0 * math.log10(x: r._value))})) |> keep(columns: ["_time","_field","_value"]) |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") |> rename(columns: {"E10tel_eq" : "n_AVG"}) ecnt = data |> filter(fn: (r) => r._field == "E10tel_eq") |> aggregateWindow(every: 1h, fn: count, createEmpty: false) |> keep(columns: ["_time","_field","_value"]) |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") |> rename(columns: {"E10tel_eq" : "count"}) esum = data |> filter(fn: (r) => r._field == "E10tel_eq") |> aggregateWindow(every: 1h, fn: sum, createEmpty: false) |> keep(columns: ["_time","_field","_value"]) |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") |> rename(columns: {"E10tel_eq" : "n_sum"}) peak = data |> filter(fn: (r) => r._field == "LA_max") |> aggregateWindow( every: 1h, fn: (column, tables=<-) => tables |> reduce( identity: {peakcount: 0.0}, fn: (r, accumulator) => ({ peakcount: if r._value >= threshold then accumulator.peakcount + 1.0 else accumulator.peakcount + 0.0, }), ), ) |> keep(columns: ["_time","peakcount"]) part1 = join( tables: {e10: e10, ecnt: ecnt}, on: ["_time"]) part2 = join( tables: {esum: esum, peak: peak}, on: ["_time"]) join( tables: {P1: part1, P2: part2}, on: ["_time"]) ${small} ` return await fetchFromInflux(ret, queryAVG) }