Files
readin/fetchnewdata.js

141 lines
4.6 KiB
JavaScript

// Einlesen aller Sensordaten von sensor.comunity und abspeicher in einer Inlux- bzw. einer Mongo-DB
// Ausgehend von Version 1.x.x werden hier nun die Daten nach den Sensortypen getrennt gespeichert
// und zwar einstellbar in einer Mongo-DB oder in einer Influx-DB (oder auch in beide).
// Version:
//
// V 2.0.0 2023-10-13 rxf
// - Erste Version mit der Trennung nach Typen
//
// V 2.0.1 2023-10-15 rxf
// - Auswahl der Typen via Commandline (-t) oder Environment TYP
// TYP: Ist ein Array von Strings, wenn nur einzelne Typen gespeichert werden sollen, also z.B.:
// ['pm', 'noise']
const TYP = process.env.TYP || ''
import { doReadfromAPI as readin, statistics } from './readdata.js'
import { constructDBaseEntries as parse} from './parse.js'
import * as mongo from './mongo.js'
import * as influx from './influx_post.js'
import { logit, logerror } from './logit.js'
import { DateTime } from 'luxon'
import fs from 'fs'
import mod_getopt from 'posix-getopt'
const pkg = JSON.parse(fs.readFileSync(new URL('./package.json', import.meta.url), 'utf8'))
// import nodeSchedule from 'node-schedule'
const fetchNewData = async (args) => {
let client
let start = DateTime.now();
try {
client = await mongo.connectMongo()
} catch(e) {
logerror(`Connect to Mongo ${e}`);
logit(`Programmend - Error in connecting mongo\n`)
process.exit(-1);
}
try {
// read data from API or from disk
let dat = await readin()
if (dat.length !== 0) {
// parse the data
let [props, data, idata] = await parse(client, dat, args)
// write sensor data to mongoDB
if(args.mongo) {
for (const [k,v] of Object.entries(data)) {
if(v.length !== 0) {
await mongo.writeDataArray(client, k+'_sensors', v)
}
}
}
// write sensor data to influxDB
if(args.influx) {
await influx.influxWrite(idata)
}
// write properties to mongoDB
await mongo.bulkWrite(client, mongo.property_coll, props)
}
} catch (e) {
logerror(`Catch in main ${e}`);
} finally {
statistics.totalTime = DateTime.now().diff(start,['seconds']).toObject().seconds
await mongo.writeStatistic(client, statistics)
await mongo.closeMongo(client)
}
}
// Parse command line options
function parse_cmdline(argv) {
let parser = new mod_getopt.BasicParser('i(influx)m(mongo)t:(typ)h(help)v(version)',argv);
let option;
let ret = {influx: false, mongo: true, typ: TYP}
while((option = parser.getopt()) !== undefined) {
switch(option.option) {
case 'i':
ret.mongo = false
break;
case 'm':
ret.influx = false
break;
case 't':
let x = option.optarg.trim()
let y = []
if(x[0] === '[') {
y = JSON.parse(x)
} else {
y.push(x)
}
ret.typ = y
break;
case 'v':
console.log(`Version: ${pkg.version} from ${pkg.date}`);
console.log();
process.exit();
break;
case 'h':
console.log("Usage: node fetchnewdata.js [-i] [-m] [-t [typ, typ, ..]] [-v] [-h]");
console.log("Params:");
console.log(" -i use only InfluxDB to store the data (default use both, InfluxDB and MongoDB))");
console.log(" -m use only MongoDB to store the data (default use both, InfluxDB and MongoDB)");
console.log(" -t [ sensorType, ..]: if given, only those sensors will be used (ex: laerm) default: all");
console.log(" MUST BE AN ARRAY!; allowed types: 'pm', 'noise', 'radiactivity', 'thp', 'gps'.")
console.log(" -v version: show version");
console.log(" -h this help text");
console.log("All parameters are optional.");
console.log();
process.exit();
break;
default:
break;
}
}
logit(JSON.stringify(ret));
return ret;
}
const main = async () => {
const json = JSON.parse(fs.readFileSync('package.json', 'utf8'))
logit(`Programmstart V ${json.version} vom ${json.date}.`);
let args = parse_cmdline(process.argv);
await fetchNewData(args)
logit(`Program end - running time: ${statistics.totalTime} sec\n\n`)
}
main().catch(console.error)