// Access to InfluxDB 1.8 via HTTP using InfluxQL
//
// IMPORTANT: InfluxDB 1.8 vs 2.0 Data Schema Differences:
// - InfluxDB 1.8: Only stores LA_max, LA_min, LA_eq (all in dB)
// - InfluxDB 2.0: Additionally stores E10tel_eq as pre-calculated linear value (10^(LA_max/10))
//
// This implementation converts LA_max to E10tel_eq at runtime to maintain
// compatibility with the Flux version while ensuring correct logarithmic averaging.

import 'dotenv/config'
import axios from 'axios'
import { DateTime } from 'luxon'
import { logit, logerror } from '../utilities/logit.js'
import { returnOnError } from '../utilities/reporterror.js'

// InfluxDB 1.8 configuration
let INFLUXHOST = process.env.INFLUXHOST || 'localhost'
let INFLUXPORT = process.env.INFLUXPORT || 8086
let INFLUXUSER = process.env.INFLUXUSER || ''
let INFLUXPASS = process.env.INFLUXPASS || ''
let INFLUXDB = process.env.INFLUXDB || 'sensor_data'

// InfluxDB 1.8 endpoints
const INFLUXURL_READ = `http://${INFLUXHOST}:${INFLUXPORT}/query`
const INFLUXURL_WRITE = `http://${INFLUXHOST}:${INFLUXPORT}/write`

/**
 * Execute an InfluxQL query against InfluxDB 1.8
 * @param {string} query - InfluxQL query string
 * @returns {Object} - {values: [], err: null}
 */
const influxRead = async (query) => {
  let start = DateTime.now()
  logit(`ReadInflux from ${INFLUXURL_READ}`)
  logit(`Query: ${query}`)
  let erg = { values: [], err: null }
  try {
    const params = {
      db: INFLUXDB,
      q: query,
      epoch: 'ms' // Return timestamps in milliseconds
    }
    // Add authentication if provided
    if (INFLUXUSER && INFLUXPASS) {
      params.u = INFLUXUSER
      params.p = INFLUXPASS
    }
    let ret = await axios({
      method: 'get',
      url: INFLUXURL_READ,
      params: params,
      timeout: 10000,
    })
    if (ret.status !== 200) {
      erg.err = `RESPSTATUS: ${ret.status}`
      logit(`ERROR ${influxRead.name}: ${erg.err}`)
      return erg
    }
    // InfluxDB 1.8 returns JSON format
    if (ret.data.error) {
      erg.err = ret.data.error
      logit(`ERROR ${influxRead.name}: ${erg.err}`)
      return erg
    }
    erg.values = ret.data.results
  } catch (e) {
    erg.err = e.toString()
    logit(`ERROR ${influxRead.name}: ${erg.err}`)
    return erg
  }
  logit(`Influx read time: ${start.diffNow('seconds').toObject().seconds * -1} sec`)
  return erg
}

/**
 * Write line protocol data to InfluxDB 1.8
 * @param {string} data - Line protocol data
 * @returns {Object} - Response object
 */
const influxWrite = async (data) => {
  let start = DateTime.now()
  let ret
  try {
    const params = {
      db: INFLUXDB,
      precision: 'ms'
    }
    // Add authentication if provided
    if (INFLUXUSER && INFLUXPASS) {
      params.u = INFLUXUSER
      params.p = INFLUXPASS
    }
    ret = await axios({
      method: 'post',
      url: INFLUXURL_WRITE,
      params: params,
      data: data,
      headers: { 'Content-Type': 'text/plain; charset=utf-8' },
      timeout: 10000,
    })
    if (ret.status !== 204) {
      logerror(`doWrite2API Status: ${ret.status}`)
    }
  } catch (e) {
    logerror(`doWrite2API ${e}`)
  }
  logit(`Influx-Write-Time: ${start.diffNow('seconds').toObject().seconds * -1} sec`)
  return ret
}

/**
 * Helper to transform an InfluxDB 1.8 result into the format produced by the Flux version
 * @param {Array} series - InfluxDB series data
 * @returns {Array} - Transformed data array
 */
const transformInfluxResult = (series) => {
  if (!series || !series.length) return []
  const result = []
  series.forEach(serie => {
    if (!serie.values) return
    const columns = serie.columns
    serie.values.forEach(row => {
      const record = {}
      columns.forEach((col, index) => {
        if (col === 'time') {
          // Convert the timestamp to an ISO string for compatibility
          record.datetime = new Date(row[index]).toISOString()
        } else if (col.startsWith('DNMS_noise_')) {
          // Strip the field prefix: "DNMS_noise_LA_max" -> "LA_max"
          record[col.slice('DNMS_noise_'.length)] = row[index]
        } else {
          record[col] = row[index]
        }
      })
      result.push(record)
    })
  })
  return result
}
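
// Example (a minimal sketch, not part of the module): writing one DNMS sample via
// influxWrite() using InfluxDB line protocol. The chip id and the measured values
// below are made up; the measurement name "DNMS", the tag key "node" and the field
// keys are assumed to match those used by the queries in this file.
//
//   const ts = Date.now() // ms timestamp, matching precision: 'ms' above
//   const line = `DNMS,node=esp32-123456 ` +
//     `DNMS_noise_LA_max=72.4,DNMS_noise_LA_min=38.1,DNMS_noise_LAeq=55.2 ${ts}`
//   await influxWrite(line)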
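
// Example (sketch only; the chipid value is hypothetical): fetching the last hour of
// readings in ascending order. Each returned record carries datetime, LA_max, LA_min,
// LAeq and — because opts.data is not 'live' — the derived E10tel_eq.
//
//   const { err, values } = await fetchActData({
//     chipid: 'esp32-123456',
//     start: 'start: now() - 1h',
//     stop: 'stop: now()',
//     sort: 1,
//   })
//   if (!err && values.length) {
//     console.log(values[0].datetime, values[0].LA_max, values[0].E10tel_eq)
//   }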
/**
 * Calculate the logarithmic average of decibel values.
 * For decibel values we need to:
 * 1. Convert dB to linear scale (10^(dB/10))
 * 2. Calculate the arithmetic mean of the linear values
 * 3. Convert back to dB (10 * log10(mean))
 * @param {Array} values - Array of decibel values
 * @returns {number} - Logarithmic average in dB
 */
const calculateLogMean = (values) => {
  if (!values || values.length === 0) return null
  // Convert dB to linear scale, calculate the mean, convert back to dB
  const linearSum = values.reduce((sum, val) => {
    if (val !== null && val !== undefined) {
      return sum + Math.pow(10, val / 10)
    }
    return sum
  }, 0)
  const validCount = values.filter(val => val !== null && val !== undefined).length
  if (validCount === 0) return null
  const linearMean = linearSum / validCount
  return 10 * Math.log10(linearMean)
}
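
// Worked example (values chosen for illustration): for the dB readings [50, 60, 70]
// the linear values are 1e5, 1e6 and 1e7, their mean is ~3.7e6, and
// 10 * log10(3.7e6) ≈ 65.7 dB — noticeably higher than the arithmetic mean of 60 dB,
// because loud events dominate an energy-based average.
//
//   calculateLogMean([50, 60, 70]) // ≈ 65.68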
/**
 * Fetch noise averaging data from InfluxDB 1.8 with proper logarithmic averaging for LA_max
 * @param {Object} opts - Options object
 * @param {string} opts.chipid - Chip ID of the sensor node
 * @param {string} opts.start - Start time
 * @param {string} opts.stop - Stop time
 * @param {number} opts.peak - Peak threshold for counting
 * @param {boolean} opts.long - Return full data or just the summary
 * @returns {Object} - {err: null, values: []}
 */
export const fetchNoiseAVGData = async (opts) => {
  let ret = { err: null, values: [] }

  // Convert the Flux time format to InfluxQL format
  let startTime = opts.start.replace('start: ', '').trim()
  let stopTime = opts.stop.replace('stop: ', '').trim()

  // If the time is an ISO string, wrap it in quotes; relative times (like now() - 1h) stay as-is
  if (startTime.match(/^\d{4}-\d{2}-\d{2}T/)) {
    startTime = `'${startTime}'`
  }
  if (stopTime.match(/^\d{4}-\d{2}-\d{2}T/)) {
    stopTime = `'${stopTime}'`
  }

  // InfluxQL doesn't support complex joins like Flux, so we fetch the raw LA_max
  // readings once and derive the aggregates in JavaScript.

  // Query 1: LA_max data for the E10tel calculation
  // InfluxDB 1.8 only stores LA_max (dB), which we convert to an E10tel equivalent
  const queryLAmaxForE10 = `
    SELECT "DNMS_noise_LA_max"
    FROM "DNMS"
    WHERE "node" = '${opts.chipid}'
      AND time >= ${startTime}
      AND time <= ${stopTime}
    ORDER BY time ASC
  `
  // Query 2: Same query for peak counting (we process the same data)
  const queryLAmaxForPeaks = queryLAmaxForE10

  try {
    // Execute the LA_max query (the same data is used for both E10tel calculation and peak counting)
    let { values: lamaxValues, err: lamaxErr } = await influxRead(queryLAmaxForE10)
    if (lamaxErr) {
      ret.err = lamaxErr.toString()
      logit(`ERROR ${fetchNoiseAVGData.name}: ${ret.err}`)
      return ret
    }
    if (!lamaxValues || !lamaxValues.length || !lamaxValues[0].series) {
      ret.err = 'NODATA'
      logit(`ERROR ${fetchNoiseAVGData.name}: No data returned from query`)
      return ret
    }

    // Transform the LA_max results
    const lamaxData = transformInfluxResult(lamaxValues[0].series)

    // Group the LA_max data by hour and calculate:
    // 1. E10tel equivalent values (10^(LA_max/10))
    // 2. Peak counts
    // 3. Statistics for n_AVG
    const hourlyData = {}
    lamaxData.forEach(record => {
      const timestamp = new Date(record.datetime)
      const hourKey = new Date(
        timestamp.getFullYear(),
        timestamp.getMonth(),
        timestamp.getDate(),
        timestamp.getHours()
      ).toISOString()
      if (!hourlyData[hourKey]) {
        hourlyData[hourKey] = {
          time: hourKey,
          lamaxValues: [],
          e10telValues: [], // LA_max converted to its E10tel equivalent
          peakCount: 0
        }
      }
      const lamax = record.LA_max
      if (lamax !== null && lamax !== undefined) {
        // Store the original LA_max value
        hourlyData[hourKey].lamaxValues.push(lamax)
        // Convert LA_max (dB) to its E10tel equivalent: 10^(LA_max/10)
        const e10tel = Math.pow(10, lamax / 10)
        hourlyData[hourKey].e10telValues.push(e10tel)
        // Count peaks
        if (lamax >= opts.peak) {
          hourlyData[hourKey].peakCount++
        }
      }
    })

    // Calculate the final results for each hour
    const combinedResults = []
    Object.values(hourlyData).forEach(hourData => {
      const result = {
        _time: hourData.time,
        count: hourData.e10telValues.length,
        peakcount: hourData.peakCount
      }
      // Calculate the E10tel statistics
      if (hourData.e10telValues.length > 0) {
        // Sum of the E10tel values
        result.n_sum = hourData.e10telValues.reduce((sum, val) => sum + val, 0)
        // Mean of the E10tel values, converted back to dB for n_AVG
        // This matches the Flux version: mean(E10tel_eq), then 10*log10(mean)
        const e10telMean = result.n_sum / hourData.e10telValues.length
        result.n_AVG = 10.0 * Math.log10(e10telMean)
      }
      // Add additional fields if opts.long is true
      if (opts.long) {
        result.LA_max_values = hourData.lamaxValues
        result.LA_max_log_avg = calculateLogMean(hourData.lamaxValues)
        result.E10tel_values = hourData.e10telValues
      }
      combinedResults.push(result)
    })

    // Sort by time
    combinedResults.sort((a, b) => new Date(a._time) - new Date(b._time))

    // Reduce the result set unless opts.long is set
    if (!opts.long) {
      ret.values = combinedResults.map(record => ({
        _time: record._time,
        peakcount: record.peakcount,
        n_AVG: record.n_AVG
      }))
    } else {
      ret.values = combinedResults
    }
  } catch (e) {
    ret.err = e.toString()
    logit(`ERROR ${fetchNoiseAVGData.name}: ${ret.err}`)
    return ret
  }
  return ret
}
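
// Example (sketch only; chipid and threshold are hypothetical): hourly noise summary
// for the last 24 hours. With long: false each entry contains _time, peakcount and
// the energy-averaged n_AVG in dB; peaks are LA_max readings at or above opts.peak.
//
//   const { err, values } = await fetchNoiseAVGData({
//     chipid: 'esp32-123456',
//     start: 'start: now() - 24h',
//     stop: 'stop: now()',
//     peak: 80,
//     long: false,
//   })
//   if (!err) values.forEach(v => console.log(v._time, v.peakcount, v.n_AVG))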
/**
 * Fetch the latest LA_max values for multiple chip IDs
 * @param {Object} opts - Options object
 * @param {Array} opts.chipids - Array of chip IDs
 * @returns {Object} - {err: null, values: [{chipid, LA_max, timestamp}]}
 */
export const fetchLatestLAmaxForChips = async (opts) => {
  let ret = { err: null, values: [] }

  if (!opts.chipids || !Array.isArray(opts.chipids) || opts.chipids.length === 0) {
    ret.err = 'No chip IDs provided'
    logit(`ERROR ${fetchLatestLAmaxForChips.name}: ${ret.err}`)
    return ret
  }

  try {
    // Build the WHERE clause with multiple chip IDs using OR (InfluxQL doesn't support IN)
    const chipIdConditions = opts.chipids.map(id => `"node" = '${id}'`).join(' OR ')

    // Query to get the latest LA_max for each chip
    const query = `SELECT "DNMS_noise_LA_max", "node" FROM "DNMS" WHERE (${chipIdConditions}) AND time >= now() - 24h ORDER BY time DESC`

    let { values: lamaxValues, err: lamaxErr } = await influxRead(query)
    if (lamaxErr) {
      ret.err = lamaxErr.toString()
      logit(`ERROR ${fetchLatestLAmaxForChips.name}: ${ret.err}`)
      return ret
    }
    if (!lamaxValues || !lamaxValues.length || !lamaxValues[0].series) {
      ret.err = 'NODATA'
      logit(`ERROR ${fetchLatestLAmaxForChips.name}: No data returned from query`)
      return ret
    }

    // Transform the results
    const allData = transformInfluxResult(lamaxValues[0].series)

    // Get the latest value for each chip (data is already sorted by time DESC)
    const latestByChip = {}
    allData.forEach(record => {
      const chipid = record.node
      const lamax = record.LA_max
      // Only keep the first (latest) value for each chip
      if (!latestByChip[chipid] && lamax !== null && lamax !== undefined) {
        latestByChip[chipid] = {
          chipid: chipid,
          LA_max: lamax,
          timestamp: record.datetime
        }
      }
    })

    // Convert to an array
    ret.values = Object.values(latestByChip)

    // Add null entries for chips without data
    opts.chipids.forEach(chipid => {
      if (!latestByChip[chipid]) {
        ret.values.push({
          chipid: chipid,
          LA_max: null,
          timestamp: null
        })
      }
    })
  } catch (e) {
    ret.err = e.toString()
    logit(`ERROR ${fetchLatestLAmaxForChips.name}: ${ret.err}`)
    return ret
  }
  return ret
}

// Export the write function for compatibility
export { influxWrite }
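
// Example (sketch only; the chip IDs are hypothetical): fetching the most recent
// LA_max of several nodes at once. Chips without data in the last 24 hours are
// returned with LA_max: null and timestamp: null.
//
//   const { err, values } = await fetchLatestLAmaxForChips({
//     chipids: ['esp32-123456', 'esp32-654321'],
//   })
//   if (!err) values.forEach(v => console.log(v.chipid, v.LA_max, v.timestamp))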