p1-logger/main.go
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"log/slog"
"os"
"strconv"
"time"
_ "time/tzdata"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/joho/godotenv"
_ "github.com/lib/pq"
)
// Payload describes the metrics contained in a single MQTT message from the P1 meter.
//
// All metrics are stored in PostgreSQL as INT and not NUMERIC or FLOAT to optimise their storage size.
//
// Dt1, Dt2, Rt1, Rt2, G, F and Fl are cumulative meter readings.
// Therefore, they function as counters that can only go up.
// By making them pointers to int, they can be set to 'nil' when the reading has not changed.
// This way, they are logged as NULL in the database when no change has happened, saving storage capacity.
// As an added benefit, this also makes data retrieval, visualisation and analysis more lightweight.
//
// Gas consumption is updated once per minute.
// Not saving the unchanged values in between theoretically saves 4 bytes per value.
//
// An average month has 43,829.0639 minutes.
// 4 * 43,829.0639 = 175,316.26 bytes ≈ 0.18 MB
//
// In practice, storage will be even lower when gas is not being consumed continuously,
// as a new metric won't be available for every minute.
//
// When Tariff 1 is active, Tariff 2 isn't.
// When energy is being imported, it is not being returned.
//
// Therefore, only updating their values on-change should save at least 75% of the storage capacity requirements.
// An average month has 2,629,743.83 seconds.
// This saves at least:
// 2,629,743.83 seconds * 4 bytes = 10,518,975.32 bytes = 10,518.98 kB = 10.52 MB
// This applies to Dt1 and Dt2 as well as to Rt1 and Rt2; only one of each pair is recorded at a time.
// 10.52 * 3 = 31.56 MB.
//
// In addition, many meters only update these metrics once every 10 seconds,
// meaning even more storage capacity is saved:
// 10.52 / 10 * 9 = 9.468 MB.
//
// For D1-3 and R1-3, either Dx or Rx will be active.
// Therefore, their storage requirements can be sliced in half, saving 3 * 10.52 = 31.56 MB.
// This is different for D and R, as these contain the sum of each phase and are reported simultaneously.
//
// Not saving failure counters when there is no change will save around 10.52 MB per month per metric, so an additional 21.04 MB.
//
// Theoretical storage requirements for a DSMR5 meter with metrics emitted every second:
//
// Column                Column size   Per month (MB)   X   Total (MB)
// ---------------------------------------------------------------------
// Timestamp             8 bytes            21.04             21.04
// Dt1, Dt2, Rt1, Rt2    4 bytes             1.05              1.05
// D                     4 bytes            10.52             10.52
// R                     4 bytes            10.52             10.52
// F                     4 bytes             0.00              0.00
// Fl                    4 bytes             0.00              0.00
// G                     4 bytes             0.18              0.18
// V1-3, C1-3            4 bytes            10.52         6    63.12
// D1-3, R1-3            4 bytes            10.52         3    31.56
// ---------------------------------------------------------------------
// Total                                                      137.99
//
// Compared to no optimisations:
//
// Column       Column size   Per month (MB)   X   Total (MB)
// ---------------------------------------------------------------------
// Timestamps   8 bytes            21.04             21.04
// Values       4 bytes            10.52        21   220.92
// ---------------------------------------------------------------------
// Total                                             241.96
//
// Conclusion:
// At the expense of having to create more complicated queries for analysis and visualisation,
// the optimisations save at least 103.97 MB, or 1 - 137.99/241.96 ≈ 43%,
// of uncompressed storage space per month.
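//
// Because unchanged counters are stored as NULL, queries have to fill the gaps.
// One possible approach (a sketch, not part of this program) is a running maximum,
// which works because these counters never decrease and aggregate functions skip NULLs:
//
//     SELECT timestamp,
//            MAX(delivered_tariff1) OVER (ORDER BY timestamp) AS delivered_tariff1
//     FROM p1;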
type Payload struct {
T string `json:"t"` // Timestamp
Dt1 *int `json:"dt1"` // Delivered / imported meter reading tariff 1 (kWh)
Dt2 *int `json:"dt2"` // Delivered / imported tariff 2 (kWh)
Rt1 *int `json:"rt1"` // Returned / exported tariff 1 (kWh)
Rt2 *int `json:"rt2"` // Returned / exported tariff 2 (kWh)
D *int `json:"d"` // Delivering / importing (W)
R *int `json:"r"` // Returning / exporting (W)
F *int `json:"f"` // Failure (counter)
Fl *int `json:"fl"` // Failure long duration (counter)
G *int `json:"g"` // Gas meter reading (l)
V1 int `json:"v1"` // Voltage L1 (V)
V2 int `json:"v2"` // Voltage L2 (V)
V3 int `json:"v3"` // Voltage L3 (V)
C1 int `json:"c1"` // Current L1 (A)
C2 int `json:"c2"` // Current L2 (A)
C3 int `json:"c3"` // Current L3 (A)
D1 *int `json:"d1"` // Delivering / importing L1 (W)
D2 *int `json:"d2"` // Delivering / importing L2 (W)
D3 *int `json:"d3"` // Delivering / importing L3 (W)
R1 *int `json:"r1"` // Returning / exporting L1 (W)
R2 *int `json:"r2"` // Returning / exporting L2 (W)
R3 *int `json:"r3"` // Returning / exporting L3 (W)
}
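// Config holds the runtime configuration. It is read either from a JSON config file
// (by default /data/options.json, as provided to a Home Assistant add-on) or, when that
// file cannot be read, from environment variables (see loadConfig).
//
// A minimal example config file matching the JSON tags below (all values are placeholders):
//
//     {
//       "mqtt_broker": "tcp://192.168.1.2:1883",
//       "mqtt_topic": "dsmr/json",
//       "mqtt_user": "user",
//       "mqtt_password": "secret",
//       "DB": "postgres://user:secret@localhost:5432/p1?sslmode=disable",
//       "timescaledb": true,
//       "log_level": "INFO",
//       "timezone": "Europe/Amsterdam"
//     }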
type Config struct {
MQTTBroker string `json:"mqtt_broker"`
MQTTTopic string `json:"mqtt_topic"`
MQTTUser string `json:"mqtt_user"`
MQTTPassword string `json:"mqtt_password"`
DB string `json:"DB"`
TimescaleDB bool `json:"timescaledb"`
LogLevel string `json:"log_level"`
TZ string `json:"timezone"`
}
var db *sql.DB
var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
var config Config
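// loadConfig reads the configuration from the file named by the CONFIG_FILE environment
// variable (default /data/options.json). If that file cannot be read, it falls back to
// individual environment variables, optionally loaded from a .env file.
//
// An illustrative .env file for the fallback path (values are placeholders):
//
//     MQTT_BROKER=tcp://192.168.1.2:1883
//     MQTT_TOPIC=dsmr/json
//     MQTT_USERNAME=user
//     MQTT_PASSWORD=secret
//     PG_DB=postgres://user:secret@localhost:5432/p1?sslmode=disable
//     TIMESCALEDB_ENABLED=true
//     LOG_LEVEL=INFO
//     TZ=Europe/Amsterdam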
func loadConfig() Config {
configFilePath := os.Getenv("CONFIG_FILE")
if configFilePath == "" {
configFilePath = "/data/options.json"
}
data, err := os.ReadFile(configFilePath)
if err == nil {
// Successfully read the file, parse the JSON
err = json.Unmarshal(data, &config)
if err != nil {
log.Fatalf("Error parsing config file: %v", err)
}
} else {
// Load environment variables from .env file if it exists
if err := godotenv.Load(); err != nil {
logger.Info("No .env file found or error loading .env file (ignore this message when using container)", "error", err)
}
// Fallback to environment variables
config.MQTTBroker = os.Getenv("MQTT_BROKER")
if config.MQTTBroker == "" {
log.Fatal("MQTT_BROKER environment variable is not set.")
}
config.MQTTTopic = os.Getenv("MQTT_TOPIC")
if config.MQTTTopic == "" {
log.Fatal("MQTT_TOPIC environment variable is not set.")
}
config.MQTTUser = os.Getenv("MQTT_USERNAME")
if config.MQTTUser == "" {
log.Fatal("MQTT_USERNAME environment variable is not set.")
}
config.MQTTPassword = os.Getenv("MQTT_PASSWORD")
if config.MQTTPassword == "" {
log.Fatal("MQTT_PASSWORD environment variable is not set.")
}
config.DB = os.Getenv("PG_DB")
if config.DB == "" {
log.Fatal("PG_DB environment variable is not set.")
}
timescaleDBStr := os.Getenv("TIMESCALEDB_ENABLED")
if timescaleDBStr != "" {
timescaleDB, err := strconv.ParseBool(timescaleDBStr)
if err != nil {
log.Fatalf("Error parsing TIMESCALEDB_ENABLED: %v", err)
}
config.TimescaleDB = timescaleDB
}
config.LogLevel = os.Getenv("LOG_LEVEL")
if config.DB == "" {
log.Fatal("LOG_LEVEL environment variable is not set.")
}
config.TZ = os.Getenv("TZ")
if config.DB == "" {
log.Fatal("TZ environment variable is not set.")
}
}
return config
}
func main() {
// Initial logger setup
slog.SetDefault(logger)
loadConfig()
// Update the logger based on the configured log level
logLevel := getLogLevelFromEnv(slog.LevelInfo) // Default to info level
logger = createLoggerWithLevel(logLevel)
// Example usage of the logger with the new log level
logger.Info("Logger initialized with dynamic log level")
// Connect to PostgreSQL
pgConnStr := config.DB
if err := connectToPostgreSQL(pgConnStr); err != nil {
log.Fatal("Error connecting to PostgreSQL", "error", err)
}
// Initialize MQTT options
opts := mqtt.NewClientOptions()
opts.AddBroker(config.MQTTBroker)
opts.SetUsername(config.MQTTUser)
opts.SetPassword(config.MQTTPassword)
opts.SetAutoReconnect(true)
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
logger.Error("Connection lost", "error", err)
})
opts.SetOnConnectHandler(func(client mqtt.Client) {
topic := config.MQTTTopic
logger.Info("Connected to MQTT broker, subscribing to topic...", "topic", topic)
if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil {
logger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error())
}
})
// Connect to MQTT broker
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
// Keep the program running
select {}
}
// getLogLevelFromEnv maps the configured log level (config.LogLevel) to a slog.Level
func getLogLevelFromEnv(defaultLevel slog.Level) slog.Level {
logLevelStr := config.LogLevel
switch logLevelStr {
case "DEBUG":
return slog.LevelDebug
case "INFO":
return slog.LevelInfo
case "WARN":
return slog.LevelWarn
case "ERROR":
return slog.LevelError
default:
return defaultLevel
}
}
// Function to create a new logger with a specified log level
func createLoggerWithLevel(level slog.Level) *slog.Logger {
return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
}))
}
// updateFieldIfChanged ensures that meter readings that haven't been updated aren't written to the database, in order to save storage space.
// If they haven't changed, they are set to nil, meaning they will be inserted as NULL into the database.
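//
// For example (hypothetical values):
//
//     prev := 1200
//     v := 1200
//     val, changed := updateFieldIfChanged(&v, &prev) // val == nil, changed == false
//     v = 1201
//     val, changed = updateFieldIfChanged(&v, &prev)  // *val == 1201, changed == true, prev is now 1201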
func updateFieldIfChanged(currentValue *int, previousValue *int) (*int, bool) {
if currentValue != nil && *currentValue == *previousValue {
return nil, false // No change
} else if currentValue != nil {
*previousValue = *currentValue // Update the previous value to the current one
return currentValue, true // Change occurred
}
return currentValue, false // Return the original value if it's nil, indicating no change
}
// safeDerefInt handles potential nil pointers to avoid a runtime panic.
// Log messages will display the actual integer value if the pointer is not nil,
// or "nil" if the pointer is nil.
func safeDerefInt(ptr *int) string {
if ptr != nil {
return fmt.Sprintf("%d", *ptr) // Dereference the pointer to get the value
}
return "nil" // Return a string indicating the value is nil
}
var prevDt1, prevDt2, prevRt1, prevRt2, prevG, prevF, prevFl int
var prevD, prevR int
var prevD1, prevD2, prevD3, prevR1, prevR2, prevR3 int
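// mqttMessageHandler processes a single MQTT message containing a JSON document as
// described by the Payload struct. An illustrative payload (values are made up,
// keys follow the struct tags above):
//
//     {"t":"240725230009S","dt1":1200,"dt2":1300,"rt1":400,"rt2":500,"d":350,"r":0,
//      "f":3,"fl":1,"g":2500,"v1":230,"v2":231,"v3":229,"c1":2,"c2":1,"c3":1,
//      "d1":120,"d2":110,"d3":120,"r1":0,"r2":0,"r3":0}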
func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
// Parse JSON payload
var payload Payload
err := json.Unmarshal(msg.Payload(), &payload)
if err != nil {
logger.Error("Error parsing MQTT payload", "error", err)
return
}
// Parse timestamp to time.Time
timestamp, err := parseDSMRTimestamp(payload.T)
if err != nil {
logger.Error("Error parsing timestamp", "error", err)
return
}
var changed bool // Flag to track if any value changed
// Update each field, directly updating `changed` if any change is detected
var tempChanged bool // Used to capture the change status for each field
// Electricity meter readings
payload.Dt1, tempChanged = updateFieldIfChanged(payload.Dt1, &prevDt1)
changed = changed || tempChanged
payload.Dt2, tempChanged = updateFieldIfChanged(payload.Dt2, &prevDt2)
changed = changed || tempChanged
payload.Rt1, tempChanged = updateFieldIfChanged(payload.Rt1, &prevRt1)
changed = changed || tempChanged
payload.Rt2, tempChanged = updateFieldIfChanged(payload.Rt2, &prevRt2)
changed = changed || tempChanged
// Faults
payload.F, tempChanged = updateFieldIfChanged(payload.F, &prevF)
changed = changed || tempChanged
payload.Fl, tempChanged = updateFieldIfChanged(payload.Fl, &prevFl)
changed = changed || tempChanged
// Gas
payload.G, tempChanged = updateFieldIfChanged(payload.G, &prevG)
changed = changed || tempChanged
// D, R
payload.D, tempChanged = updateFieldIfChanged(payload.D, &prevD)
changed = changed || tempChanged
payload.R, tempChanged = updateFieldIfChanged(payload.R, &prevR)
changed = changed || tempChanged
// D1-3
payload.D1, tempChanged = updateFieldIfChanged(payload.D1, &prevD1)
changed = changed || tempChanged
payload.D2, tempChanged = updateFieldIfChanged(payload.D2, &prevD2)
changed = changed || tempChanged
payload.D3, tempChanged = updateFieldIfChanged(payload.D3, &prevD3)
changed = changed || tempChanged
// R1-3
payload.R1, tempChanged = updateFieldIfChanged(payload.R1, &prevR1)
changed = changed || tempChanged
payload.R2, tempChanged = updateFieldIfChanged(payload.R2, &prevR2)
changed = changed || tempChanged
payload.R3, tempChanged = updateFieldIfChanged(payload.R3, &prevR3)
changed = changed || tempChanged
// If any value has changed, log all the relevant values
if changed {
logger.Debug("Values changed",
"dt1", safeDerefInt(payload.Dt1),
"dt2", safeDerefInt(payload.Dt2),
"rt1", safeDerefInt(payload.Rt1),
"rt2", safeDerefInt(payload.Rt2),
"d", safeDerefInt(payload.D),
"r", safeDerefInt(payload.R),
"f", safeDerefInt(payload.F),
"fl", safeDerefInt(payload.Fl),
"g", safeDerefInt(payload.G),
"d1", safeDerefInt(payload.D1),
"d2", safeDerefInt(payload.D2),
"d3", safeDerefInt(payload.D3),
"r1", safeDerefInt(payload.R1),
"r2", safeDerefInt(payload.R2),
"r3", safeDerefInt(payload.R3),
)
}
// Insert data into PostgreSQL
err = insertData(timestamp, payload)
logger.Debug("Inserting values",
"t", payload.T,
"dt1", payload.Dt1,
"dt2", payload.Dt2,
"rt1", payload.Rt1,
"rt2", payload.Rt2,
"d", payload.D,
"r", payload.R,
"f", payload.F,
"fl", payload.Fl,
"g", payload.G,
"v1", payload.V1,
"v2", payload.V2,
"v3", payload.V3,
"c1", payload.C1,
"c2", payload.C2,
"c3", payload.C3,
"d1", payload.D1,
"d2", payload.D2,
"d3", payload.D3,
"r1", payload.R1,
"r2", payload.R2,
"r3", payload.R3)
if err != nil {
logger.Error("Error inserting data into PostgreSQL", "error", err)
}
}
// parseDSMRTimestamp parses the timestamp as emitted by the P1 meter
// (YYMMDDhhmmssX, where X is S or W for summer- or wintertime).
//
// More information:
// https://github.com/matthijskooijman/arduino-dsmr/blob/master/README.md
// https://srolija.medium.com/gos-summer-time-localization-issues-4c8ab702806b
// https://github.com/thomasvnl/P1DSMRReader-ESPHome
// https://pkg.go.dev/github.com/mijnverbruik/dsmr#Timestamp.Position
// https://pkg.go.dev/github.com/mijnverbruik/dsmr#Timestamp
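//
// For example, the (hypothetical) timestamp "240725230009S" parses to
// 2024-07-25 23:00:09 in Europe/Amsterdam (CEST, +02:00). The trailing S/W
// indicator is not inspected; the IANA time zone database resolves the offset.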
func parseDSMRTimestamp(t string) (time.Time, error) {
// Guard against malformed timestamps to avoid an index-out-of-range panic
if len(t) < 13 {
return time.Time{}, fmt.Errorf("invalid DSMR timestamp %q", t)
}
// Extract values from timestamp string
year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0')
hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0')
// Load location for "Europe/Amsterdam" time zone
loc, err := time.LoadLocation("Europe/Amsterdam")
if err != nil {
return time.Time{}, err
}
// Create and return the timestamp
return time.Date(year, month, day, hour, min, sec, 0, loc), nil
}
func insertData(timestamp time.Time, payload Payload) error {
// Prepare SQL statement
stmt := `
INSERT INTO p1 (
timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2,
delivery_all, returning_all, failures, long_failures, gas,
voltage_l1, voltage_l2, voltage_l3,
current_l1, current_l2, current_l3,
delivery_l1, delivery_l2, delivery_l3,
returning_l1, returning_l2, returning_l3
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
`
_, err := db.Exec(
stmt,
timestamp,
payload.Dt1, payload.Dt2,
payload.Rt1, payload.Rt2,
payload.D, payload.R,
payload.F, payload.Fl,
payload.G,
payload.V1, payload.V2, payload.V3,
payload.C1, payload.C2, payload.C3,
payload.D1, payload.D2, payload.D3,
payload.R1, payload.R2, payload.R3,
)
return err
}
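// connectToPostgreSQL opens the database, waits until it is reachable, and ensures the
// p1 table (and, when enabled, the TimescaleDB hypertable) exists.
// The connection string is taken from the DB / PG_DB setting; with the lib/pq driver a
// URL-style DSN such as the following (placeholder values) works:
//
//     postgres://user:secret@localhost:5432/p1?sslmode=disable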
func connectToPostgreSQL(pgConnStr string) error {
// Connect to PostgreSQL.
// sql.Open only validates the connection string; db.Ping actually establishes and verifies the connection.
var err error
db, err = sql.Open("postgres", pgConnStr)
if err != nil {
return err
}
for {
if err = db.Ping(); err == nil {
break // Successfully connected
}
logger.Error("Error connecting to PostgreSQL", "error", err)
time.Sleep(5 * time.Second) // Retry after 5 seconds
}
// Create table if not exists
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS p1 (
timestamp TIMESTAMPTZ,
delivered_tariff1 INT,
delivered_tariff2 INT,
returned_tariff1 INT,
returned_tariff2 INT,
delivery_all INT,
returning_all INT,
failures INT,
long_failures INT,
gas INT,
voltage_l1 INT,
voltage_l2 INT,
voltage_l3 INT,
current_l1 INT,
current_l2 INT,
current_l3 INT,
delivery_l1 INT,
delivery_l2 INT,
delivery_l3 INT,
returning_l1 INT,
returning_l2 INT,
returning_l3 INT
);
-- CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON p1 (timestamp);
`)
if err != nil {
log.Fatal("Error creating table:", err)
}
if config.TimescaleDB {
// Enable TimescaleDB
_, err = db.Exec(`
SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
`)
if err != nil {
log.Fatal("Error creating TimescaleDB hypertable:", err)
}
}
return nil
}