p1-logger/main.go
Pieter Hollander 666bf5700e
All checks were successful
Build Docker image / build (push) Successful in 1m8s
Improved documentation.
2024-02-18 15:39:55 +01:00

260 lines
8.3 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Optimisation: dt1, dt2, rt1, rt2 and g are only logged on change (see updateFieldIfChanged).
package main
import (
"database/sql"
"encoding/json"
"log"
"os"
"time"
_ "time/tzdata"
mqtt "github.com/eclipse/paho.mqtt.golang"
_ "github.com/lib/pq"
)
// Payload struct
//
// All metrics are stored in PostgreSQL as INT and not NUMERIC or FLOAT to optimise their storage size.
//
// Dt1, Dt2, Rt1, Rt2 and G are cumulative meter readings.
// By making them pointers to int, they can be set to 'nil' when the reading has not changed.
// This way, they are logged as NULL in the database when no change has happened, saving storage capacity.
// As an added benefit, this also makes data retrieval, visualisation and analysis more light-weight.
//
// Gas consumption is updated once per minute.
// Not saving intermittent values theoretically saves 4 bytes per value.
// Storage for non-NULL values every second:
// 86,400 entries/day * 4 bytes/entry = 345,600 bytes/day
// Storage for non-NULL values every minute:
// 1,440 entries/day * 4 bytes/entry = 5,760 bytes/day
// Storage savings:
// 345,600 bytes/day - 5,760 bytes/day = 339,840 bytes/day
// (about 331.9 KB / day or 9.72 MB / month, taking 1 KB = 1024 bytes and a 30-day month)
// In practice, savings will be even higher when gas is not being consumed continuously.
//
// When Tariff 1 is active, Tariff 2 isn't.
// Therefore, only updating their values on-change should at least halve their storage capacity requirements.
// An average month has 2 629 743.83 seconds.
// This saves at least:
// 2,629,743.83 seconds * 4 bytes = 10,518,975.32 bytes = 10,518.98 kB = 10.52 MB
// This applies both to Dt1 and Dt2 as well as Rt1 and Rt2, so should be doubled.
// 10.52 * 2 = 21.04 MB.
// In practice, savings will be even higher when electricity is not being consumed continuously.
type Payload struct {
	// Timestamp string as published by the meter; decoded by parseTimestamp.
	T string `json:"t"`

	// Cumulative electricity readings. These are pointers so that an
	// unchanged reading can be replaced by nil before it is stored.
	Dt1 *int `json:"dt1"` // Tariff 1, delivered / imported (kWh)
	Dt2 *int `json:"dt2"` // Tariff 2, delivered / imported (kWh)
	Rt1 *int `json:"rt1"` // Tariff 1, returned / exported (kWh)
	Rt2 *int `json:"rt2"` // Tariff 2, returned / exported (kWh)

	// Instantaneous power over all phases.
	D int `json:"d"` // Currently delivering / importing (W)
	R int `json:"r"` // Currently returning / exporting (W)

	// Failure counters.
	F  int `json:"f"`  // Failures
	Fl int `json:"fl"` // Long-duration failures

	// Cumulative gas reading; a pointer for the same reason as Dt1..Rt2.
	G *int `json:"g"` // Gas meter reading (l)

	// Per-phase voltage (V).
	V1 int `json:"v1"` // L1
	V2 int `json:"v2"` // L2
	V3 int `json:"v3"` // L3

	// Per-phase current (A).
	C1 int `json:"c1"` // L1
	C2 int `json:"c2"` // L2
	C3 int `json:"c3"` // L3

	// Per-phase power being delivered / imported (W).
	D1 int `json:"d1"` // L1
	D2 int `json:"d2"` // L2
	D3 int `json:"d3"` // L3

	// Per-phase power being returned / exported (W).
	R1 int `json:"r1"` // L1
	R2 int `json:"r2"` // L2
	R3 int `json:"r3"` // L3
}
// db is the package-wide database handle: connectToPostgreSQL assigns it and
// insertData executes statements on it.
var db *sql.DB
// main wires the logger together: it prepares the database, configures the
// MQTT client and then blocks forever while mqttMessageHandler processes
// incoming messages.
//
// Configuration is read from the environment variables PG_DB, MQTT_BROKER,
// MQTT_USERNAME, MQTT_PASSWORD and MQTT_TOPIC.
func main() {
	// The database has to be ready before the first message can be stored.
	if err := connectToPostgreSQL(os.Getenv("PG_DB")); err != nil {
		log.Fatal("Error connecting to PostgreSQL:", err)
	}

	// The subscription is issued from the on-connect callback, so it is
	// repeated every time the callback runs (presumably also after an
	// automatic reconnect; confirm against the client library).
	onConnect := func(client mqtt.Client) {
		topic := os.Getenv("MQTT_TOPIC")
		log.Println("Connected to MQTT broker, subscribing to topic...")
		subscribeToken := client.Subscribe(topic, 0, mqttMessageHandler)
		if subscribeToken.Wait() && subscribeToken.Error() != nil {
			log.Printf("Error subscribing to MQTT topic %s: %v", topic, subscribeToken.Error())
		}
	}

	// A lost connection is only reported; auto-reconnect is enabled below.
	onConnectionLost := func(client mqtt.Client, err error) {
		log.Printf("Connection lost: %v", err)
	}

	opts := mqtt.NewClientOptions()
	opts.AddBroker(os.Getenv("MQTT_BROKER"))
	opts.SetUsername(os.Getenv("MQTT_USERNAME"))
	opts.SetPassword(os.Getenv("MQTT_PASSWORD"))
	opts.SetAutoReconnect(true)
	opts.SetConnectionLostHandler(onConnectionLost)
	opts.SetOnConnectHandler(onConnect)

	// A failure of the initial connection attempt is fatal.
	client := mqtt.NewClient(opts)
	connectToken := client.Connect()
	if connectToken.Wait() && connectToken.Error() != nil {
		log.Fatal(connectToken.Error())
	}

	// Block forever; all further work happens in the MQTT callbacks.
	select {}
}
// Most recent value seen for each cumulative reading. updateFieldIfChanged
// compares incoming readings against these; they all start at zero.
var (
	prevDt1 int
	prevDt2 int
	prevRt1 int
	prevRt2 int
	prevG   int
)

