Pieter Hollander
7d04a97109
All checks were successful
Build Docker image / build (push) Successful in 47s
186 lines
4.5 KiB
Go
186 lines
4.5 KiB
Go
package main
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"log"
|
|
"os"
|
|
"time"
|
|
_ "time/tzdata"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
type Payload struct {
|
|
T string `json:"t"`
|
|
Dt1 int `json:"dt1"`
|
|
Dt2 int `json:"dt2"`
|
|
Rt1 int `json:"rt1"`
|
|
Rt2 int `json:"rt2"`
|
|
D int `json:"d"`
|
|
R int `json:"r"`
|
|
F int `json:"f"`
|
|
Fl int `json:"fl"`
|
|
G int `json:"g"`
|
|
V1 int `json:"v1"`
|
|
V2 int `json:"v2"`
|
|
V3 int `json:"v3"`
|
|
C1 int `json:"c1"`
|
|
C2 int `json:"c2"`
|
|
C3 int `json:"c3"`
|
|
D1 int `json:"d1"`
|
|
D2 int `json:"d2"`
|
|
D3 int `json:"d3"`
|
|
R1 int `json:"r1"`
|
|
R2 int `json:"r2"`
|
|
R3 int `json:"r3"`
|
|
}
|
|
|
|
var db *sql.DB
|
|
|
|
func main() {
|
|
// Load environment variables from .env file if it exists
|
|
// err := godotenv.Load()
|
|
// if err != nil {
|
|
// log.Println("Error loading .env file:", err)
|
|
// }
|
|
|
|
// Connect to PostgreSQL
|
|
pgConnStr := os.Getenv("PG_DB")
|
|
var err error
|
|
db, err = sql.Open("postgres", pgConnStr)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
defer db.Close()
|
|
|
|
// Enable TimescaleDB
|
|
// _, err = db.Exec(`
|
|
// CREATE EXTENSION IF NOT EXISTS timescaledb;
|
|
// `)
|
|
if err != nil {
|
|
log.Fatal("Error creating TimescaleDB extension:", err)
|
|
}
|
|
|
|
// Create table if not exists
|
|
_, err = db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS p1 (
|
|
timestamp TIMESTAMPTZ,
|
|
delivered_tariff1 INT,
|
|
delivered_tariff2 INT,
|
|
returned_tariff1 INT,
|
|
returned_tariff2 INT,
|
|
delivery_all INT,
|
|
returning_all INT,
|
|
failures INT,
|
|
long_failures INT,
|
|
gas INT,
|
|
voltage_l1 INT,
|
|
voltage_l2 INT,
|
|
voltage_l3 INT,
|
|
current_l1 INT,
|
|
current_l2 INT,
|
|
current_l3 INT,
|
|
delivery_l1 INT,
|
|
delivery_l2 INT,
|
|
delivery_l3 INT,
|
|
returning_l1 INT,
|
|
returning_l2 INT,
|
|
returning_l3 INT
|
|
);
|
|
-- SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE);
|
|
`)
|
|
if err != nil {
|
|
log.Fatal("Error creating table:", err)
|
|
}
|
|
|
|
// Initialize MQTT options
|
|
opts := mqtt.NewClientOptions()
|
|
opts.AddBroker(os.Getenv("MQTT_BROKER"))
|
|
opts.SetUsername(os.Getenv("MQTT_USERNAME"))
|
|
opts.SetPassword(os.Getenv("MQTT_PASSWORD"))
|
|
|
|
// Connect to MQTT broker
|
|
client := mqtt.NewClient(opts)
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
log.Fatal(token.Error())
|
|
}
|
|
|
|
// Subscribe to MQTT topic
|
|
topic := os.Getenv("MQTT_TOPIC")
|
|
if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil {
|
|
log.Fatal(token.Error())
|
|
}
|
|
|
|
// Keep the program running
|
|
select {}
|
|
}
|
|
|
|
func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
|
|
// Parse JSON payload
|
|
var payload Payload
|
|
err := json.Unmarshal(msg.Payload(), &payload)
|
|
if err != nil {
|
|
log.Println("Error parsing MQTT payload:", err)
|
|
return
|
|
}
|
|
|
|
// Parse timestamp to time.Time
|
|
timestamp, err := parseTimestamp(payload.T)
|
|
if err != nil {
|
|
log.Println("Error parsing timestamp:", err)
|
|
return
|
|
}
|
|
|
|
// Insert data into PostgreSQL
|
|
err = insertData(timestamp, payload)
|
|
if err != nil {
|
|
log.Println("Error inserting data into PostgreSQL:", err)
|
|
}
|
|
}
|
|
|
|
func parseTimestamp(t string) (time.Time, error) {
|
|
|
|
// Extract values from timestamp string
|
|
year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0')
|
|
hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0')
|
|
|
|
// Load location for "Europe/Amsterdam" time zone
|
|
loc, err := time.LoadLocation("Europe/Amsterdam")
|
|
if err != nil {
|
|
return time.Time{}, err
|
|
}
|
|
|
|
// Create and return the timestamp
|
|
return time.Date(year, month, day, hour, min, sec, 0, loc), nil
|
|
|
|
}
|
|
|
|
func insertData(timestamp time.Time, payload Payload) error {
|
|
// Prepare SQL statement
|
|
stmt := `
|
|
INSERT INTO p1 (
|
|
timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2,
|
|
delivery_all, returning_all, failures, long_failures, gas,
|
|
voltage_l1, voltage_l2, voltage_l3,
|
|
current_l1, current_l2, current_l3,
|
|
delivery_l1, delivery_l2, delivery_l3,
|
|
returning_l1, returning_l2, returning_l3
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
|
|
`
|
|
_, err := db.Exec(
|
|
stmt,
|
|
timestamp,
|
|
payload.Dt1, payload.Dt2,
|
|
payload.Rt1, payload.Rt2,
|
|
payload.D, payload.R,
|
|
payload.F, payload.Fl,
|
|
payload.G,
|
|
payload.V1, payload.V2, payload.V3,
|
|
payload.C1, payload.C2, payload.C3,
|
|
payload.D1, payload.D2, payload.D3,
|
|
payload.R1, payload.R2, payload.R3,
|
|
)
|
|
return err
|
|
}
|