// TODO: Make TimescaleDB optional.

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"log/slog"
	"os"
	"time"
	_ "time/tzdata"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/joho/godotenv"
	_ "github.com/lib/pq"
)

// Payload struct
//
// All metrics are stored in PostgreSQL as INT and not NUMERIC or FLOAT to optimise their storage size.
//
// Dt1, Dt2, Rt1, Rt2 and G are cumulative meter readings.
// By making them pointers to int, they can be set to 'nil' when the reading has not changed.
// This way, they are logged as NULL in the database when no change has happened, saving storage capacity.
// As an added benefit, this also makes data retrieval, visualisation and analysis more light-weight.
//
// Gas consumption is updated once per minute.
// Not saving intermediate values theoretically saves 4 bytes per value.
//
// Storage for non-NULL values every second:
// 86,400 entries/day × 4 bytes/entry = 345,600 bytes/day
// Storage for non-NULL values every minute:
// 1,440 entries/day × 4 bytes/entry = 5,760 bytes/day
// Storage savings:
// 345,600 bytes/day − 5,760 bytes/day = 339,840 bytes/day
// (about 331.9 KB / day or 9.72 MB / month)
// In practice, savings will be even higher when gas is not being consumed continuously.
//
// When Tariff 1 is active, Tariff 2 isn't.
// When energy is being imported, it is not being returned.
//
// Therefore, only updating their values on change should reduce storage capacity requirements by at least 75%.
// An average month has 2,629,743.83 seconds.
// This saves at least:
// 2,629,743.83 seconds * 4 bytes = 10,518,975.32 bytes = 10,518.98 kB = 10.52 MB
// This applies both to Dt1 and Dt2 as well as Rt1 and Rt2: only one of each pair is recorded at a time.
// 10.52 * 3 = 31.56 MB.
// // In addition, many meters only update these metrics once every 10 seconds, // meaning even more storage capacity is saved: // 10.52 / 10 * 9 = 9,468 MB. type Payload struct { T string `json:"t"` // Timestamp Dt1 *int `json:"dt1"` // Delivered / imported meter reading tariff 1 (kWh) Dt2 *int `json:"dt2"` // Delivered / imported tariff 2 (kWh) Rt1 *int `json:"rt1"` // Returned / exported tariff 1 (kWh) Rt2 *int `json:"rt2"` // Returned / exported tariff 2 (kWh) D int `json:"d"` // Delivering / importing (W) R int `json:"r"` // Returning / exporting (W) F int `json:"f"` // Failure (counter) Fl int `json:"fl"` // Failure long duration (counter) G *int `json:"g"` // Gas meter reading (l) V1 int `json:"v1"` // Voltage L1 (V) V2 int `json:"v2"` // Voltage L2 (V) V3 int `json:"v3"` // Voltage L3 (V) C1 int `json:"c1"` // Current L1 (A) C2 int `json:"c2"` // Current L2 (A) C3 int `json:"c3"` // Current L3 (A) D1 int `json:"d1"` // Delivering / importing L1 (W) D2 int `json:"d2"` // Delivering / importing L2 (W) D3 int `json:"d3"` // Delivering / importing L3 (W) R1 int `json:"r1"` // Returning / exporting L1 (W) R2 int `json:"r2"` // Returning / exporting L2 (W) R3 int `json:"r3"` // Returning / exporting L3 (W) } var db *sql.DB var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil)) func main() { // Initial logger setup slog.SetDefault(logger) // Load environment variables from .env file if it exists if err := godotenv.Load(); err != nil { logger.Info("No .env file found or error loading .env file (ignore this message when using container)", "error", err) } // Update the logger based on the LOG_LEVEL environment variable logLevel := getLogLevelFromEnv(slog.LevelInfo) // Default to info level logger = createLoggerWithLevel(logLevel) // Example usage of the logger with the new log level logger.Info("Logger initialized with dynamic log level") // Connect to PostgreSQL pgConnStr := os.Getenv("PG_DB") if err := connectToPostgreSQL(pgConnStr); err != nil { 
log.Fatal("Error connecting to PostgreSQL", "error", err) } // Initialize MQTT options opts := mqtt.NewClientOptions() opts.AddBroker(os.Getenv("MQTT_BROKER")) opts.SetUsername(os.Getenv("MQTT_USERNAME")) opts.SetPassword(os.Getenv("MQTT_PASSWORD")) opts.SetAutoReconnect(true) opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { logger.Error("Connection lost", "error", err) }) opts.SetOnConnectHandler(func(client mqtt.Client) { topic := os.Getenv("MQTT_TOPIC") logger.Info("Connected to MQTT broker, subscribing to topic...", "topic", topic) if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil { logger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error()) } }) // Connect to MQTT broker client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } // Keep the program running select {} } // Helper function to map environment variable to slog.Level func getLogLevelFromEnv(defaultLevel slog.Level) slog.Level { logLevelStr := os.Getenv("LOG_LEVEL") switch logLevelStr { case "DEBUG": return slog.LevelDebug case "INFO": return slog.LevelInfo case "WARN": return slog.LevelWarn case "ERROR": return slog.LevelError default: return defaultLevel } } // Function to create a new logger with a specified log level func createLoggerWithLevel(level slog.Level) *slog.Logger { return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: level, })) } // updateFieldIfChanged ensures that meter readings that haven't been updated aren't written to the database, in order to save storage space. 
func updateFieldIfChanged(currentValue *int, previousValue *int) (*int, bool) { if currentValue != nil && *currentValue == *previousValue { return nil, false // No change } else if currentValue != nil { *previousValue = *currentValue // Update the previous value to the current one return currentValue, true // Change occurred } return currentValue, false // Return the original value if it's nil, indicating no change } // safeDerefInt handles potential nil pointers to avoid a runtime panic // log messages will display the actual integer values if the pointers are not nil, // or "nil" if the pointers are nil. func safeDerefInt(ptr *int) string { if ptr != nil { return fmt.Sprintf("%d", *ptr) // Dereference the pointer to get the value } return "nil" // Return a string indicating the value is nil } var prevDt1, prevDt2, prevRt1, prevRt2, prevG int func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { // Parse JSON payload var payload Payload err := json.Unmarshal(msg.Payload(), &payload) if err != nil { logger.Error("Error parsing MQTT payload", "error", err) return } // Parse timestamp to time.Time timestamp, err := parseDSMRTimestamp(payload.T) if err != nil { logger.Error("Error parsing timestamp", "error", err) return } var changed bool // Flag to track if any value changed // Update each field, directly updating `changed` if any change is detected var tempChanged bool // Used to capture the change status for each field payload.Dt1, tempChanged = updateFieldIfChanged(payload.Dt1, &prevDt1) changed = changed || tempChanged payload.Dt2, tempChanged = updateFieldIfChanged(payload.Dt2, &prevDt2) changed = changed || tempChanged payload.Rt1, tempChanged = updateFieldIfChanged(payload.Rt1, &prevRt1) changed = changed || tempChanged payload.Rt2, tempChanged = updateFieldIfChanged(payload.Rt2, &prevRt2) changed = changed || tempChanged payload.G, tempChanged = updateFieldIfChanged(payload.G, &prevG) changed = changed || tempChanged // If any value has changed, 
log all the relevant values if changed { logger.Debug("Values changed", "dt1", safeDerefInt(payload.Dt1), "dt2", safeDerefInt(payload.Dt2), "rt1", safeDerefInt(payload.Rt1), "rt2", safeDerefInt(payload.Rt2), "g", safeDerefInt(payload.G)) } // Insert data into PostgreSQL err = insertData(timestamp, payload) logger.Debug("Inserting values", "t", payload.T, "dt1", payload.Dt1, "dt2", payload.Dt2, "rt1", payload.Rt1, "rt2", payload.Rt2, "d", payload.D, "r", payload.R, "f", payload.F, "fl", payload.Fl, "g", payload.G, "v1", payload.V1, "v2", payload.V2, "v3", payload.V3, "c1", payload.C1, "c2", payload.C2, "c3", payload.C3, "d1", payload.D1, "d2", payload.D2, "d3", payload.D3, "r1", payload.R1, "r2", payload.R2, "r3", payload.R3) if err != nil { logger.Error("Error inserting data into PostgreSQL", "error", err) } } // parseDSMRTimestamp parses the timestamp as emitted by the P1 meter // (YYMMDDhhmmssX, where X is S or W for summer- or wintertime). // // More information: // https://github.com/matthijskooijman/arduino-dsmr/blob/master/README.md // https://srolija.medium.com/gos-summer-time-localization-issues-4c8ab702806b // https://github.com/thomasvnl/P1DSMRReader-ESPHome // https://pkg.go.dev/github.com/mijnverbruik/dsmr#Timestamp.Position // https://pkg.go.dev/github.com/mijnverbruik/dsmr#Timestamp func parseDSMRTimestamp(t string) (time.Time, error) { // Extract values from timestamp string year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0') hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0') // Load location for "Europe/Amsterdam" time zone loc, err := time.LoadLocation("Europe/Amsterdam") if err != nil { return time.Time{}, err } // Create and return the timestamp return time.Date(year, month, day, hour, min, sec, 0, loc), nil } func insertData(timestamp time.Time, payload Payload) error { // Prepare SQL statement stmt := ` 
INSERT INTO p1 ( timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2, delivery_all, returning_all, failures, long_failures, gas, voltage_l1, voltage_l2, voltage_l3, current_l1, current_l2, current_l3, delivery_l1, delivery_l2, delivery_l3, returning_l1, returning_l2, returning_l3 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22) ` _, err := db.Exec( stmt, timestamp, payload.Dt1, payload.Dt2, payload.Rt1, payload.Rt2, payload.D, payload.R, payload.F, payload.Fl, payload.G, payload.V1, payload.V2, payload.V3, payload.C1, payload.C2, payload.C3, payload.D1, payload.D2, payload.D3, payload.R1, payload.R2, payload.R3, ) return err } func connectToPostgreSQL(pgConnStr string) error { // Connect to PostgreSQL var err error for { db, err = sql.Open("postgres", pgConnStr) if err == nil { break // Successfully connected } logger.Error("Error connecting to PostgreSQL", "error", err) time.Sleep(5 * time.Second) // Retry after 5 seconds } // Create table if not exists _, err = db.Exec(` CREATE TABLE IF NOT EXISTS p1 ( timestamp TIMESTAMPTZ, delivered_tariff1 INT, delivered_tariff2 INT, returned_tariff1 INT, returned_tariff2 INT, delivery_all INT, returning_all INT, failures INT, long_failures INT, gas INT, voltage_l1 INT, voltage_l2 INT, voltage_l3 INT, current_l1 INT, current_l2 INT, current_l3 INT, delivery_l1 INT, delivery_l2 INT, delivery_l3 INT, returning_l1 INT, returning_l2 INT, returning_l3 INT ); `) if err != nil { log.Fatal("Error creating table:", err) } timescaleDBEnabled := os.Getenv("TIMESCALEDB_ENABLED") if timescaleDBEnabled == "true" { // Enable TimescaleDB _, err = db.Exec(` CREATE EXTENSION IF NOT EXISTS timescaledb; SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE); `) if err != nil { log.Fatal("Error creating TimescaleDB extension:", err) } } return nil }