// updateFieldIfChanged decides whether a cumulative reading should be stored.
//
// It returns nil when currentValue is nil or when it equals *previousValue.
// Otherwise it copies the new reading into *previousValue and returns
// currentValue itself. previousValue must not be nil.
func updateFieldIfChanged(currentValue *int, previousValue *int) *int {
	// Nothing was reported for this field: keep it nil.
	if currentValue == nil {
		return nil
	}
	// Same reading as last time: signal "store NULL".
	if *currentValue == *previousValue {
		return nil
	}
	// A new reading: remember it and pass it through.
	*previousValue = *currentValue
	return currentValue
}
// mqttMessageHandler processes one MQTT message: it decodes the JSON body into
// a Payload, parses the payload timestamp, blanks out the cumulative readings
// (Dt1, Dt2, Rt1, Rt2, G) that have not changed since the last stored value
// and inserts the result into PostgreSQL.
//
// Messages that cannot be decoded, or whose timestamp cannot be parsed, are
// logged and dropped. A failed insert is logged as well, and the
// change-tracking state is rolled back so that the readings of the lost row
// are stored by the next message instead of being recorded as NULL.
//
// NOTE(review): the prev* variables are not synchronised; this assumes the
// MQTT client invokes the handler for one message at a time. Confirm against
// the client configuration.
func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
	// Parse JSON payload
	var payload Payload
	if err := json.Unmarshal(msg.Payload(), &payload); err != nil {
		log.Println("Error parsing MQTT payload:", err)
		return
	}

	// Parse timestamp to time.Time
	timestamp, err := parseTimestamp(payload.T)
	if err != nil {
		log.Println("Error parsing timestamp:", err)
		return
	}

	// Snapshot the change-tracking state: updateFieldIfChanged advances it
	// immediately, before it is known whether the row reaches the database.
	savedDt1, savedDt2, savedRt1, savedRt2, savedG := prevDt1, prevDt2, prevRt1, prevRt2, prevG

	payload.Dt1 = updateFieldIfChanged(payload.Dt1, &prevDt1)
	payload.Dt2 = updateFieldIfChanged(payload.Dt2, &prevDt2)
	payload.Rt1 = updateFieldIfChanged(payload.Rt1, &prevRt1)
	payload.Rt2 = updateFieldIfChanged(payload.Rt2, &prevRt2)
	payload.G = updateFieldIfChanged(payload.G, &prevG)

	// Insert data into PostgreSQL
	if err := insertData(timestamp, payload); err != nil {
		// The row was not stored, so its readings must not count as logged.
		prevDt1, prevDt2, prevRt1, prevRt2, prevG = savedDt1, savedDt2, savedRt1, savedRt2, savedG
		log.Println("Error inserting data into PostgreSQL:", err)
	}
}
// parseTimestamp converts a meter timestamp of the form YYMMDDhhmmss into a
// time.Time in the "Europe/Amsterdam" time zone.
//
// Only the first twelve bytes of t are interpreted; anything after them is
// ignored (presumably the meter's "S"/"W" daylight-saving marker; verify
// against the publisher). The two-digit year is always placed in the 2000s.
//
// An error is returned instead of panicking when t is shorter than twelve
// bytes. Non-digit characters and out-of-range fields (month 13, hour 24, ...)
// are rejected too, as is a time zone that cannot be loaded. The value in a
// parse error message carries the "20" century prefix added below.
func parseTimestamp(t string) (time.Time, error) {
	// Load location for "Europe/Amsterdam" time zone
	loc, err := time.LoadLocation("Europe/Amsterdam")
	if err != nil {
		return time.Time{}, err
	}

	// Drop everything after the twelve digits; the parser would otherwise
	// reject the trailing text.
	if len(t) > 12 {
		t = t[:12]
	}

	// Prefixing the century lets the four-digit year layout be used, which
	// keeps every year in 2000-2099 (a two-digit layout would map 69-99 to
	// the 1900s). The parser validates length, digits and field ranges.
	return time.ParseInLocation("20060102150405", "20"+t, loc)
}
// insertData writes one row to the p1 table through the package-level db
// handle and returns the error from Exec, if any.
//
// The pointer fields of payload (Dt1, Dt2, Rt1, Rt2, G) are passed through
// unchanged, so a nil pointer is handed to the driver as-is.
func insertData(timestamp time.Time, payload Payload) error {
	// Column list and placeholders; the values below follow the same order.
	const query = `
INSERT INTO p1 (
timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2,
delivery_all, returning_all, failures, long_failures, gas,
voltage_l1, voltage_l2, voltage_l3,
current_l1, current_l2, current_l3,
delivery_l1, delivery_l2, delivery_l3,
returning_l1, returning_l2, returning_l3
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
`

	values := []interface{}{
		timestamp,
		payload.Dt1, payload.Dt2, payload.Rt1, payload.Rt2,
		payload.D, payload.R, payload.F, payload.Fl, payload.G,
		payload.V1, payload.V2, payload.V3,
		payload.C1, payload.C2, payload.C3,
		payload.D1, payload.D2, payload.D3,
		payload.R1, payload.R2, payload.R3,
	}

	_, err := db.Exec(query, values...)
	return err
}
// connectToPostgreSQL opens the database described by pgConnStr, stores the
// handle in the package-level db variable and makes sure the schema exists:
// the TimescaleDB extension, the p1 table and its hypertable.
//
// Connecting is retried every 5 seconds until it succeeds. sql.Open alone may
// only validate its arguments without contacting the server, so each attempt
// is confirmed with Ping; that is what lets the retry loop wait for a database
// that is not reachable yet.
//
// A failure while creating the extension or the table ends the program via
// log.Fatal, so the returned error is currently always nil.
func connectToPostgreSQL(pgConnStr string) error {
	// Connect to PostgreSQL
	for {
		conn, err := sql.Open("postgres", pgConnStr)
		if err == nil {
			// Force a real connection attempt before accepting the handle.
			if err = conn.Ping(); err == nil {
				db = conn
				break // Successfully connected
			}
			// Best effort: this handle is discarded and a new one is opened on
			// the next attempt, so a Close error carries no information.
			_ = conn.Close()
		}
		log.Println("Error connecting to PostgreSQL:", err)
		time.Sleep(5 * time.Second) // Retry after 5 seconds
	}

	// Enable TimescaleDB
	if _, err := db.Exec(`
CREATE EXTENSION IF NOT EXISTS timescaledb;
`); err != nil {
		log.Fatal("Error creating TimescaleDB extension:", err)
	}

	// Create table if not exists, then turn it into a hypertable
	if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS p1 (
timestamp TIMESTAMPTZ,
delivered_tariff1 INT,
delivered_tariff2 INT,
returned_tariff1 INT,
returned_tariff2 INT,
delivery_all INT,
returning_all INT,
failures INT,
long_failures INT,
gas INT,
voltage_l1 INT,
voltage_l2 INT,
voltage_l3 INT,
current_l1 INT,
current_l2 INT,
current_l3 INT,
delivery_l1 INT,
delivery_l2 INT,
delivery_l3 INT,
returning_l1 INT,
returning_l2 INT,
returning_l3 INT
);
SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE);
`); err != nil {
		log.Fatal("Error creating table:", err)
	}

	return nil
}