package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"log/slog"
	"os"
	"time"
	_ "time/tzdata"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/joho/godotenv"
	_ "github.com/lib/pq"
)

// Payload is the JSON message received over MQTT; each message is stored as one row in the p1 table.
//
// All metrics are stored in PostgreSQL as INT and not NUMERIC or FLOAT to optimise their storage size.
//
// Dt1, Dt2, Rt1, Rt2 and G are cumulative meter readings.
// By making them pointers to int, they can be set to 'nil' when the reading has not changed.
// This way, they are logged as NULL in the database when no change has happened, saving storage capacity.
// As an added benefit, this also makes data retrieval, visualisation and analysis more light-weight.
//
// Gas consumption is updated once per minute.
// Not saving intermediate values theoretically saves 4 bytes per value.
// Storage for non-NULL values every second:
//
//	86,400 entries/day × 4 bytes/entry = 345,600 bytes/day
//
// Storage for non-NULL values every minute:
//
//	1,440 entries/day × 4 bytes/entry = 5,760 bytes/day
//
// Storage savings:
//
//	345,600 bytes/day − 5,760 bytes/day = 339,840 bytes/day
//
// (about 331.9 KB / day or 9.72 MB / month)
// In practice, savings will be even higher when gas is not being consumed continuously.
//
// When Tariff 1 is active, Tariff 2 isn't.
// When energy is being imported, it is not being returned.
// Therefore, only updating their values on change should reduce their storage requirements by at least 75%.
// An average month has 2,629,743.83 seconds.
// This saves at least:
//
//	2,629,743.83 seconds × 4 bytes = 10,518,975.32 bytes = 10,518.98 kB = 10.52 MB
//
// This applies both to Dt1 and Dt2 as well as Rt1 and Rt2: only one of the four is recorded at a time, so:
//
//	10.52 MB × 3 = 31.56 MB
//
// In addition, many meters only update these metrics once every 10 seconds,
// meaning even more storage capacity is saved:
//
//	10.52 MB / 10 × 9 = 9.468 MB
type Payload struct { T string `json:"t"` // Timestamp Dt1 *int `json:"dt1"` // Delivered / imported meter reading tariff 1 (kWh) Dt2 *int `json:"dt2"` // Delivered / imported tariff 2 (kWh) Rt1 *int `json:"rt1"` // Returned / exported tariff 1 (kWh) Rt2 *int `json:"rt2"` // Returned / exported tariff 2 (kWh) D int `json:"d"` // Delivering / importing (W) R int `json:"r"` // Returning / exporting (W) F int `json:"f"` // Failure (counter) Fl int `json:"fl"` // Failure long duration (counter) G *int `json:"g"` // Gas meter reading (l) V1 int `json:"v1"` // Voltage L1 (V) V2 int `json:"v2"` // Voltage L2 (V) V3 int `json:"v3"` // Voltage L3 (V) C1 int `json:"c1"` // Current L1 (A) C2 int `json:"c2"` // Current L2 (A) C3 int `json:"c3"` // Current L3 (A) D1 int `json:"d1"` // Delivering / importing L1 (W) D2 int `json:"d2"` // Delivering / importing L2 (W) D3 int `json:"d3"` // Delivering / importing L3 (W) R1 int `json:"r1"` // Returning / exporting L1 (W) R2 int `json:"r2"` // Returning / exporting L2 (W) R3 int `json:"r3"` // Returning / exporting L3 (W) } var db *sql.DB var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil)) func main() { slog.SetDefault(logger) // Load environment variables from .env file if it exists if err := godotenv.Load(); err != nil { logger.Info("No .env file found or error loading .env file (ignore this message when using container)", "error", err) } // Connect to PostgreSQL pgConnStr := os.Getenv("PG_DB") if err := connectToPostgreSQL(pgConnStr); err != nil { log.Fatal("Error connecting to PostgreSQL", "error", err) } // Initialize MQTT options opts := mqtt.NewClientOptions() opts.AddBroker(os.Getenv("MQTT_BROKER")) opts.SetUsername(os.Getenv("MQTT_USERNAME")) opts.SetPassword(os.Getenv("MQTT_PASSWORD")) opts.SetAutoReconnect(true) opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { logger.Error("Connection lost", "error", err) }) opts.SetOnConnectHandler(func(client mqtt.Client) { topic := os.Getenv("MQTT_TOPIC") 
logger.Info("Connected to MQTT broker, subscribing to topic...", "topic", topic) if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil { logger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error()) } }) // Connect to MQTT broker client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } // Keep the program running select {} } var prevDt1, prevDt2, prevRt1, prevRt2, prevG int func updateFieldIfChanged(currentValue *int, previousValue *int) (*int, bool) { if currentValue != nil && *currentValue == *previousValue { return nil, false // No change } else if currentValue != nil { *previousValue = *currentValue // Update the previous value to the current one return currentValue, true // Change occurred } return currentValue, false // Return the original value if it's nil, indicating no change } // safeDerefInt handles potential nil pointers to avoid a runtime panic // log messages will display the actual integer values if the pointers are not nil, // or "nil" if the pointers are nil. 
func safeDerefInt(ptr *int) string { if ptr != nil { return fmt.Sprintf("%d", *ptr) // Dereference the pointer to get the value } return "nil" // Return a string indicating the value is nil } func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { // Parse JSON payload var payload Payload err := json.Unmarshal(msg.Payload(), &payload) if err != nil { logger.Error("Error parsing MQTT payload", "error", err) return } // Parse timestamp to time.Time timestamp, err := parseTimestamp(payload.T) if err != nil { logger.Error("Error parsing timestamp", "error", err) return } var changed bool // Flag to track if any value changed // Update each field, directly updating `changed` if any change is detected var tempChanged bool // Used to capture the change status for each field payload.Dt1, tempChanged = updateFieldIfChanged(payload.Dt1, &prevDt1) changed = changed || tempChanged payload.Dt2, tempChanged = updateFieldIfChanged(payload.Dt2, &prevDt2) changed = changed || tempChanged payload.Rt1, tempChanged = updateFieldIfChanged(payload.Rt1, &prevRt1) changed = changed || tempChanged payload.Rt2, tempChanged = updateFieldIfChanged(payload.Rt2, &prevRt2) changed = changed || tempChanged payload.G, tempChanged = updateFieldIfChanged(payload.G, &prevG) changed = changed || tempChanged // If any value has changed, log all the relevant values if changed { logger.Debug("Values changed", "dt1", safeDerefInt(payload.Dt1), "dt2", safeDerefInt(payload.Dt2), "rt1", safeDerefInt(payload.Rt1), "rt2", safeDerefInt(payload.Rt2), "g", safeDerefInt(payload.G)) } // Insert data into PostgreSQL err = insertData(timestamp, payload) if err != nil { logger.Error("Error inserting data into PostgreSQL", "error", err) } } func parseTimestamp(t string) (time.Time, error) { // Extract values from timestamp string year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0') hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), 
int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0') // Load location for "Europe/Amsterdam" time zone loc, err := time.LoadLocation("Europe/Amsterdam") if err != nil { return time.Time{}, err } // Create and return the timestamp return time.Date(year, month, day, hour, min, sec, 0, loc), nil } func insertData(timestamp time.Time, payload Payload) error { // Prepare SQL statement stmt := ` INSERT INTO p1 ( timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2, delivery_all, returning_all, failures, long_failures, gas, voltage_l1, voltage_l2, voltage_l3, current_l1, current_l2, current_l3, delivery_l1, delivery_l2, delivery_l3, returning_l1, returning_l2, returning_l3 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22) ` _, err := db.Exec( stmt, timestamp, payload.Dt1, payload.Dt2, payload.Rt1, payload.Rt2, payload.D, payload.R, payload.F, payload.Fl, payload.G, payload.V1, payload.V2, payload.V3, payload.C1, payload.C2, payload.C3, payload.D1, payload.D2, payload.D3, payload.R1, payload.R2, payload.R3, ) return err } func connectToPostgreSQL(pgConnStr string) error { // Connect to PostgreSQL var err error for { db, err = sql.Open("postgres", pgConnStr) if err == nil { break // Successfully connected } logger.Error("Error connecting to PostgreSQL", "error", err) time.Sleep(5 * time.Second) // Retry after 5 seconds } // Enable TimescaleDB _, err = db.Exec(` CREATE EXTENSION IF NOT EXISTS timescaledb; `) if err != nil { log.Fatal("Error creating TimescaleDB extension:", err) } // Create table if not exists _, err = db.Exec(` CREATE TABLE IF NOT EXISTS p1 ( timestamp TIMESTAMPTZ, delivered_tariff1 INT, delivered_tariff2 INT, returned_tariff1 INT, returned_tariff2 INT, delivery_all INT, returning_all INT, failures INT, long_failures INT, gas INT, voltage_l1 INT, voltage_l2 INT, voltage_l3 INT, current_l1 INT, current_l2 INT, current_l3 INT, delivery_l1 INT, delivery_l2 
INT, delivery_l3 INT, returning_l1 INT, returning_l2 INT, returning_l3 INT ); SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE); `) if err != nil { log.Fatal("Error creating table:", err) } return nil }