// TODO: Make TimescaleDB optional.
package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"log/slog"
	"os"
	"time"
	_ "time/tzdata"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/joho/godotenv"
	_ "github.com/lib/pq"
)

// Payload struct
//
// All metrics are stored in PostgreSQL as INT and not NUMERIC or FLOAT to optimise their storage size.
//
// Dt1, Dt2, Rt1, Rt2 and G are cumulative meter readings.
// By making them pointers to int, they can be set to 'nil' when the reading has not changed.
// This way, they are logged as NULL in the database when no change has happened, saving storage capacity.
// As an added benefit, this also makes data retrieval, visualisation and analysis more light-weight.
//
// Gas consumption is updated once per minute.
// Not saving the unchanged intermediate values theoretically saves 4 bytes per value.
//
// Storage for non-NULL values every second:
// 86,400 entries/day × 4 bytes/entry = 345,600 bytes/day
// Storage for non-NULL values every minute:
// 1,440 entries/day × 4 bytes/entry = 5,760 bytes/day
// Storage savings:
// 345,600 bytes/day − 5,760 bytes/day = 339,840 bytes/day
// (about 332 KB/day or 9.72 MB/month)
// In practice, savings will be even higher when gas is not being consumed continuously.
//
// When Tariff 1 is active, Tariff 2 isn't.
// When energy is being imported, it is not being returned.
//
// Therefore, only updating these values on-change should reduce storage requirements by at least 75%.
// An average month has 2,629,743.83 seconds.
// This saves at least:
// 2,629,743.83 seconds * 4 bytes = 10,518,975.32 bytes = 10,518.98 kB = 10.52 MB
// This applies to Dt1 and Dt2 as well as Rt1 and Rt2: only one of each pair is recorded at a time.
// 10.52 MB * 3 = 31.56 MB.
//
// In addition, many meters only update these metrics once every 10 seconds,
// meaning even more storage capacity is saved:
// 10.52 MB / 10 * 9 = 9.47 MB.
type Payload struct {
	T   string `json:"t"`   // Timestamp
	Dt1 *int   `json:"dt1"` // Delivered / imported meter reading tariff 1 (kWh)
	Dt2 *int   `json:"dt2"` // Delivered / imported tariff 2 (kWh)
	Rt1 *int   `json:"rt1"` // Returned / exported tariff 1 (kWh)
	Rt2 *int   `json:"rt2"` // Returned / exported tariff 2 (kWh)
	D   int    `json:"d"`   // Delivering / importing (W)
	R   int    `json:"r"`   // Returning / exporting (W)
	F   int    `json:"f"`   // Failure (counter)
	Fl  int    `json:"fl"`  // Failure long duration (counter)
	G   *int   `json:"g"`   // Gas meter reading (l)
	V1  int    `json:"v1"`  // Voltage L1 (V)
	V2  int    `json:"v2"`  // Voltage L2 (V)
	V3  int    `json:"v3"`  // Voltage L3 (V)
	C1  int    `json:"c1"`  // Current L1 (A)
	C2  int    `json:"c2"`  // Current L2 (A)
	C3  int    `json:"c3"`  // Current L3 (A)
	D1  int    `json:"d1"`  // Delivering / importing L1 (W)
	D2  int    `json:"d2"`  // Delivering / importing L2 (W)
	D3  int    `json:"d3"`  // Delivering / importing L3 (W)
	R1  int    `json:"r1"`  // Returning / exporting L1 (W)
	R2  int    `json:"r2"`  // Returning / exporting L2 (W)
	R3  int    `json:"r3"`  // Returning / exporting L3 (W)
}
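
// Illustrative example of an incoming MQTT message that unmarshals into Payload
// (hypothetical values, not taken from a real meter; the cumulative readings and gas
// are integers, matching the INT storage described above):
//
//	{"t":"240101123000W","dt1":1234567,"dt2":2345678,"rt1":10000,"rt2":20000,
//	 "d":450,"r":0,"f":3,"fl":1,"g":987654,
//	 "v1":230,"v2":231,"v3":229,"c1":2,"c2":1,"c3":1,
//	 "d1":200,"d2":150,"d3":100,"r1":0,"r2":0,"r3":0}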

var db *sql.DB
var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))

func main() {
	// Initial logger setup
	slog.SetDefault(logger)

	// Load environment variables from .env file if it exists
	if err := godotenv.Load(); err != nil {
		logger.Info("No .env file found or error loading .env file (ignore this message when using container)", "error", err)
	}

	// Update the logger based on the LOG_LEVEL environment variable
	logLevel := getLogLevelFromEnv(slog.LevelInfo) // Default to info level
	logger = createLoggerWithLevel(logLevel)

	// Example usage of the logger with the new log level
	logger.Info("Logger initialized with dynamic log level")

	// Connect to PostgreSQL
	pgConnStr := os.Getenv("PG_DB")
	if err := connectToPostgreSQL(pgConnStr); err != nil {
		log.Fatal("Error connecting to PostgreSQL: ", err)
	}

	// Initialize MQTT options
	opts := mqtt.NewClientOptions()
	opts.AddBroker(os.Getenv("MQTT_BROKER"))
	opts.SetUsername(os.Getenv("MQTT_USERNAME"))
	opts.SetPassword(os.Getenv("MQTT_PASSWORD"))
	opts.SetAutoReconnect(true)
	opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
		logger.Error("Connection lost", "error", err)
	})
	opts.SetOnConnectHandler(func(client mqtt.Client) {
		topic := os.Getenv("MQTT_TOPIC")
		logger.Info("Connected to MQTT broker, subscribing to topic...", "topic", topic)
		if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil {
			logger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error())
		}
	})

	// Connect to MQTT broker
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}

	// Keep the program running
	select {}
}

// Helper function to map environment variable to slog.Level
func getLogLevelFromEnv(defaultLevel slog.Level) slog.Level {
	logLevelStr := os.Getenv("LOG_LEVEL")
	switch logLevelStr {
	case "DEBUG":
		return slog.LevelDebug
	case "INFO":
		return slog.LevelInfo
	case "WARN":
		return slog.LevelWarn
	case "ERROR":
		return slog.LevelError
	default:
		return defaultLevel
	}
}
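
// For example (illustrative), starting the service with LOG_LEVEL=DEBUG enables the
// per-message Debug logging in mqttMessageHandler below; an unset or unrecognised value
// falls back to the default level passed in (Info when called from main).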

// Function to create a new logger with a specified log level
func createLoggerWithLevel(level slog.Level) *slog.Logger {
	return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: level,
	}))
}

// updateFieldIfChanged ensures that meter readings that haven't been updated aren't
// written to the database, in order to save storage space.
func updateFieldIfChanged(currentValue *int, previousValue *int) (*int, bool) {
	if currentValue != nil && *currentValue == *previousValue {
		return nil, false // No change
	} else if currentValue != nil {
		*previousValue = *currentValue // Update the previous value to the current one
		return currentValue, true      // Change occurred
	}
	return currentValue, false // Return the original value if it's nil, indicating no change
}
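
// For example (illustrative values): with prevDt1 == 1500, an incoming reading of 1500
// yields (nil, false) and the column is stored as NULL; a reading of 1501 yields the
// pointer to 1501 and true, and prevDt1 is updated to 1501.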

// safeDerefInt handles potential nil pointers to avoid a runtime panic.
// Log messages will display the actual integer values if the pointers are not nil,
// or "nil" if the pointers are nil.
func safeDerefInt(ptr *int) string {
	if ptr != nil {
		return fmt.Sprintf("%d", *ptr) // Dereference the pointer to get the value
	}
	return "nil" // Return a string indicating the value is nil
}

// Last recorded cumulative readings, used by updateFieldIfChanged to detect changes between messages
var prevDt1, prevDt2, prevRt1, prevRt2, prevG int

func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
	// Parse JSON payload
	var payload Payload
	err := json.Unmarshal(msg.Payload(), &payload)
	if err != nil {
		logger.Error("Error parsing MQTT payload", "error", err)
		return
	}

	// Parse timestamp to time.Time
	timestamp, err := parseDSMRTimestamp(payload.T)
	if err != nil {
		logger.Error("Error parsing timestamp", "error", err)
		return
	}

	var changed bool // Flag to track if any value changed

	// Update each field, directly updating `changed` if any change is detected
	var tempChanged bool // Used to capture the change status for each field
	payload.Dt1, tempChanged = updateFieldIfChanged(payload.Dt1, &prevDt1)
	changed = changed || tempChanged

	payload.Dt2, tempChanged = updateFieldIfChanged(payload.Dt2, &prevDt2)
	changed = changed || tempChanged

	payload.Rt1, tempChanged = updateFieldIfChanged(payload.Rt1, &prevRt1)
	changed = changed || tempChanged

	payload.Rt2, tempChanged = updateFieldIfChanged(payload.Rt2, &prevRt2)
	changed = changed || tempChanged

	payload.G, tempChanged = updateFieldIfChanged(payload.G, &prevG)
	changed = changed || tempChanged

	// If any value has changed, log all the relevant values
	if changed {
		logger.Debug("Values changed",
			"dt1", safeDerefInt(payload.Dt1),
			"dt2", safeDerefInt(payload.Dt2),
			"rt1", safeDerefInt(payload.Rt1),
			"rt2", safeDerefInt(payload.Rt2),
			"g", safeDerefInt(payload.G))
	}

	// Insert data into PostgreSQL
	logger.Debug("Inserting values",
		"t", payload.T,
		"dt1", payload.Dt1,
		"dt2", payload.Dt2,
		"rt1", payload.Rt1,
		"rt2", payload.Rt2,
		"d", payload.D,
		"r", payload.R,
		"f", payload.F,
		"fl", payload.Fl,
		"g", payload.G,
		"v1", payload.V1,
		"v2", payload.V2,
		"v3", payload.V3,
		"c1", payload.C1,
		"c2", payload.C2,
		"c3", payload.C3,
		"d1", payload.D1,
		"d2", payload.D2,
		"d3", payload.D3,
		"r1", payload.R1,
		"r2", payload.R2,
		"r3", payload.R3)
	err = insertData(timestamp, payload)
	if err != nil {
		logger.Error("Error inserting data into PostgreSQL", "error", err)
	}
}

// parseDSMRTimestamp parses the timestamp as emitted by the P1 meter
// (YYMMDDhhmmssX, where X is S or W for summer or winter time).
//
// More information:
// https://github.com/matthijskooijman/arduino-dsmr/blob/master/README.md
// https://srolija.medium.com/gos-summer-time-localization-issues-4c8ab702806b
// https://github.com/thomasvnl/P1DSMRReader-ESPHome
// https://pkg.go.dev/github.com/mijnverbruik/dsmr#Timestamp.Position
// https://pkg.go.dev/github.com/mijnverbruik/dsmr#Timestamp
func parseDSMRTimestamp(t string) (time.Time, error) {
	// Guard against malformed timestamps to avoid an index-out-of-range panic
	if len(t) < 12 {
		return time.Time{}, fmt.Errorf("invalid DSMR timestamp %q", t)
	}

	// Extract values from timestamp string
	year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0')
	hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0')

	// Load location for "Europe/Amsterdam" time zone
	loc, err := time.LoadLocation("Europe/Amsterdam")
	if err != nil {
		return time.Time{}, err
	}

	// Create and return the timestamp
	return time.Date(year, month, day, hour, min, sec, 0, loc), nil
}
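
// For example (illustrative), the DSMR timestamp "250615142530S" is parsed as
// 2025-06-15 14:25:30 in Europe/Amsterdam, which the tz database resolves to
// summer time (CEST, UTC+2); the trailing S/W marker itself is not inspected.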

func insertData(timestamp time.Time, payload Payload) error {
	// Prepare SQL statement
	stmt := `
	INSERT INTO p1 (
		timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2,
		delivery_all, returning_all, failures, long_failures, gas,
		voltage_l1, voltage_l2, voltage_l3,
		current_l1, current_l2, current_l3,
		delivery_l1, delivery_l2, delivery_l3,
		returning_l1, returning_l2, returning_l3
	) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
	`
	_, err := db.Exec(
		stmt,
		timestamp,
		payload.Dt1, payload.Dt2,
		payload.Rt1, payload.Rt2,
		payload.D, payload.R,
		payload.F, payload.Fl,
		payload.G,
		payload.V1, payload.V2, payload.V3,
		payload.C1, payload.C2, payload.C3,
		payload.D1, payload.D2, payload.D3,
		payload.R1, payload.R2, payload.R3,
	)
	return err
}
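
// Note: the pointer fields (Dt1, Dt2, Rt1, Rt2, G) are passed to db.Exec as-is;
// database/sql dereferences non-nil pointers and stores nil pointers as NULL,
// which is what makes the on-change storage optimisation described above work.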

func connectToPostgreSQL(pgConnStr string) error {
	// Connect to PostgreSQL
	var err error
	for {
		db, err = sql.Open("postgres", pgConnStr)
		if err == nil {
			// sql.Open only validates its arguments; Ping verifies the database is reachable
			err = db.Ping()
		}
		if err == nil {
			break // Successfully connected
		}

		logger.Error("Error connecting to PostgreSQL", "error", err)
		time.Sleep(5 * time.Second) // Retry after 5 seconds
	}

	// Create table if not exists
	_, err = db.Exec(`
	CREATE TABLE IF NOT EXISTS p1 (
		timestamp TIMESTAMPTZ,
		delivered_tariff1 INT,
		delivered_tariff2 INT,
		returned_tariff1 INT,
		returned_tariff2 INT,
		delivery_all INT,
		returning_all INT,
		failures INT,
		long_failures INT,
		gas INT,
		voltage_l1 INT,
		voltage_l2 INT,
		voltage_l3 INT,
		current_l1 INT,
		current_l2 INT,
		current_l3 INT,
		delivery_l1 INT,
		delivery_l2 INT,
		delivery_l3 INT,
		returning_l1 INT,
		returning_l2 INT,
		returning_l3 INT
	);
	CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON p1 (timestamp);
	`)
	if err != nil {
		log.Fatal("Error creating table:", err)
	}

	timescaleDBEnabled := os.Getenv("TIMESCALEDB_ENABLED")

	if timescaleDBEnabled == "true" {
		// Enable TimescaleDB
		_, err = db.Exec(`
		CREATE EXTENSION IF NOT EXISTS timescaledb;
		SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
		`)
		if err != nil {
			log.Fatal("Error creating TimescaleDB extension:", err)
		}
	}

	return nil
}
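
// Example configuration (a sketch only; the variable names follow the os.Getenv calls
// above, the values are hypothetical) that could be placed in a .env file or set as
// container environment variables:
//
//	MQTT_BROKER=tcp://mqtt.example.local:1883
//	MQTT_USERNAME=p1user
//	MQTT_PASSWORD=secret
//	MQTT_TOPIC=dsmr/json
//	PG_DB=postgres://p1:secret@db.example.local:5432/p1?sslmode=disable
//	LOG_LEVEL=INFO
//	TIMESCALEDB_ENABLED=true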