first
This commit is contained in:
9
.dockerignore
Normal file
9
.dockerignore
Normal file
@@ -0,0 +1,9 @@
|
||||
node-modules
|
||||
.gitignore
|
||||
.dockerignore
|
||||
build_and_copy.sh
|
||||
docker-compose.yml
|
||||
Dockerfile*
|
||||
INFLUXSERVER
|
||||
TRANSFERE
|
||||
|
||||
7
.gitignore
vendored
Normal file
7
.gitignore
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
data/
|
||||
node_modules
|
||||
.idea
|
||||
.env
|
||||
log
|
||||
|
||||
|
||||
21
.vscode/launch.json
vendored
Normal file
21
.vscode/launch.json
vendored
Normal file
@@ -0,0 +1,21 @@
|
||||
{
|
||||
// Use IntelliSense to learn about possible attributes.
|
||||
// Hover to view descriptions of existing attributes.
|
||||
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"type": "node",
|
||||
"request": "launch",
|
||||
"name": "Launch Program",
|
||||
"skipFiles": [
|
||||
"<node_internals>/**"
|
||||
],
|
||||
"program": "${workspaceFolder}/fetchnewdata.js",
|
||||
"args": [],
|
||||
"env": {
|
||||
"MONGOPORT": "27098",
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
27
Dockerfile_timeseries
Normal file
27
Dockerfile_timeseries
Normal file
@@ -0,0 +1,27 @@
|
||||
FROM node:alpine
|
||||
|
||||
ADD package.json /tmp/package.json
|
||||
RUN cd /tmp && npm install
|
||||
RUN mkdir -p /opt/app && cp -a /tmp/node_modules /opt/app/
|
||||
|
||||
WORKDIR /opt/app
|
||||
ADD . /opt/app
|
||||
ADD crontab.tmp /opt/app
|
||||
|
||||
RUN mkdir -p data
|
||||
|
||||
#RUN apk add busybox-initscripts
|
||||
#RUN apk add --no-cache tzdata
|
||||
#ENV TZ Europe/Berlin
|
||||
#RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
RUN crontab crontab.tmp
|
||||
RUN deluser --remove-home node
|
||||
|
||||
RUN touch cmds.sh \
|
||||
&& echo 'crond -f' >>cmds.sh
|
||||
|
||||
CMD sh ./cmds.sh
|
||||
|
||||
|
||||
|
||||
15
TRANSFERE/create_collections
Normal file
15
TRANSFERE/create_collections
Normal file
@@ -0,0 +1,15 @@
|
||||
db = connect( 'mongodb://localhost/allsensors')
|
||||
|
||||
let options = {
|
||||
timeseries: {
|
||||
timeField: "datetime",
|
||||
metaField: "sensorid",
|
||||
}
|
||||
}
|
||||
|
||||
db.createCollection("pm_sensors", options)
|
||||
db.createCollection("thp_sensors", options)
|
||||
db.createCollection("noise_sensors", options)
|
||||
db.createCollection("radioactivity_sensors", options)
|
||||
db.createCollection("gps_sensors", options)
|
||||
db.createCollection("unknown_sensors", options)
|
||||
63
build_and_copy.sh
Executable file
63
build_and_copy.sh
Executable file
@@ -0,0 +1,63 @@
|
||||
#!/bin/bash
|
||||
# Build Docker-Container
|
||||
#
|
||||
# Call: buildit.sh name [target]
|
||||
#
|
||||
# The Dockerfile must be named like Dockerfile_name
|
||||
#
|
||||
# 2018-09-20 rxf
|
||||
# - before sending docker image to remote, tag actual remote image
|
||||
#
|
||||
# 2018-09-14 rxf
|
||||
# - first Version
|
||||
#
|
||||
|
||||
set -x
|
||||
port=""
|
||||
orgName=timeseries
|
||||
name=timeseries
|
||||
|
||||
usage()
|
||||
{
|
||||
echo "Usage build_and_copy.sh [-p port] [-n name] target"
|
||||
echo " Build docker container $name and copy to target"
|
||||
echo "Params:"
|
||||
echo " target: Where to copy the container to "
|
||||
echo " -p port: ssh port (default 22)"
|
||||
echo " -n name: new name for container (default: $orgName)"
|
||||
}
|
||||
|
||||
while getopts n:p:h? o
|
||||
do
|
||||
case "$o" in
|
||||
n) name="$OPTARG";;
|
||||
p) port="-p $OPTARG";;
|
||||
h) usage; exit 0;;
|
||||
*) usage; exit 1;;
|
||||
esac
|
||||
done
|
||||
shift $((OPTIND-1))
|
||||
|
||||
while [ $# -gt 0 ]; do
|
||||
if [[ -z "$target" ]]; then
|
||||
target=$1
|
||||
shift
|
||||
else
|
||||
echo "bad option $1"
|
||||
# exit 1
|
||||
shift
|
||||
fi
|
||||
done
|
||||
|
||||
docker build -f Dockerfile_$orgName -t $name .
|
||||
|
||||
dat=`date +%Y%m%d%H%M`
|
||||
|
||||
if [ "$target" == "localhost" ]
|
||||
then
|
||||
docker tag $name $name:V_$dat
|
||||
exit
|
||||
fi
|
||||
|
||||
ssh $port $target "docker tag $name $name:V_$dat"
|
||||
docker save $name | bzip2 | pv | ssh $port $target 'bunzip2 | docker load'
|
||||
2
crontab.tmp
Normal file
2
crontab.tmp
Normal file
@@ -0,0 +1,2 @@
|
||||
1-59/5 * * * * cd /opt/app && npm start
|
||||
|
||||
45
docker-compose.yml
Normal file
45
docker-compose.yml
Normal file
@@ -0,0 +1,45 @@
|
||||
version: '3.9'
|
||||
|
||||
volumes:
|
||||
mongo_data_1:
|
||||
|
||||
services:
|
||||
node:
|
||||
image: timeseries
|
||||
environment:
|
||||
DEVELOP: "true"
|
||||
MONGOHOST: mongodb
|
||||
INFLUXHOST: influxdb
|
||||
TYP: "[\"noise\", \"thp\"]"
|
||||
volumes:
|
||||
- ${PWD}/log:/var/log
|
||||
container_name: timeseries
|
||||
restart: unless-stopped
|
||||
|
||||
influx:
|
||||
image: influxdb:latest
|
||||
container_name: influxdb
|
||||
volumes:
|
||||
- ${PWD}/INFLUXSERVER/data:/var/lib/influxdb2
|
||||
- ${PWD}/INFLUXSERVER/config:/etc/influxdb2
|
||||
ports:
|
||||
- '8086:8086'
|
||||
environment:
|
||||
DOCKER_INFLUXDB_INIT_MODE: ${DOCKER_INFLUXDB_INIT_MODE}
|
||||
DOCKER_INFLUXDB_INIT_USERNAME: ${DOCKER_INFLUXDB_INIT_USERNAME}
|
||||
DOCKER_INFLUXDB_INIT_PASSWORD: ${DOCKER_INFLUXDB_INIT_PASSWORD}
|
||||
DOCKER_INFLUXDB_INIT_ORG: ${DOCKER_INFLUXDB_INIT_ORG}
|
||||
DOCKER_INFLUXDB_INIT_BUCKET: ${DOCKER_INFLUXDB_INIT_BUCKET}
|
||||
restart: unless-stopped
|
||||
|
||||
mongodb:
|
||||
image: mongo
|
||||
volumes:
|
||||
- mongo_data_1:/data/db
|
||||
- ${PWD}/TRANSFERE:/TRANSFERE
|
||||
ports:
|
||||
- "27098:27017"
|
||||
container_name: mongodb
|
||||
# command: '--auth'
|
||||
restart: unless-stopped
|
||||
|
||||
140
fetchnewdata.js
Normal file
140
fetchnewdata.js
Normal file
@@ -0,0 +1,140 @@
|
||||
// Einlesen aller Sensordaten von sensor.comunity und abspeicher in einer Inlux- bzw. einer Mongo-DB
|
||||
|
||||
// Ausgehend von Version 1.x.x werden hier nun die Daten nach den Sensortypen getrennt gespeichert
|
||||
// und zwar einstellbar in einer Mongo-DB oder in einer Influx-DB (oder auch in beide).
|
||||
|
||||
// Version:
|
||||
//
|
||||
// V 2.0.0 2023-10-13 rxf
|
||||
// - Erste Version mit der Trennung nach Typen
|
||||
//
|
||||
// V 2.0.1 2023-10-15 rxf
|
||||
// - Auswahl der Typen via Commandline (-t) oder Environment TYP
|
||||
|
||||
// TYP: Ist ein Array von Strings, wenn nur einzelne Typen gespeichert werden sollen, also z.B.:
|
||||
// ['pm', 'noise']
|
||||
const TYP = process.env.TYP || ''
|
||||
|
||||
import { doReadfromAPI as readin, statistics } from './readdata.js'
|
||||
import { constructDBaseEntries as parse} from './parse.js'
|
||||
|
||||
import * as mongo from './mongo.js'
|
||||
import * as influx from './influx_post.js'
|
||||
import { logit, logerror } from './logit.js'
|
||||
import { DateTime } from 'luxon'
|
||||
import fs from 'fs'
|
||||
import mod_getopt from 'posix-getopt'
|
||||
import pkg from './package.json' assert { type: "json" }
|
||||
|
||||
|
||||
// import nodeSchedule from 'node-schedule'
|
||||
|
||||
const fetchNewData = async (args) => {
|
||||
let client
|
||||
let start = DateTime.now();
|
||||
try {
|
||||
client = await mongo.connectMongo()
|
||||
} catch(e) {
|
||||
logerror(`Connect to Mongo ${e}`);
|
||||
logit(`Programmend - Error in connecting mongo\n`)
|
||||
process.exit(-1);
|
||||
}
|
||||
try {
|
||||
// read data from API or from disk
|
||||
let dat = await readin()
|
||||
if (dat.length !== 0) {
|
||||
// parse the data
|
||||
let [props, data, idata] = await parse(client, dat, args)
|
||||
|
||||
// write sensor data to mongoDB
|
||||
if(args.mongo) {
|
||||
for (const [k,v] of Object.entries(data)) {
|
||||
if(v.length !== 0) {
|
||||
await mongo.writeDataArray(client, k+'_sensors', v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// write sensor data to influxDB
|
||||
if(args.influx) {
|
||||
await influx.influxWrite(idata)
|
||||
}
|
||||
|
||||
// write properties to mongoDB
|
||||
await mongo.bulkWrite(client, mongo.property_coll, props)
|
||||
}
|
||||
} catch (e) {
|
||||
logerror(`Catch in main ${e}`);
|
||||
} finally {
|
||||
statistics.totalTime = DateTime.now().diff(start,['seconds']).toObject().seconds
|
||||
await mongo.writeStatistic(client, statistics)
|
||||
await mongo.closeMongo(client)
|
||||
}
|
||||
}
|
||||
|
||||
// Parse command line options
|
||||
function parse_cmdline(argv) {
|
||||
let parser = new mod_getopt.BasicParser('i(influx)m(mongo)t:(typ)h(help)v(version)',argv);
|
||||
let option;
|
||||
let ret = {influx: true, mongo: true, typ: TYP}
|
||||
while((option = parser.getopt()) !== undefined) {
|
||||
switch(option.option) {
|
||||
case 'i':
|
||||
ret.mongo = false
|
||||
break;
|
||||
|
||||
case 'm':
|
||||
ret.influx = false
|
||||
break;
|
||||
|
||||
case 't':
|
||||
let x = option.optarg.trim()
|
||||
let y = []
|
||||
if(x[0] === '[') {
|
||||
y = JSON.parse(x)
|
||||
} else {
|
||||
y.push(x)
|
||||
}
|
||||
ret.typ = y
|
||||
break;
|
||||
|
||||
case 'v':
|
||||
console.log(`Version: ${pkg.version} from ${pkg.date}`);
|
||||
console.log();
|
||||
process.exit();
|
||||
break;
|
||||
|
||||
case 'h':
|
||||
console.log("Usage: node fetchnewdata.js [-i] [-m] [-t [typ, typ, ..]] [-v] [-h]");
|
||||
console.log("Params:");
|
||||
console.log(" -i use only InfluxDB to store the data (default use both, InfluxDB and MongoDB))");
|
||||
console.log(" -m use only MongoDB to store the data (default use both, InfluxDB and MongoDB)");
|
||||
console.log(" -t [ sensorType, ..]: if given, only those sensors will be used (ex: laerm) default: all");
|
||||
console.log(" MUST BE AN ARRAY!; allowed types: 'pm', 'noise', 'radiactivity', 'thp', 'gps'.")
|
||||
console.log(" -v version: show version");
|
||||
console.log(" -h this help text");
|
||||
console.log("All parameters are optional.");
|
||||
console.log();
|
||||
process.exit();
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
logit(JSON.stringify(ret));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
||||
const main = async () => {
|
||||
const json = JSON.parse(fs.readFileSync('package.json', 'utf8'))
|
||||
logit(`Programmstart V ${json.version} vom ${json.date}.`);
|
||||
let args = parse_cmdline(process.argv);
|
||||
await fetchNewData(args)
|
||||
logit(`Program end - running time: ${statistics.totalTime} sec\n\n`)
|
||||
}
|
||||
|
||||
main().catch(console.error)
|
||||
|
||||
97
influx_post.js
Normal file
97
influx_post.js
Normal file
@@ -0,0 +1,97 @@
|
||||
// Access to influxDB vie HTTP
|
||||
|
||||
import axios from 'axios'
|
||||
import { logit, logerror } from './logit.js'
|
||||
import { DateTime } from 'luxon'
|
||||
import { statistics } from'./readdata.js'
|
||||
|
||||
|
||||
let INFLUXHOST = process.env.INFLUXHOST || "localhost"
|
||||
let INFLUXPORT = process.env.INFLUXPORT || 8086
|
||||
let INFLUXTOKEN = process.env.INFLUXTOKEN ||
|
||||
"xuxTjvV7L3Mlr9diG36gMxExP_SbFntuJkp9KYj2_Hnz5U9zbCo7wurdkQqDtKO0Zchr6wbS8kGNW1L5I2V9YQ=="
|
||||
let INFLUXDATABUCKET = process.env.INFLUXDATABUCKET || "sensor_data"
|
||||
let INFLUXORG = process.env.INFLUXORG || "citysensor"
|
||||
|
||||
const INFLUXURL_READ = `http://${INFLUXHOST}:${INFLUXPORT}/api/v2/query?org=${INFLUXORG}`
|
||||
const INFLUXURL_WRITE = `http://${INFLUXHOST}:${INFLUXPORT}/api/v2/write?org=${INFLUXORG}&bucket=${INFLUXDATABUCKET}&precision=ms`
|
||||
|
||||
export const influxRead = async (query) => {
|
||||
let start = DateTime.now()
|
||||
let data = []
|
||||
try {
|
||||
let ret = await axios({
|
||||
method: 'post',
|
||||
url: INFLUXURL_READ,
|
||||
data: query,
|
||||
headers: {
|
||||
Authorization: `Token ${INFLUXTOKEN}`,
|
||||
Accept: 'application/csv',
|
||||
'Content-type': 'application/vnd.flux'
|
||||
},
|
||||
timeout: 10000,
|
||||
})
|
||||
if (ret.status != 200) {
|
||||
logerror(`doReadfromAPI Status: ${ret.status}`)
|
||||
}
|
||||
data = ret.data
|
||||
} catch (e) {
|
||||
logerror(`doReadfromAPI ${e}`)
|
||||
}
|
||||
logit(`ReadIn-Time: ${start.diffNow('seconds').toObject().seconds * -1} sec`)
|
||||
return data
|
||||
}
|
||||
|
||||
|
||||
export const influxWrite = async (data) => {
|
||||
let start = DateTime.now()
|
||||
let ret
|
||||
// logit(INFLUXURL_WRITE)
|
||||
try {
|
||||
ret = await axios({
|
||||
method: 'post',
|
||||
url: INFLUXURL_WRITE,
|
||||
data: data,
|
||||
headers: {
|
||||
Authorization: `Token ${INFLUXTOKEN}`,
|
||||
Accept: 'application/json',
|
||||
'Content-Type': 'text/plain; charset=utf-8'
|
||||
},
|
||||
timeout: 10000,
|
||||
})
|
||||
if (ret.status != 204) {
|
||||
logerror(`doWrite2API Status: ${ret.status}`)
|
||||
}
|
||||
} catch (e) {
|
||||
logerror(`doWrite2API ${e}`)
|
||||
}
|
||||
let statname = `writeInfluxData[sensor_data]Time`
|
||||
statistics[statname] = DateTime.now().diff(start, ['seconds']).toObject().seconds
|
||||
logit(`Influx-Write-Time: ${start.diffNow('seconds').toObject().seconds * -1} sec`)
|
||||
return ret
|
||||
}
|
||||
|
||||
/*
|
||||
async function main() {
|
||||
let data = `
|
||||
pm,sid=140 P1=12,P2=13
|
||||
pm,sid=142 P1=42,P2=13
|
||||
pm,sid=143 P1=43,P2=13
|
||||
pm,sid=144 P1=44,P2=13
|
||||
thp,sid=141 temperature=23.5,humidity=48,pressure=998
|
||||
`
|
||||
let ret = await influxWrite(data)
|
||||
process.exit()
|
||||
|
||||
let query = `from(bucket:"sensor_data")
|
||||
|> range(start: -1mo)
|
||||
|> filter(fn: (r) => r._measurement == "pm")
|
||||
|> filter(fn: (r) => r.sid == "140")
|
||||
`
|
||||
let erg = await influxRead(query)
|
||||
console.log(erg)
|
||||
}
|
||||
|
||||
|
||||
main().catch(console.error)
|
||||
*/
|
||||
12
logit.js
Normal file
12
logit.js
Normal file
@@ -0,0 +1,12 @@
|
||||
import { DateTime} from 'luxon'
|
||||
|
||||
export function logit(str) {
|
||||
let s = `${DateTime.now().toISO()} => ${str}`;
|
||||
console.log(s);
|
||||
}
|
||||
|
||||
export function logerror(str) {
|
||||
let s = `${DateTime.utc().toISO()} => *** ERROR *** ${str}`;
|
||||
console.log(s);
|
||||
}
|
||||
|
||||
194
mongo.js
Normal file
194
mongo.js
Normal file
@@ -0,0 +1,194 @@
|
||||
/* Interface for MongoDB
|
||||
*/
|
||||
import { MongoClient } from 'mongodb'
|
||||
import { logit, logerror } from './logit.js'
|
||||
import { statistics } from './readdata.js'
|
||||
import { DateTime } from 'luxon'
|
||||
|
||||
// const nodemailer = require('nodemailer');
|
||||
|
||||
let MONGOHOST = process.env.MONGOHOST || 'localhost'
|
||||
let MONGOPORT = process.env.MONGOPORT || 27017
|
||||
let MONGOAUTH = process.env.MONGOAUTH || 'false'
|
||||
let MONGOUSRP = process.env.MONGOUSRP || ''
|
||||
let MONGOBASE = process.env.MONGOBASE || 'allsensors'
|
||||
|
||||
|
||||
let MONGO_URL = 'mongodb://' + MONGOHOST + ':' + MONGOPORT; // URL to mongo database
|
||||
if (MONGOAUTH == 'true') {
|
||||
MONGO_URL = 'mongodb://' + MONGOUSRP + '@' + MONGOHOST + ':' + MONGOPORT + '/?authSource=' + MONGOBASE; // URL to mongo database
|
||||
}
|
||||
|
||||
export const property_coll = 'pptest'
|
||||
|
||||
const addandshowstatistics = (client, text, field, start) => {
|
||||
statistics[field] = DateTime.now().diff(start, ['seconds']).toObject().seconds
|
||||
logit(`Write ${text} to mongoDB: Time: ${statistics[field]} sec.`)
|
||||
}
|
||||
|
||||
export const connectMongo = async () => {
|
||||
try {
|
||||
logit(`Try to connect to ${MONGO_URL}`)
|
||||
let client = await MongoClient.connect(MONGO_URL, { useNewUrlParser: true, useUnifiedTopology: true })
|
||||
logit(`Mongodbase connected to ${MONGO_URL}`)
|
||||
return client
|
||||
}
|
||||
catch (error) {
|
||||
throw (error)
|
||||
}
|
||||
}
|
||||
|
||||
export const closeMongo = async (client) => {
|
||||
client.close()
|
||||
}
|
||||
|
||||
|
||||
export const getallProperties = async (client) => {
|
||||
return await client.db(MONGOBASE).collection(property_coll)
|
||||
.find().sort({ _id: 1 }).toArray()
|
||||
|
||||
}
|
||||
|
||||
export const checkOneproperty = async (client, sid) => {
|
||||
return await client.db(MONGOBASE).collection("properties")
|
||||
.findOne({ _id: sid })
|
||||
}
|
||||
|
||||
export const writeOneproperty = async (client, prop) => {
|
||||
try {
|
||||
let result = await client.db(MONGOBASE).collection("properties")
|
||||
.insertOne(prop)
|
||||
} catch (e) {
|
||||
if (e.code == 11000) {
|
||||
return false
|
||||
} else {
|
||||
throw (e)
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
export const writeProperties = async (client, props) => {
|
||||
let result
|
||||
let startAll = DateTime.now();
|
||||
let start
|
||||
let coll = client.db(MONGOBASE).collection("properties")
|
||||
if (props.new.length !== 0) {
|
||||
start = DateTime.now();
|
||||
try {
|
||||
result = await coll.insertMany(props.new)
|
||||
} catch (e) {
|
||||
logerror(`Write properties new ${e}`)
|
||||
}
|
||||
logit(`Write ${props.new.length} properties NEW to mongoDB: Result: ${result.acknowledged}, Time: ${start.diffNow('seconds').toObject().seconds * -1} sec.`)
|
||||
}
|
||||
if (props.loc.length !== 0) {
|
||||
start = DateTime.now()
|
||||
try {
|
||||
for (let item of props.loc) {
|
||||
result = await coll.updateOne({ _id: item._id }, {
|
||||
$set: { location_id: item.location_id },
|
||||
$push: { location: { $each: [item.location[0]], $position: 0 } }
|
||||
})
|
||||
}
|
||||
} catch (e) {
|
||||
logerror(`Write properties location ${e}`)
|
||||
}
|
||||
logit(`Write ${props.loc.length} properties LOC to mongoDB: Result: ${result.acknowledged}, Time: ${start.diffNow('seconds').toObject().seconds * -1} sec.`)
|
||||
}
|
||||
if (props.sname.length !== 0) {
|
||||
start = DateTime.now()
|
||||
try {
|
||||
for (let item of props.sname) {
|
||||
result = await coll.updateOne({ _id: item._id }, {
|
||||
$push: { name: { $each: [item.name[0]], $position: 0 } }
|
||||
})
|
||||
}
|
||||
} catch (e) {
|
||||
logerror(`Write properties samename ${e}`)
|
||||
}
|
||||
logit(`Write ${props.sname.length} properties NAME to mongoDB: Result: ${result.acknowledged}, Time: ${start.diffNow('seconds').toObject().seconds * -1} sec.`)
|
||||
}
|
||||
addandshowstatistics(client, 'properties', 'writePropsTime', startAll)
|
||||
}
|
||||
|
||||
export const writeDataArray = async (client, coll, data) => {
|
||||
let result
|
||||
let start = DateTime.now();
|
||||
try {
|
||||
result = await client.db(MONGOBASE).collection(coll)
|
||||
.insertMany(data, { ordered: false })
|
||||
} catch (e) {
|
||||
if (e.code !== 11000) {
|
||||
console.error(e)
|
||||
}
|
||||
}
|
||||
let statname = `writeMongoData[${coll}]Time`
|
||||
addandshowstatistics(client, `${data.length} entries for ${coll}`, `writeMongoData[${coll}]Time`, start)
|
||||
|
||||
// statistics[statname] = DateTime.now().diff(start, ['seconds']).toObject().seconds
|
||||
// logit(`Write Data for ${coll} to mongoDB: Time: ${statistics[statname]} sec.`)
|
||||
}
|
||||
|
||||
export const writeStatistic = async (client, stat) => {
|
||||
let result
|
||||
let start = DateTime.now();
|
||||
let entry = { timestamp: new Date(), ...stat }
|
||||
try {
|
||||
result = await client.db(MONGOBASE).collection("statistics")
|
||||
.insertOne(entry)
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
}
|
||||
addandshowstatistics(client, `statistics`, `writeStatisticTime`, start)
|
||||
}
|
||||
|
||||
export const dropColl = async (client, coll) => {
|
||||
let start = DateTime.now()
|
||||
let result
|
||||
try {
|
||||
result = await client.db(MONGOBASE).collection(coll).drop()
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
}
|
||||
logit(`Drop collection ${coll}: Result: ${result}, Time: ${start.diffNow('second').toObject().seconds * -1} sec.`)
|
||||
}
|
||||
|
||||
|
||||
export const createIndex = async (client, coll) => {
|
||||
let result
|
||||
let start = DateTime.now()
|
||||
try {
|
||||
result = await client.db(MONGOBASE).collection(coll).createIndex({ "location.loc": "2dsphere" })
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
}
|
||||
logit(`Create-Index: Result: ${result}, Time: ${start.diffNow('second').toObject().seconds * -1} sec.`)
|
||||
}
|
||||
|
||||
export const bulkUpdateMapdata = async (client, data) => {
|
||||
let start = DateTime.now()
|
||||
let result
|
||||
try {
|
||||
result = await client.db(MONGOBASE).collection("mapdata")
|
||||
.bulkWrite(data, { ordered: false })
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
}
|
||||
logit(`Write MapData: Result: ${result}, Time: ${start.diffNow('second').toObject().seconds * -1} sec.`)
|
||||
return result
|
||||
}
|
||||
|
||||
|
||||
export const bulkWrite = async (client, coll, data) => {
|
||||
let start = DateTime.now()
|
||||
let result
|
||||
try {
|
||||
result = await client.db(MONGOBASE).collection(coll)
|
||||
.bulkWrite(data, { ordered: false })
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
}
|
||||
addandshowstatistics(client, `Data for ${coll}`, `writeMongoProperties[${coll}]Time`, start)
|
||||
return result
|
||||
}
|
||||
3645
package-lock.json
generated
Normal file
3645
package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load Diff
21
package.json
Normal file
21
package.json
Normal file
@@ -0,0 +1,21 @@
|
||||
{
|
||||
"name": "timeseries_mongo",
|
||||
"version": "2.0.1",
|
||||
"date": "2023-10-15",
|
||||
"description": "",
|
||||
"main": "main.js",
|
||||
"scripts": {
|
||||
"test": "echo \"Error: no test specified\" && exit 1",
|
||||
"start": "node fetchnewdata.js >>/var/log/timeseries.log 2>&1"
|
||||
},
|
||||
"type": "module",
|
||||
"keywords": [],
|
||||
"author": "",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"axios": "^0.26.1",
|
||||
"luxon": "^2.3.1",
|
||||
"mongodb": "^4.4.1",
|
||||
"posix-getopt": "^1.2.1"
|
||||
}
|
||||
}
|
||||
240
parse.js
Normal file
240
parse.js
Normal file
@@ -0,0 +1,240 @@
|
||||
// import logit from './logit.js'
|
||||
import { DateTime } from 'luxon'
|
||||
import * as mongo from './mongo.js'
|
||||
import { statistics } from'./readdata.js'
|
||||
import { logit, logerror } from './logit.js'
|
||||
|
||||
let actualProps = []
|
||||
let newProps = []
|
||||
|
||||
// Check lat/lon and convert to float
|
||||
function checkLatLon(w) {
|
||||
let loc = 0.0;
|
||||
if (!((w === undefined) || (w == null) || (w == ''))) {
|
||||
try {
|
||||
loc = parseFloat(w)
|
||||
} catch (e) {
|
||||
logerror(`Math error with lat/lon, ${e}`)
|
||||
}
|
||||
}
|
||||
return loc
|
||||
}
|
||||
|
||||
// Check, if altitude is there. If so, use it, else use 0
|
||||
function checkAltitude(alt) {
|
||||
let altitude = 0;
|
||||
if(!((alt === undefined) || (alt === ''))) {
|
||||
try {
|
||||
altitude = Math.floor(parseFloat(alt))
|
||||
} catch (e) {
|
||||
logerror(`Math error with altitude, ${e}`)
|
||||
}
|
||||
}
|
||||
return altitude
|
||||
}
|
||||
|
||||
// binary search fir sensor id in property array
|
||||
const binarySearch = (arr, element, x) => {
|
||||
let start = 0, end = arr.length - 1
|
||||
// Iterate while start not meets end
|
||||
while (start <= end) {
|
||||
// Find the mid index
|
||||
let mid = Math.floor((start + end) / 2)
|
||||
// If element is present at mid, return True
|
||||
if (arr[mid][element] === x) return mid
|
||||
// Else look in left or right half accordingly
|
||||
else if (arr[mid][element] < x)
|
||||
start = mid + 1
|
||||
else
|
||||
end = mid - 1
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
|
||||
const checkProperties = (item, mapvalues, typ, dt) => {
|
||||
let now = DateTime.utc().toJSDate()
|
||||
let entry
|
||||
// read entry from actualprops
|
||||
let idx = binarySearch(actualProps, '_id', item.sensor.id)
|
||||
if (idx === -1) { // not in properties => new sensor
|
||||
entry = buildNewEntry(item, typ, dt, now) // so build a new entry
|
||||
} else {
|
||||
entry = actualProps[idx] // get actual properties
|
||||
// check for change of location
|
||||
if (entry.location[0].id !== item.location.id) { // have got a new location
|
||||
if (newProps.findIndex((obj) => {
|
||||
return (obj.replaceOne.replacement.location[0].id === item.location.id)
|
||||
}) === -1) {
|
||||
const newloc = {
|
||||
loc: {
|
||||
type: "Point",
|
||||
coordinates: [
|
||||
checkLatLon(item.location.longitude),
|
||||
checkLatLon(item.location.latitude)
|
||||
]
|
||||
},
|
||||
id: item.location.id,
|
||||
altitude: checkAltitude(item.location.altitude),
|
||||
since: now,
|
||||
exact_loc: item.location.exact_location,
|
||||
indoor: item.location.indoor,
|
||||
country: item.location.country
|
||||
}
|
||||
entry.location.splice(0, 0, newloc) // insert new location at pos 0 in location array
|
||||
}
|
||||
} else { // same location check for change in indoor or exact_location
|
||||
if (entry.location[0].indoor !== item.location.indoor) {
|
||||
entry.location[0].indoor = item.location.indoor // update indoor
|
||||
}
|
||||
if (entry.location[0].exact_loc !== item.location.exact_location) {
|
||||
entry.location[0].exact_loc = item.location.exact_location // update exact_location
|
||||
}
|
||||
if (entry.location[0].country === '') {
|
||||
entry.location[0].country = item.location.country // update country
|
||||
}
|
||||
}
|
||||
// Check für new name
|
||||
if (entry.name[0].name !== item.sensor.sensor_type.name) { // have got a new name
|
||||
if (newProps.findIndex((obj) => {
|
||||
return (obj.replaceOne.replacement.name[0].name === item.sensor.sensor_type.name)
|
||||
}) === -1) {
|
||||
let newname = {
|
||||
name: item.sensor.sensor_type.name,
|
||||
since: now
|
||||
}
|
||||
entry.name.splice(0, 0, newname)
|
||||
}
|
||||
}
|
||||
}
|
||||
// set new mapvalues
|
||||
entry.values = mapvalues
|
||||
delete entry.location_id
|
||||
delete entry.last_seen
|
||||
delete entry.since
|
||||
// push this entry to the new proerties array
|
||||
newProps.push({replaceOne: { filter: {_id: entry._id}, replacement: entry, upsert: true}})
|
||||
}
|
||||
|
||||
|
||||
const buildNewEntry = (item, typ, dt, now) => {
|
||||
return {
|
||||
_id: item.sensor.id,
|
||||
type: typ,
|
||||
name: [{
|
||||
name: item.sensor.sensor_type.name,
|
||||
since: now,
|
||||
}],
|
||||
location: [{
|
||||
loc: {
|
||||
type: "Point",
|
||||
coordinates: [
|
||||
checkLatLon(item.location.longitude),
|
||||
checkLatLon(item.location.latitude)
|
||||
]
|
||||
},
|
||||
id: item.location.id,
|
||||
altitude: checkAltitude(item.location.altitude),
|
||||
since: now,
|
||||
exact_loc: item.location.exact_location,
|
||||
indoor: item.location.indoor,
|
||||
country: item.location.country
|
||||
}]
|
||||
}
|
||||
}
|
||||
|
||||
const types = {
|
||||
P1: 'pm', P2: 'pm', P0: 'pm',
|
||||
temperature: 'thp', humidity: 'thp', pressure: 'thp',
|
||||
noise_LAeq: 'noise', noise_LA_max: 'noise',
|
||||
counts_per_minute: 'radioactivity',
|
||||
lat: 'gps'
|
||||
};
|
||||
|
||||
|
||||
function getType(typ) {
|
||||
if(typ in types) {
|
||||
return types[typ];
|
||||
} else {
|
||||
return 'unknown'
|
||||
}
|
||||
}
|
||||
|
||||
export const constructDBaseEntries = async (client, body, args) => {
|
||||
logit(`Number of entries: ${body.length}`)
|
||||
logit('Parsing ...')
|
||||
let start = DateTime.now()
|
||||
let allLines = {
|
||||
pm: [], thp: [], noise: [], radioactivity: [], gps: [], unknown: []
|
||||
}
|
||||
let datalines = ''
|
||||
actualProps = await mongo.getallProperties(client)
|
||||
try {
|
||||
for (let item of body) { // check all entries
|
||||
const dt = item.timestamp.split(' ')
|
||||
const datetime = new Date(dt[0] + 'T' + dt[1] + 'Z') // extract date of entry as utc
|
||||
let values = {}
|
||||
let ival = '' // fetch values
|
||||
let typ = 'unknown'
|
||||
let mapvalue = {}
|
||||
for (let v of item.sensordatavalues) { // for all values
|
||||
let vtyp = v.value_type; // extract value type
|
||||
if (typ === 'unknown') { // extract measurement type
|
||||
typ = getType(vtyp)
|
||||
}
|
||||
let val = v.value; // and value
|
||||
let x
|
||||
try {
|
||||
x = parseFloat(val); // convert value to float
|
||||
if (Number.isNaN(x)) {
|
||||
x = -9999.9 // default if value is invalid or unknown
|
||||
}
|
||||
} catch (err) {
|
||||
console.log('Math parse float error on value');
|
||||
x = -9999.9 // default if value is invalid or unknown
|
||||
}
|
||||
values[vtyp] = x
|
||||
ival += `${vtyp}=${x},`
|
||||
// if noise sensor precalculate pow10
|
||||
if (vtyp == 'noise_LAeq') {
|
||||
let e10 = Math.pow(10, x / 10)
|
||||
values.E10tel_eq = e10
|
||||
ival += `E10tel_eq=${e10},`
|
||||
mapvalue.E10tel_eq = e10
|
||||
}
|
||||
mapvalue[vtyp] = x
|
||||
}
|
||||
let store = true
|
||||
if(args.typ) {
|
||||
if(!args.typ.includes(typ)) {
|
||||
store = false
|
||||
}
|
||||
}
|
||||
if (store) {
|
||||
allLines[typ].push({sensorid: item.sensor.id, datetime: datetime, values: values})
|
||||
datalines += `${typ},sid=${item.sensor.id} ${ival.slice(0,-1)} ${datetime.getTime()}\n`
|
||||
mapvalue.timestamp = datetime
|
||||
checkProperties(item, mapvalue, typ, datetime); // check if new or new location or new sensor
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
logerror(`constructDBaseEntries ${e}`);
|
||||
}
|
||||
// sort allLines on sensorID
|
||||
|
||||
for (const [k,v] of Object.entries(allLines)) {
|
||||
allLines[k].sort((a, b) => {
|
||||
if (a.sensorid < b.sensorid) {
|
||||
return -1
|
||||
} else if (a.sensorid > b.sensorid) {
|
||||
return 1
|
||||
} else {
|
||||
return 0
|
||||
}
|
||||
})
|
||||
}
|
||||
statistics.parseTime = DateTime.now().diff(start, ['seconds']).toObject().seconds
|
||||
logit(`Parse time: ${statistics.parseTime} sec`)
|
||||
return [newProps, allLines, datalines]
|
||||
}
|
||||
|
||||
52
readdata.js
Normal file
52
readdata.js
Normal file
@@ -0,0 +1,52 @@
|
||||
import axios from 'axios'
|
||||
import * as fs from 'fs'
|
||||
import { logit, logerror} from'./logit.js'
|
||||
import { DateTime } from 'luxon'
|
||||
const API_URL = 'https://api.sensor.community/static/v1/data.json'; // URL to API on 'luftdaten.info'
|
||||
const SAVE_NAME = 'data/aktdata.json'; // filename for actual data
|
||||
|
||||
let LIVE = (process.env.LIVE == "true") || true
|
||||
export let statistics = {};
|
||||
|
||||
export const doReadfromAPI = async () => {
|
||||
logit(`LIVE = ${LIVE}`)
|
||||
let start = DateTime.now()
|
||||
let data = []
|
||||
if (LIVE) {
|
||||
logit(`Start Reading from API`)
|
||||
let body
|
||||
for(let count = 1; count <= 3; count++) {
|
||||
try {
|
||||
logit(`Try - ${count}`)
|
||||
let ret = await axios(API_URL, {timeout: 10000})
|
||||
if (ret.status != 200) {
|
||||
continue
|
||||
}
|
||||
data = ret.data
|
||||
saveDatatoFile(SAVE_NAME, JSON.stringify(data))
|
||||
break
|
||||
} catch (e) {
|
||||
logerror(`doReadfromAPI ${e}`)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logit('Start using data on disk')
|
||||
data = readDatafromFile(SAVE_NAME)
|
||||
}
|
||||
statistics.readInTime = start.diffNow('seconds').toObject().seconds * -1
|
||||
statistics.entries = data.length
|
||||
logit(`ReadIn-Time: ${statistics.readInTime} sec`)
|
||||
return data
|
||||
}
|
||||
|
||||
|
||||
// die Daten in eimnr Datei zwischenspeichern
|
||||
function saveDatatoFile(fn, data) {
|
||||
fs.writeFileSync(fn, data)
|
||||
}
|
||||
|
||||
// Daten wieder vom File lesen
|
||||
function readDatafromFile(fn) {
|
||||
return JSON.parse(fs.readFileSync(fn))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user