141 lines
4.6 KiB
JavaScript
141 lines
4.6 KiB
JavaScript
// Einlesen aller Sensordaten von sensor.community und Abspeichern in einer Influx- bzw. einer Mongo-DB
|
|
|
|
// Ausgehend von Version 1.x.x werden hier nun die Daten nach den Sensortypen getrennt gespeichert
|
|
// und zwar einstellbar in einer Mongo-DB oder in einer Influx-DB (oder auch in beide).
|
|
|
|
// Version:
|
|
//
|
|
// V 2.0.0 2023-10-13 rxf
|
|
// - Erste Version mit der Trennung nach Typen
|
|
//
|
|
// V 2.0.1 2023-10-15 rxf
|
|
// - Auswahl der Typen via Commandline (-t) oder Environment TYP
|
|
|
|
// TYP: Ist ein Array von Strings, wenn nur einzelne Typen gespeichert werden sollen, also z.B.:
|
|
// ['pm', 'noise']
|
|
// Default sensor-type filter, read verbatim (as a string) from the TYP environment variable.
// An unset or empty variable yields the empty string.
const TYP = process.env.TYP ? process.env.TYP : ''
|
|
|
|
import { doReadfromAPI as readin, statistics } from './readdata.js'
|
|
import { constructDBaseEntries as parse} from './parse.js'
|
|
|
|
import * as mongo from './mongo.js'
|
|
import * as influx from './influx_post.js'
|
|
import { logit, logerror } from './logit.js'
|
|
import { DateTime } from 'luxon'
|
|
import fs from 'fs'
|
|
import mod_getopt from 'posix-getopt'
|
|
// Package metadata, parsed from the package.json located next to this module
// (resolved against import.meta.url, so it does not depend on the working directory).
const pkg = JSON.parse(
  fs.readFileSync(new URL('./package.json', import.meta.url), { encoding: 'utf8' }),
)
|
|
|
|
|
|
// import nodeSchedule from 'node-schedule'
|
|
|
|
/**
 * Run one complete fetch cycle: connect to MongoDB, read the raw sensor data,
 * parse it and store the results in the configured database(s).
 *
 * The sensor properties are always written to MongoDB, independent of
 * `args.mongo`; only the measurement data honours the `mongo`/`influx` switches.
 *
 * @param {{mongo: boolean, influx: boolean, typ: (string|string[])}} args
 *        Options as returned by parse_cmdline(); passed on unchanged to the parser.
 * @returns {Promise<void>} Resolves when statistics are written and the
 *        connection is closed. Rejects if writing the statistics or closing
 *        the connection fails.
 */
const fetchNewData = async (args) => {
  const start = DateTime.now();
  let client;

  try {
    client = await mongo.connectMongo();
  } catch (e) {
    // Without a Mongo connection nothing useful can be done: log and terminate.
    logerror(`Connect to Mongo ${e}`);
    logit(`Programmend - Error in connecting mongo\n`);
    process.exit(-1);
  }

  try {
    // Read the raw data (per the original note: from the API or from disk).
    const dat = await readin();
    if (dat.length !== 0) {
      const [props, data, idata] = await parse(client, dat, args);

      // Sensor data -> MongoDB, one collection per sensor type ("<type>_sensors").
      if (args.mongo) {
        for (const [k, v] of Object.entries(data)) {
          if (v.length !== 0) {
            await mongo.writeDataArray(client, k + '_sensors', v);
          }
        }
      }

      // Sensor data -> InfluxDB.
      if (args.influx) {
        await influx.influxWrite(idata);
      }

      // Sensor properties -> MongoDB.
      await mongo.bulkWrite(client, mongo.property_coll, props);
    }
  } catch (e) {
    // Errors while reading/parsing/writing are logged only, so that the
    // statistics below are still recorded for this run.
    logerror(`Catch in main ${e}`);
  } finally {
    statistics.totalTime = DateTime.now().diff(start, ['seconds']).toObject().seconds;
    try {
      await mongo.writeStatistic(client, statistics);
    } finally {
      // Always release the connection, even when writing the statistics fails.
      await mongo.closeMongo(client);
    }
  }
};
|
|
|
|
// Parse command line options
|
|
/**
 * Convert the argument of the -t option into an array of sensor-type names.
 *
 * A value that does not start with '[' is taken as a single type name.
 * A bracketed value is parsed as JSON first; if that fails (e.g. single-quoted
 * or unquoted names such as ['pm', 'noise'] or [pm, noise]) the brackets and
 * quotes are stripped and the list is split on commas.
 *
 * @param {string} raw - Option argument as given on the command line.
 * @returns {string[]} List of sensor-type names.
 */
