p1-logger/main.go
Pieter Hollander 2844bd89d1
All checks were successful
Build Docker image / build (push) Successful in 1m15s
Added documentation to Payload struct.
2024-02-18 15:10:37 +01:00

235 lines
6.9 KiB
Go

// TODO: Optimisation: only log dt1, dt2, rt1, rt2, g on change.
package main
import (
"database/sql"
"encoding/json"
"log"
"os"
"time"
_ "time/tzdata"
mqtt "github.com/eclipse/paho.mqtt.golang"
_ "github.com/lib/pq"
)
// Payload struct
// Dt1, Dt2, Rt1, Rt2 and G are meter readings
// By making them pointers to int, they can be set to 'nil' when the reading has not changed.
// This way, they are logged as NULL in the database when no change has happened, saving storage capacity.
type Payload struct {
T string `json:"t"` // Timestamp
Dt1 *int `json:"dt1"` // Delivered / imported meter reading tariff 1 (kWh)
Dt2 *int `json:"dt2"` // Delivered / imported tariff 2 (kWh)
Rt1 *int `json:"rt1"` // Returned / exported tariff 1 (kWh)
Rt2 *int `json:"rt2"` // Returned / exported tariff 2 (kWh)
D int `json:"d"` // Delivering / importing (W)
R int `json:"r"` // Returning / exporting (W)
F int `json:"f"` // Failure (counter)
Fl int `json:"fl"` // Failure long duration (counter)
G *int `json:"g"` // Gas meter reading (l)
V1 int `json:"v1"` // Voltage L1 (V)
V2 int `json:"v2"` // Voltage L2 (V)
V3 int `json:"v3"` // Voltage L3 (V)
C1 int `json:"c1"` // Current L1 (A)
C2 int `json:"c2"` // Current L2 (A)
C3 int `json:"c3"` // Current L3 (A)
D1 int `json:"d1"` // Delivering / importing L1 (W)
D2 int `json:"d2"` // Delivering / importing L2 (W)
D3 int `json:"d3"` // Delivering / importing L3 (W)
R1 int `json:"r1"` // Returning / exporting L1 (W)
R2 int `json:"r2"` // Returning / exporting L2 (W)
R3 int `json:"r3"` // Returning / exporting L3 (W)
}
var db *sql.DB
func main() {
// // Load environment variables from .env file if it exists
// err := godotenv.Load()
// if err != nil {
// log.Println("Error loading .env file:", err)
// }
// Connect to PostgreSQL
pgConnStr := os.Getenv("PG_DB")
if err := connectToPostgreSQL(pgConnStr); err != nil {
log.Fatal("Error connecting to PostgreSQL:", err)
}
// pgConnStr := os.Getenv("PG_DB")
// var err error
// db, err = sql.Open("postgres", pgConnStr)
// if err != nil {
// log.Fatal(err)
// }
// defer db.Close()
// Initialize MQTT options
opts := mqtt.NewClientOptions()
opts.AddBroker(os.Getenv("MQTT_BROKER"))
opts.SetUsername(os.Getenv("MQTT_USERNAME"))
opts.SetPassword(os.Getenv("MQTT_PASSWORD"))
opts.SetAutoReconnect(true)
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
log.Printf("Connection lost: %v", err)
})
opts.SetOnConnectHandler(func(client mqtt.Client) {
topic := os.Getenv("MQTT_TOPIC")
log.Println("Connected to MQTT broker, subscribing to topic...")
if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil {
log.Printf("Error subscribing to MQTT topic %s: %v", topic, token.Error())
}
})
// Connect to MQTT broker
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
// Keep the program running
select {}
}
var prevDt1, prevDt2, prevRt1, prevRt2, prevG int
func updateFieldIfChanged(currentValue *int, previousValue *int) *int {
if currentValue != nil && *currentValue == *previousValue {
return nil // No change, so we prepare to insert a NULL
} else if currentValue != nil {
*previousValue = *currentValue // Update the previous value to the current one
return currentValue
}
return currentValue // Return the original value if it's nil
}
func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
// Parse JSON payload
var payload Payload
err := json.Unmarshal(msg.Payload(), &payload)
if err != nil {
log.Println("Error parsing MQTT payload:", err)
return
}
// Parse timestamp to time.Time
timestamp, err := parseTimestamp(payload.T)
if err != nil {
log.Println("Error parsing timestamp:", err)
return
}
payload.Dt1 = updateFieldIfChanged(payload.Dt1, &prevDt1)
payload.Dt2 = updateFieldIfChanged(payload.Dt2, &prevDt2)
payload.Rt1 = updateFieldIfChanged(payload.Rt1, &prevRt1)
payload.Rt2 = updateFieldIfChanged(payload.Rt2, &prevRt2)
payload.G = updateFieldIfChanged(payload.G, &prevG)
// Insert data into PostgreSQL
err = insertData(timestamp, payload)
if err != nil {
log.Println("Error inserting data into PostgreSQL:", err)
}
}
func parseTimestamp(t string) (time.Time, error) {
// Extract values from timestamp string
year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0')
hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0')
// Load location for "Europe/Amsterdam" time zone
loc, err := time.LoadLocation("Europe/Amsterdam")
if err != nil {
return time.Time{}, err
}
// Create and return the timestamp
return time.Date(year, month, day, hour, min, sec, 0, loc), nil
}
func insertData(timestamp time.Time, payload Payload) error {
// Prepare SQL statement
stmt := `
INSERT INTO p1 (
timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2,
delivery_all, returning_all, failures, long_failures, gas,
voltage_l1, voltage_l2, voltage_l3,
current_l1, current_l2, current_l3,
delivery_l1, delivery_l2, delivery_l3,
returning_l1, returning_l2, returning_l3
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
`
_, err := db.Exec(
stmt,
timestamp,
payload.Dt1, payload.Dt2,
payload.Rt1, payload.Rt2,
payload.D, payload.R,
payload.F, payload.Fl,
payload.G,
payload.V1, payload.V2, payload.V3,
payload.C1, payload.C2, payload.C3,
payload.D1, payload.D2, payload.D3,
payload.R1, payload.R2, payload.R3,
)
return err
}
func connectToPostgreSQL(pgConnStr string) error {
// Connect to PostgreSQL
var err error
for {
db, err = sql.Open("postgres", pgConnStr)
if err == nil {
break // Successfully connected
}
log.Println("Error connecting to PostgreSQL:", err)
time.Sleep(5 * time.Second) // Retry after 5 seconds
}
// Enable TimescaleDB
_, err = db.Exec(`
CREATE EXTENSION IF NOT EXISTS timescaledb;
`)
if err != nil {
log.Fatal("Error creating TimescaleDB extension:", err)
}
// Create table if not exists
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS p1 (
timestamp TIMESTAMPTZ,
delivered_tariff1 INT,
delivered_tariff2 INT,
returned_tariff1 INT,
returned_tariff2 INT,
delivery_all INT,
returning_all INT,
failures INT,
long_failures INT,
gas INT,
voltage_l1 INT,
voltage_l2 INT,
voltage_l3 INT,
current_l1 INT,
current_l2 INT,
current_l3 INT,
delivery_l1 INT,
delivery_l2 INT,
delivery_l3 INT,
returning_l1 INT,
returning_l2 INT,
returning_l3 INT
);
SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE);
`)
if err != nil {
log.Fatal("Error creating table:", err)
}
return nil
}