sensorapi_influxtst/databases/influx_sql.js

// Access to InfluxDB 1.8 via HTTP using InfluxQL
//
// IMPORTANT: InfluxDB 1.8 vs 2.0 Data Schema Differences:
// - InfluxDB 1.8: Only stores LA_max, LA_min, LA_eq (all in dB)
// - InfluxDB 2.0: Additionally stores E10tel_eq as pre-calculated linear value (10^(LA_max/10))
//
// This implementation converts LA_max to E10tel_eq at runtime to maintain
// compatibility with the Flux version while ensuring correct logarithmic averaging.
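//
// Worked example of the conversion (illustrative value, not real data): LA_max = 60 dB
// gives E10tel_eq = 10^(60/10) = 1e6. Averaging these linear values and taking
// 10 * log10(mean) reproduces the energetically correct mean level expected by the
// Flux version.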
import axios from 'axios'
import { DateTime } from 'luxon'
import { logit, logerror } from '../utilities/logit.js'
import { returnOnError } from "../utilities/reporterror.js"
// InfluxDB 1.8 Configuration
let INFLUXHOST = process.env.INFLUXHOST || "localhost"
let INFLUXPORT = process.env.INFLUXPORT || 8086
let INFLUXUSER = process.env.INFLUXUSER || ""
let INFLUXPASS = process.env.INFLUXPASS || ""
let INFLUXDB = process.env.INFLUXDB || "sensor_data"
// InfluxDB 1.8 URLs
const INFLUXURL_READ = `http://${INFLUXHOST}:${INFLUXPORT}/query`
const INFLUXURL_WRITE = `http://${INFLUXHOST}:${INFLUXPORT}/write`
/**
 * Execute InfluxQL query against InfluxDB 1.8
 * @param {string} query - InfluxQL query string
 * @returns {Object} - {values: [], err: null}
 */
const influxRead = async (query) => {
  let start = DateTime.now()
  logit(`ReadInflux from ${INFLUXURL_READ}`)
  let erg = { values: [], err: null }
  try {
    const params = {
      db: INFLUXDB,
      q: query,
      epoch: 'ms' // Return timestamps in milliseconds
    }
    // Add authentication if provided
    if (INFLUXUSER && INFLUXPASS) {
      params.u = INFLUXUSER
      params.p = INFLUXPASS
    }
    let ret = await axios({
      method: 'get',
      url: INFLUXURL_READ,
      params: params,
      timeout: 10000,
    })
    if (ret.status !== 200) {
      return returnOnError(erg, 'RESPSTATUS', influxRead.name, ret.status)
    }
    // InfluxDB 1.8 returns JSON format
    if (ret.data.error) {
      return returnOnError(erg, ret.data.error, influxRead.name)
    }
    erg.values = ret.data.results
  } catch (e) {
    return returnOnError(erg, e, influxRead.name)
  }
  logit(`Influx read time: ${start.diffNow('seconds').toObject().seconds * -1} sec`)
  return erg
}
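// Usage sketch for influxRead (the sensor id is illustrative; measurement and tag
// names follow the schema used further below):
//   const { values, err } = await influxRead(
//     `SELECT "LA_eq" FROM "measurements" WHERE "sid" = 'sensor-123' AND time >= now() - 1h`
//   )
//   // `values` is the raw `results` array of the 1.8 /query endpoint;
//   // fetchFromInflux() below flattens it into [{ _time, LA_eq }, ...].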
/**
 * Write data to InfluxDB 1.8
 * @param {string} data - Line protocol data
 * @returns {Object} - Response object
 */
const influxWrite = async (data) => {
  let start = DateTime.now()
  let ret
  try {
    const params = {
      db: INFLUXDB,
      precision: 'ms'
    }
    // Add authentication if provided
    if (INFLUXUSER && INFLUXPASS) {
      params.u = INFLUXUSER
      params.p = INFLUXPASS
    }
    ret = await axios({
      method: 'post',
      url: INFLUXURL_WRITE,
      params: params,
      data: data,
      headers: {
        'Content-Type': 'text/plain; charset=utf-8'
      },
      timeout: 10000,
    })
    if (ret.status !== 204) {
      logerror(`doWrite2API Status: ${ret.status}`)
    }
  } catch (e) {
    logerror(`doWrite2API ${e}`)
  }
  logit(`Influx-Write-Time: ${start.diffNow('seconds').toObject().seconds * -1} sec`)
  return ret
}
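// Usage sketch for influxWrite (tag and field names follow the schema described in
// the header comment; the concrete values are illustrative):
//   await influxWrite(`measurements,sid=sensor-123 LA_max=54.2,LA_min=32.1,LA_eq=41.7 ${Date.now()}`)
//   // The trailing timestamp is in milliseconds because the request sets precision=ms.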
/**
 * Helper function to transform InfluxDB 1.8 result to format compatible with Flux version
 * @param {Array} series - InfluxDB series data
 * @returns {Array} - Transformed data array
 */
const transformInfluxResult = (series) => {
  if (!series || !series.length) return []
  const result = []
  series.forEach(serie => {
    if (!serie.values) return
    const columns = serie.columns
    serie.values.forEach(row => {
      const record = {}
      columns.forEach((col, index) => {
        if (col === 'time') {
          // Convert timestamp to ISO string for compatibility
          record._time = new Date(row[index]).toISOString()
        } else {
          record[col] = row[index]
        }
      })
      result.push(record)
    })
  })
  return result
}
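// Example of the transformation (illustrative values):
//   input series:  { columns: ['time', 'LA_max'], values: [[1700000000000, 54.2]] }
//   output record: { _time: '2023-11-14T22:13:20.000Z', LA_max: 54.2 }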
/**
 * Execute query and transform results
 * @param {Object} ret - Return object
 * @param {string} query - InfluxQL query
 * @returns {Object} - Transformed result
 */
const fetchFromInflux = async (ret, query) => {
  let { values, err } = await influxRead(query)
  if (err) {
    if (err.toString().includes('400')) {
      return returnOnError(ret, 'SYNTAXURL', fetchFromInflux.name)
    } else {
      return returnOnError(ret, err, fetchFromInflux.name)
    }
  }
  if (!values || !values.length || !values[0].series) {
    return returnOnError(ret, 'NODATA', fetchFromInflux.name)
  }
  ret.values = transformInfluxResult(values[0].series)
  return ret
}
/**
 * Fetch current/historical sensor data from InfluxDB 1.8
 * @param {Object} opts - Options object
 * @param {string} opts.sensorid - Sensor ID
 * @param {string} opts.start - Start time (e.g., "start: -1h")
 * @param {string} opts.stop - Stop time (e.g., "stop: now()")
 * @param {number} opts.sort - Sort order (1 for ascending, -1 for descending)
 * @returns {Object} - {err: null, values: []}
 */
export const fetchActData = async (opts) => {
  let ret = { err: null, values: [] }
  // Strip the Flux-style "start: "/"stop: " prefixes; the remaining expression is
  // inserted directly into the InfluxQL WHERE clause
  let startTime = opts.start.replace('start: ', '').trim()
  let stopTime = opts.stop.replace('stop: ', '').trim()
  // Build sorting clause
  let orderClause = ''
  if (opts.sort) {
    if (opts.sort === 1) {
      orderClause = 'ORDER BY time ASC'
    } else if (opts.sort === -1) {
      orderClause = 'ORDER BY time DESC'
    }
  }
  // InfluxQL query to get LA_max, LA_min and LA_eq for a sensor within the time range
  // Note: In InfluxDB 1.8 we only have LA_max, not E10tel_eq like in 2.0
  const query = `
    SELECT "LA_max", "LA_min", "LA_eq"
    FROM "measurements"
    WHERE "sid" = '${opts.sensorid}'
      AND time >= ${startTime}
      AND time <= ${stopTime}
    ${orderClause}
  `
  // Get the data and transform it to include an E10tel_eq equivalent
  const result = await fetchFromInflux(ret, query)
  if (result.err) {
    return result
  }
  // Transform data to add the E10tel_eq field for compatibility with the Flux version
  // E10tel_eq = 10^(LA_max/10)
  result.values = result.values.map(record => ({
    ...record,
    E10tel_eq: record.LA_max !== null && record.LA_max !== undefined
      ? Math.pow(10, record.LA_max / 10)
      : null
  }))
  return result
}
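// Usage sketch for fetchActData (option values are illustrative; start/stop are
// assumed to arrive with the Flux-style prefixes, as the JSDoc examples suggest):
//   const ret = await fetchActData({
//     sensorid: 'sensor-123',
//     start: 'start: now() - 1h',
//     stop: 'stop: now()',
//     sort: 1
//   })
//   // ret.values: [{ _time, LA_max, LA_min, LA_eq, E10tel_eq }, ...]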
/**
 * Helper function to calculate logarithmic average for decibel values
 * For decibel values, we need to:
 * 1. Convert dB to linear scale (10^(dB/10))
 * 2. Calculate arithmetic mean of linear values
 * 3. Convert back to dB (10 * log10(mean))
 * @param {Array} values - Array of decibel values
 * @returns {number} - Logarithmic average in dB
 */
const calculateLogMean = (values) => {
  if (!values || values.length === 0) return null
  // Convert dB to linear scale, calculate mean, convert back to dB
  const linearSum = values.reduce((sum, val) => {
    if (val !== null && val !== undefined) {
      return sum + Math.pow(10, val / 10)
    }
    return sum
  }, 0)
  const validCount = values.filter(val => val !== null && val !== undefined).length
  if (validCount === 0) return null
  const linearMean = linearSum / validCount
  return 10 * Math.log10(linearMean)
}
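// Worked example (illustrative values): calculateLogMean([60, 70])
//   linear values 10^6 and 10^7 -> mean 5.5e6 -> 10 * log10(5.5e6) ≈ 67.4 dB,
//   noticeably higher than the arithmetic mean of 65 dB, which is why plain
//   averaging of dB values is avoided here.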
/**
 * Fetch noise averaging data from InfluxDB 1.8 with proper logarithmic averaging for LAmax
 * @param {Object} opts - Options object
 * @param {string} opts.sensorid - Sensor ID
 * @param {string} opts.start - Start time
 * @param {string} opts.stop - Stop time
 * @param {number} opts.peak - Peak threshold for counting
 * @param {boolean} opts.long - Return full data or just summarized
 * @returns {Object} - {err: null, values: []}
 */
export const fetchNoiseAVGData = async (opts) => {
  let ret = { err: null, values: [] }
  // Strip the Flux-style "start: "/"stop: " prefixes; the remaining expression is
  // inserted directly into the InfluxQL WHERE clause
  let startTime = opts.start.replace('start: ', '').trim()
  let stopTime = opts.stop.replace('stop: ', '').trim()
  // Since InfluxQL doesn't support complex joins like Flux, we fetch the raw LA_max
  // values once and derive both the E10tel statistics and the peak counts from the
  // same result set in JavaScript.
  // In InfluxDB 1.8 we only have LA_max (dB), which is converted to an E10tel
  // equivalent below. InfluxQL has no "IS NOT NULL"; selecting only "LA_max"
  // already omits points where that field is missing.
  const queryLAmaxForE10 = `
    SELECT "LA_max", time
    FROM "measurements"
    WHERE "sid" = '${opts.sensorid}'
      AND time >= ${startTime}
      AND time <= ${stopTime}
    ORDER BY time ASC
  `
  try {
    // Execute LA_max query (we use the same data for both E10tel calculation and peak counting)
    let { values: lamaxValues, err: lamaxErr } = await influxRead(queryLAmaxForE10)
    if (lamaxErr) {
      return returnOnError(ret, lamaxErr, fetchNoiseAVGData.name)
    }
    if (!lamaxValues || !lamaxValues.length || !lamaxValues[0].series) {
      return returnOnError(ret, 'NODATA', fetchNoiseAVGData.name)
    }
    // Transform LA_max results
    const lamaxData = transformInfluxResult(lamaxValues[0].series)
    // Group LA_max data by hour and calculate:
    // 1. E10tel equivalent values (10^(LA_max/10))
    // 2. Peak counting
    // 3. Statistics for n_AVG calculation
    // Note: grouping uses the server's local hour (Date constructed from local
    // components); the hour key is stored as the corresponding UTC ISO string.
    const hourlyData = {}
    lamaxData.forEach(record => {
      const timestamp = new Date(record._time)
      const hourKey = new Date(timestamp.getFullYear(), timestamp.getMonth(),
        timestamp.getDate(), timestamp.getHours()).toISOString()
      if (!hourlyData[hourKey]) {
        hourlyData[hourKey] = {
          time: hourKey,
          lamaxValues: [],
          e10telValues: [], // Converted LA_max to E10tel equivalent
          peakCount: 0
        }
      }
      const lamax = record.LA_max
      if (lamax !== null && lamax !== undefined) {
        // Store original LA_max value
        hourlyData[hourKey].lamaxValues.push(lamax)
        // Convert LA_max (dB) to E10tel equivalent: 10^(LA_max/10)
        const e10tel = Math.pow(10, lamax / 10)
        hourlyData[hourKey].e10telValues.push(e10tel)
        // Count peaks
        if (lamax >= opts.peak) {
          hourlyData[hourKey].peakCount++
        }
      }
    })
    // Calculate final results for each hour
    const combinedResults = []
    Object.values(hourlyData).forEach(hourData => {
      const result = {
        _time: hourData.time,
        count: hourData.e10telValues.length,
        peakcount: hourData.peakCount
      }
      // Calculate E10tel statistics
      if (hourData.e10telValues.length > 0) {
        // Sum of E10tel values
        result.n_sum = hourData.e10telValues.reduce((sum, val) => sum + val, 0)
        // Mean of E10tel values, then convert back to dB for n_AVG
        // This matches the Flux version: mean(E10tel_eq) then 10*log10(mean)
        const e10telMean = result.n_sum / hourData.e10telValues.length
        result.n_AVG = 10.0 * Math.log10(e10telMean)
      }
      // Add additional fields if opts.long is true
      if (opts.long) {
        result.LA_max_values = hourData.lamaxValues
        result.LA_max_log_avg = calculateLogMean(hourData.lamaxValues)
        result.E10tel_values = hourData.e10telValues
      }
      combinedResults.push(result)
    })
    // Sort by time
    combinedResults.sort((a, b) => new Date(a._time) - new Date(b._time))
    // Filter results based on opts.long
    if (!opts.long) {
      ret.values = combinedResults.map(record => ({
        _time: record._time,
        peakcount: record.peakcount,
        n_AVG: record.n_AVG
      }))
    } else {
      ret.values = combinedResults
    }
  } catch (e) {
    return returnOnError(ret, e, fetchNoiseAVGData.name)
  }
  return ret
}
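// Usage sketch for fetchNoiseAVGData (option values are illustrative):
//   const avg = await fetchNoiseAVGData({
//     sensorid: 'sensor-123',
//     start: 'start: now() - 24h',
//     stop: 'stop: now()',
//     peak: 70,
//     long: false
//   })
//   // avg.values: one record per hour, e.g. { _time, peakcount, n_AVG }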
// Export write function for compatibility
export { influxWrite }