function parseTypOption(raw) {
  const text = raw.trim();
  if (!text.startsWith('[')) {
    return [text];
  }
  try {
    return JSON.parse(text);
  } catch {
    return text
      .replace(/^\[|\]$/g, '')
      .split(',')
      .map((name) => name.trim().replace(/^['"]|['"]$/g, ''))
      .filter((name) => name !== '');
  }
}

/**
 * Parse the command line options.
 *
 * Recognised options: -i/--influx, -m/--mongo, -t/--typ <types>, -v/--version,
 * -h/--help. The options -v and -h print their text and terminate the process.
 *
 * @param {string[]} argv - Full argument vector (process.argv).
 * @returns {{influx: boolean, mongo: boolean, typ: (string|string[])}}
 *          `typ` is the TYP environment string unless -t is given, in which
 *          case it is an array of type names.
 */
function parse_cmdline(argv) {
  const parser = new mod_getopt.BasicParser('i(influx)m(mongo)t:(typ)h(help)v(version)', argv);
  // NOTE(review): the help text below states that both databases are used by
  // default, but the default here is MongoDB only - confirm which is intended.
  const ret = { influx: false, mongo: true, typ: TYP };
  let option;

  while ((option = parser.getopt()) !== undefined) {
    switch (option.option) {
      case 'i':
        // "InfluxDB only": influx has to be switched on explicitly, otherwise
        // (with influx defaulting to false) no database would be written at all.
        ret.influx = true;
        ret.mongo = false;
        break;

      case 'm':
        // "MongoDB only"
        ret.mongo = true;
        ret.influx = false;
        break;

      case 't':
        ret.typ = parseTypOption(option.optarg);
        break;

      case 'v':
        console.log(`Version: ${pkg.version} from ${pkg.date}`);
        console.log();
        process.exit();
        break;

      case 'h':
        console.log("Usage: node fetchnewdata.js [-i] [-m] [-t [typ, typ, ..]] [-v] [-h]");
        console.log("Params:");
        console.log(" -i use only InfluxDB to store the data (default use both, InfluxDB and MongoDB))");
        console.log(" -m use only MongoDB to store the data (default use both, InfluxDB and MongoDB)");
        console.log(" -t [ sensorType, ..]: if given, only those sensors will be used (ex: laerm) default: all");
        console.log(" MUST BE AN ARRAY!; allowed types: 'pm', 'noise', 'radiactivity', 'thp', 'gps'.")
        console.log(" -v version: show version");
        console.log(" -h this help text");
        console.log("All parameters are optional.");
        console.log();
        process.exit();
        break;

      default:
        // Any other option reported by the parser is ignored here.
        break;
    }
  }

  logit(JSON.stringify(ret));
  return ret;
}
|
|
|
|
|
|
|
|
/**
 * Program entry point: log the start banner, parse the command line, run one
 * fetch cycle and log the total running time.
 *
 * The version information comes from the module-level `pkg` constant, which is
 * loaded relative to this module, so the program can be started from any
 * working directory.
 *
 * @returns {Promise<void>}
 */
const main = async () => {
  logit(`Programmstart V ${pkg.version} vom ${pkg.date}.`);
  const args = parse_cmdline(process.argv);
  await fetchNewData(args);
  logit(`Program end - running time: ${statistics.totalTime} sec\n\n`);
};
|
|
|
|
// Start the program; a rejection that escapes main() is printed to stderr.
main().catch((err) => console.error(err))
|
|
|