p1-logger/main.go
Pieter Hollander 11792f8c53
All checks were successful
Build Docker image / build (push) Successful in 1m13s
Add timestamp index. Allow to add TimescaleDB after initial setup.
2024-02-21 11:17:09 +01:00

365 lines
12 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// TODO: Make TimescaleDB optional.
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"log/slog"
"os"
"time"
_ "time/tzdata"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/joho/godotenv"
_ "github.com/lib/pq"
)
// Payload struct
//
// All metrics are stored in PostgreSQL as INT and not NUMERIC or FLOAT to optimise their storage size.
//
// Dt1, Dt2, Rt1, Rt2 and G are cumulative meter readings.
// By making them pointers to int, they can be set to 'nil' when the reading has not changed.
// This way, they are logged as NULL in the database when no change has happened, saving storage capacity.
// As an added benefit, this also make data retrieval, visualisation and analysis more light-weight.
//
// Gas consumption is updated once per minute.
// Not saving intermittent values theoretically saves 4 bytes per value.
//
// Storage for non-NULL values every second:
// 86,400entries/day×4bytes/entry=345,600bytes/day86,400entries/day×4bytes/entry=345,600bytes/day
// Storage for non-NULL values every minute:
// 1,440entries/day×4bytes/entry=5,760bytes/day1,440entries/day×4bytes/entry=5,760bytes/day
// Storage savings:
// 345,600bytes/day5,760bytes/day=339,840bytes/day345,600bytes/day5,760bytes/day=339,840bytes/day
// (about 331.5 KB / day or 9.72 MB / month)
// In practice, savings will be even higher when gas is not being consumed continuously.
//
// When Tariff 1 is active, Tariff 2 isn't.
// When energy is being imported, it is not being returned.
//
// Therefore, only updating their values on-change should save at least 75% storage capacity requirements
// An average month has 2 629 743.83 seconds.
// This saves at least:
// 2,629,743.83 seconds * 4 bytes = 10,518,975.32 bytes = 10,518.98 kB = 10.52 MB
// This applies both to Dt1 and Dt2 as well as Rt1 and Rt2, only one is recorded at a time.
// 10.52 * 3 = 31.56 MB.
//
// In addition, many meters only update these metrics once every 10 seconds,
// meaning even more storage capacity is saved:
// 10.52 / 10 * 9 = 9,468 MB.
type Payload struct {
T string `json:"t"` // Timestamp
Dt1 *int `json:"dt1"` // Delivered / imported meter reading tariff 1 (kWh)
Dt2 *int `json:"dt2"` // Delivered / imported tariff 2 (kWh)
Rt1 *int `json:"rt1"` // Returned / exported tariff 1 (kWh)
Rt2 *int `json:"rt2"` // Returned / exported tariff 2 (kWh)
D int `json:"d"` // Delivering / importing (W)
R int `json:"r"` // Returning / exporting (W)
F int `json:"f"` // Failure (counter)
Fl int `json:"fl"` // Failure long duration (counter)
G *int `json:"g"` // Gas meter reading (l)
V1 int `json:"v1"` // Voltage L1 (V)
V2 int `json:"v2"` // Voltage L2 (V)
V3 int `json:"v3"` // Voltage L3 (V)
C1 int `json:"c1"` // Current L1 (A)
C2 int `json:"c2"` // Current L2 (A)
C3 int `json:"c3"` // Current L3 (A)
D1 int `json:"d1"` // Delivering / importing L1 (W)
D2 int `json:"d2"` // Delivering / importing L2 (W)
D3 int `json:"d3"` // Delivering / importing L3 (W)
R1 int `json:"r1"` // Returning / exporting L1 (W)
R2 int `json:"r2"` // Returning / exporting L2 (W)
R3 int `json:"r3"` // Returning / exporting L3 (W)
}
var db *sql.DB
var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
func main() {
// Initial logger setup
slog.SetDefault(logger)
// Load environment variables from .env file if it exists
if err := godotenv.Load(); err != nil {
logger.Info("No .env file found or error loading .env file (ignore this message when using container)", "error", err)
}
// Update the logger based on the LOG_LEVEL environment variable
logLevel := getLogLevelFromEnv(slog.LevelInfo) // Default to info level
logger = createLoggerWithLevel(logLevel)
// Example usage of the logger with the new log level
logger.Info("Logger initialized with dynamic log level")
// Connect to PostgreSQL
pgConnStr := os.Getenv("PG_DB")
if err := connectToPostgreSQL(pgConnStr); err != nil {
log.Fatal("Error connecting to PostgreSQL", "error", err)
}
// Initialize MQTT options
opts := mqtt.NewClientOptions()
opts.AddBroker(os.Getenv("MQTT_BROKER"))
opts.SetUsername(os.Getenv("MQTT_USERNAME"))
opts.SetPassword(os.Getenv("MQTT_PASSWORD"))
opts.SetAutoReconnect(true)
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
logger.Error("Connection lost", "error", err)
})
opts.SetOnConnectHandler(func(client mqtt.Client) {
topic := os.Getenv("MQTT_TOPIC")
logger.Info("Connected to MQTT broker, subscribing to topic...", "topic", topic)
if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil {
logger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error())
}
})
// Connect to MQTT broker
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
// Keep the program running
select {}
}
// Helper function to map environment variable to slog.Level
func getLogLevelFromEnv(defaultLevel slog.Level) slog.Level {
logLevelStr := os.Getenv("LOG_LEVEL")
switch logLevelStr {
case "DEBUG":
return slog.LevelDebug
case "INFO":
return slog.LevelInfo
case "WARN":
return slog.LevelWarn
case "ERROR":
return slog.LevelError
default:
return defaultLevel
}
}
// Function to create a new logger with a specified log level
func createLoggerWithLevel(level slog.Level) *slog.Logger {
return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
}))
}
// updateFieldIfChanged ensures that meter readings that haven't been updated aren't written to the database, in order to save storage space.
func updateFieldIfChanged(currentValue *int, previousValue *int) (*int, bool) {
if currentValue != nil && *currentValue == *previousValue {
return nil, false // No change
} else if currentValue != nil {
*previousValue = *currentValue // Update the previous value to the current one
return currentValue, true // Change occurred
}
return currentValue, false // Return the original value if it's nil, indicating no change
}
// safeDerefInt handles potential nil pointers to avoid a runtime panic
// log messages will display the actual integer values if the pointers are not nil,
// or "nil" if the pointers are nil.
func safeDerefInt(ptr *int) string {
if ptr != nil {
return fmt.Sprintf("%d", *ptr) // Dereference the pointer to get the value
}
return "nil" // Return a string indicating the value is nil
}
var prevDt1, prevDt2, prevRt1, prevRt2, prevG int
func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
// Parse JSON payload
var payload Payload
err := json.Unmarshal(msg.Payload(), &payload)
if err != nil {
logger.Error("Error parsing MQTT payload", "error", err)
return
}
// Parse timestamp to time.Time
timestamp, err := parseDSMRTimestamp(payload.T)
if err != nil {
logger.Error("Error parsing timestamp", "error", err)
return
}
var changed bool // Flag to track if any value changed
// Update each field, directly updating `changed` if any change is detected
var tempChanged bool // Used to capture the change status for each field
payload.Dt1, tempChanged = updateFieldIfChanged(payload.Dt1, &prevDt1)
changed = changed || tempChanged
payload.Dt2, tempChanged = updateFieldIfChanged(payload.Dt2, &prevDt2)
changed = changed || tempChanged
payload.Rt1, tempChanged = updateFieldIfChanged(payload.Rt1, &prevRt1)
changed = changed || tempChanged
payload.Rt2, tempChanged = updateFieldIfChanged(payload.Rt2, &prevRt2)
changed = changed || tempChanged
payload.G, tempChanged = updateFieldIfChanged(payload.G, &prevG)
changed = changed || tempChanged
// If any value has changed, log all the relevant values
if changed {
logger.Debug("Values changed",
"dt1", safeDerefInt(payload.Dt1),
"dt2", safeDerefInt(payload.Dt2),
"rt1", safeDerefInt(payload.Rt1),
"rt2", safeDerefInt(payload.Rt2),
"g", safeDerefInt(payload.G))
}
// Insert data into PostgreSQL
err = insertData(timestamp, payload)
logger.Debug("Inserting values",
"t", payload.T,
"dt1", payload.Dt1,
"dt2", payload.Dt2,
"rt1", payload.Rt1,
"rt2", payload.Rt2,
"d", payload.D,
"r", payload.R,
"f", payload.F,
"fl", payload.Fl,
"g", payload.G,
"v1", payload.V1,
"v2", payload.V2,
"v3", payload.V3,
"c1", payload.C1,
"c2", payload.C2,
"c3", payload.C3,
"d1", payload.D1,
"d2", payload.D2,
"d3", payload.D3,
"r1", payload.R1,
"r2", payload.R2,
"r3", payload.R3)
if err != nil {
logger.Error("Error inserting data into PostgreSQL", "error", err)
}
}
// parseDSMRTimestamp parses the timestamp as emitted by the P1 meter
// (YYMMDDhhmmssX, where X is S or W for summer- or wintertime).
//
// More information:
// https://github.com/matthijskooijman/arduino-dsmr/blob/master/README.md
// https://srolija.medium.com/gos-summer-time-localization-issues-4c8ab702806b
// https://github.com/thomasvnl/P1DSMRReader-ESPHome
// https://pkg.go.dev/github.com/mijnverbruik/dsmr#Timestamp.Position
// https://pkg.go.dev/github.com/mijnverbruik/dsmr#Timestamp
func parseDSMRTimestamp(t string) (time.Time, error) {
// Extract values from timestamp string
year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0')
hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0')
// Load location for "Europe/Amsterdam" time zone
loc, err := time.LoadLocation("Europe/Amsterdam")
if err != nil {
return time.Time{}, err
}
// Create and return the timestamp
return time.Date(year, month, day, hour, min, sec, 0, loc), nil
}
func insertData(timestamp time.Time, payload Payload) error {
// Prepare SQL statement
stmt := `
INSERT INTO p1 (
timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2,
delivery_all, returning_all, failures, long_failures, gas,
voltage_l1, voltage_l2, voltage_l3,
current_l1, current_l2, current_l3,
delivery_l1, delivery_l2, delivery_l3,
returning_l1, returning_l2, returning_l3
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
`
_, err := db.Exec(
stmt,
timestamp,
payload.Dt1, payload.Dt2,
payload.Rt1, payload.Rt2,
payload.D, payload.R,
payload.F, payload.Fl,
payload.G,
payload.V1, payload.V2, payload.V3,
payload.C1, payload.C2, payload.C3,
payload.D1, payload.D2, payload.D3,
payload.R1, payload.R2, payload.R3,
)
return err
}
func connectToPostgreSQL(pgConnStr string) error {
// Connect to PostgreSQL
var err error
for {
db, err = sql.Open("postgres", pgConnStr)
if err == nil {
break // Successfully connected
}
logger.Error("Error connecting to PostgreSQL", "error", err)
time.Sleep(5 * time.Second) // Retry after 5 seconds
}
// Create table if not exists
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS p1 (
timestamp TIMESTAMPTZ,
delivered_tariff1 INT,
delivered_tariff2 INT,
returned_tariff1 INT,
returned_tariff2 INT,
delivery_all INT,
returning_all INT,
failures INT,
long_failures INT,
gas INT,
voltage_l1 INT,
voltage_l2 INT,
voltage_l3 INT,
current_l1 INT,
current_l2 INT,
current_l3 INT,
delivery_l1 INT,
delivery_l2 INT,
delivery_l3 INT,
returning_l1 INT,
returning_l2 INT,
returning_l3 INT
);
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON log (timestamp);
`)
if err != nil {
log.Fatal("Error creating table:", err)
}
timescaleDBEnabled := os.Getenv("TIMESCALEDB_ENABLED")
if timescaleDBEnabled == "true" {
// Enable TimescaleDB
_, err = db.Exec(`
CREATE EXTENSION IF NOT EXISTS timescaledb;
SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE), migrate_data => TRUE;
`)
if err != nil {
log.Fatal("Error creating TimescaleDB extension:", err)
}
}
return nil
}