// TODO: Process p1/online status.
// TODO: Add health check endpoint.
// TODO: Add multiple P1 monitoring capabilities.

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"io/fs"
	"log"
	"log/slog"
	"os"
	"strconv"
	"time"
	_ "time/tzdata"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/joho/godotenv"
	_ "github.com/lib/pq"
	"github.com/pressly/goose/v3"

	"git.hollander.online/energy/p1-logger/migrations"
)

// Payload struct
//
// All metrics are stored in PostgreSQL as INT and not NUMERIC or FLOAT to optimise their storage size.
//
// Dt1, Dt2, Rt1, Rt2, G, F and Fl are cumulative meter readings.
// Therefore, they function as counters that can only go up.
// By making them pointers to int, they can be set to 'nil' when the reading has not changed.
// This way, they are logged as NULL in the database when no change has happened, saving storage capacity.
// As an added benefit, this also makes data retrieval, visualisation and analysis more light-weight.
//
// Gas consumption is updated once per minute.
// Not saving unchanged intermediate values theoretically saves 4 bytes per value.
//
// An average month has 43,829.0639 minutes.
// 4 * 43,829.0639 = 175,316.26 bytes = 0.18 MB
//
// In practice, storage will be even lower when gas is not being consumed continuously,
// as a new metric won't be available for every minute.
//
// When Tariff 1 is active, Tariff 2 isn't.
// When energy is being imported, it is not being returned.
//
// Therefore, only updating their values on change should save at least 75% of the storage capacity requirements.
// An average month has 2,629,743.83 seconds.
// This saves at least:
// 2,629,743.83 seconds * 4 bytes = 10,518,975.32 bytes = 10,518.98 kB = 10.52 MB
// This applies to Dt1, Dt2, Rt1 and Rt2: only one of them is recorded at a time.
// 10.52 * 3 = 31.56 MB.
//
// In addition, many meters only update these metrics once every 10 seconds,
// meaning even more storage capacity is saved:
// 10.52 / 10 * 9 = 9.47 MB.
//
// For D1-3 and R1-3, either Dx or Rx will be active.
// Therefore, their storage requirements can be sliced in half, saving 3 * 10.52 = 31.56 MB.
// This is different for D and R, as these contain the sum of each phase and are reported simultaneously.
//
// Not saving failures when there is no change will save around 10.52 MB per month per metric, so an additional 21.04 MB.
//
// Theoretical storage requirements for a DSMR5 meter with metrics emitted every second:
//
// Column               Column size (bytes)   Per month (MB)   X   Total (MB)
// ---------------------------------------------------------------------------
// Timestamp            8 bytes               021.04
// Dt1, Dt2, Rt1, Rt2   4 bytes               001.05
// D                    4 bytes               010.52
// R                    4 bytes               010.52
// F                    4 bytes               000.00
// Fl                   4 bytes               000.00
// G                    4 bytes               000.18
// V1-3, C1-3           4 bytes               010.52           6   063.12
// D1-3, R1-3           4 bytes               010.52           3   031.56
// ---------------------------------------------------------------------------
// Total                                                           137.99
//
// Compared to no optimisations:
//
// Column               Column size (bytes)   Per month (MB)   X   Total (MB)
// ---------------------------------------------------------------------------
// Timestamps           8 bytes               21.04
// Values               4 bytes               10.52            21  220.92
// ---------------------------------------------------------------------------
// Total                                                           241.96
//
// Conclusion:
// At the expense of having to create more complicated queries for analysis and visualisation,
// the optimisations save at least 241.96 - 137.99 = 103.97 MB, or 1 - 137.99/241.96 = 43%,
// in uncompressed storage space per month.
type Payload struct {
	T   string `json:"t"`   // Timestamp
	Dt1 *int   `json:"dt1"` // Delivered / imported meter reading tariff 1 (kWh)
	Dt2 *int   `json:"dt2"` // Delivered / imported tariff 2 (kWh)
	Rt1 *int   `json:"rt1"` // Returned / exported tariff 1 (kWh)
	Rt2 *int   `json:"rt2"` // Returned / exported tariff 2 (kWh)
	D   *int   `json:"d"`   // Delivering / importing (W)
	R   *int   `json:"r"`   // Returning / exporting (W)
	F   *int   `json:"f"`   // Failure (counter)
	Fl  *int   `json:"fl"`  // Failure long duration (counter)
	Sa1 *int   `json:"sa1"` // Number of voltage sags L1
	Sa2 *int   `json:"sa2"` // Number of voltage sags L2
	Sa3 *int   `json:"sa3"` // Number of voltage sags L3
	Sw1 *int   `json:"sw1"` // Number of voltage swells L1
	Sw2 *int   `json:"sw2"` // Number of voltage swells L2
	Sw3 *int   `json:"sw3"` // Number of voltage swells L3
	G   *int   `json:"g"`   // Gas meter reading (l)
	V1  int    `json:"v1"`  // Voltage L1 (V)
	V2  int    `json:"v2"`  // Voltage L2 (V)
	V3  int    `json:"v3"`  // Voltage L3 (V)
	C1  int    `json:"c1"`  // Current L1 (A)
	C2  int    `json:"c2"`  // Current L2 (A)
	C3  int    `json:"c3"`  // Current L3 (A)
	D1  *int   `json:"d1"`  // Delivering / importing L1 (W)
	D2  *int   `json:"d2"`  // Delivering / importing L2 (W)
	D3  *int   `json:"d3"`  // Delivering / importing L3 (W)
	R1  *int   `json:"r1"`  // Returning / exporting L1 (W)
	R2  *int   `json:"r2"`  // Returning / exporting L2 (W)
	R3  *int   `json:"r3"`  // Returning / exporting L3 (W)

	GBe     *int `json:"gbe"`    // Gas meter reading (l) (Belgium)
	CAQDBe  *int `json:"aeicad"` // Current Average Quarterly Demand for Peak Tariff (Belgium)
	CMMQDBe *int `json:"cmmqd"`  // Current Month’s Maximum Quarterly Demand for Peak Tariff (Belgium)
	TMMQDBe *int `json:"13mmqd"` // 13 Month Maximum Quarterly Demand for Peak Tariff (Belgium)

	DLu *int `json:"dlu"` // Energy Delivered (Luxembourg)
	RLu *int `json:"rlu"` // Energy Returned (Luxembourg)
}
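
// Config holds the application configuration, loaded from a JSON file
// (CONFIG_FILE, default /data/options.json) or from environment variables.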
type Config struct {
	MQTTBroker   string `json:"mqtt_broker"`
	MQTTTopic    string `json:"mqtt_topic"`
	MQTTUser     string `json:"mqtt_user"`
	MQTTPassword string `json:"mqtt_password"`
	DB           string `json:"db"`
	TimescaleDB  bool   `json:"timescaledb"`
	LogLevel     string `json:"log_level"`
	TZ           string `json:"timezone"`
}
var db *sql.DB
var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
var config Config
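
// loadConfig reads the configuration from the JSON file pointed to by CONFIG_FILE
// (default /data/options.json). If that file cannot be read, it falls back to
// environment variables, optionally loaded from a .env file.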
func loadConfig() Config {
	configFilePath := os.Getenv("CONFIG_FILE")
	if configFilePath == "" {
		configFilePath = "/data/options.json"
	}

	data, err := os.ReadFile(configFilePath)
	if err == nil {
		// Successfully read the file, parse the JSON
		err = json.Unmarshal(data, &config)
		if err != nil {
			log.Fatalf("Error parsing config file: %v", err)
		}
	} else {
		// Load environment variables from .env file if it exists
		if err := godotenv.Load(); err != nil {
			logger.Info("No .env file found or error loading .env file (ignore this message when using container)", "error", err)
		}

		// Fallback to environment variables
		config.MQTTBroker = os.Getenv("MQTT_BROKER")
		if config.MQTTBroker == "" {
			log.Fatal("MQTT_BROKER environment variable is not set.")
		}
		config.MQTTTopic = os.Getenv("MQTT_TOPIC")
		if config.MQTTTopic == "" {
			log.Fatal("MQTT_TOPIC environment variable is not set.")
		}
		config.MQTTUser = os.Getenv("MQTT_USERNAME")
		if config.MQTTUser == "" {
			log.Fatal("MQTT_USERNAME environment variable is not set.")
		}
		config.MQTTPassword = os.Getenv("MQTT_PASSWORD")
		if config.MQTTPassword == "" {
			log.Fatal("MQTT_PASSWORD environment variable is not set.")
		}
		config.DB = os.Getenv("PG_DB")
		if config.DB == "" {
			log.Fatal("PG_DB environment variable is not set.")
		}

		timescaleDBStr := os.Getenv("TIMESCALEDB_ENABLED")
		if timescaleDBStr != "" {
			timescaleDB, err := strconv.ParseBool(timescaleDBStr)
			if err != nil {
				log.Fatalf("Error parsing TIMESCALEDB_ENABLED: %v", err)
			}
			config.TimescaleDB = timescaleDB
		}
		config.LogLevel = os.Getenv("LOG_LEVEL")
		if config.LogLevel == "" {
			log.Fatal("LOG_LEVEL environment variable is not set.")
		}

		config.TZ = os.Getenv("TZ")
		if config.TZ == "" {
			log.Fatal("TZ environment variable is not set.")
		}
	}

	return config
}
func main() {
	// Initial logger setup
	slog.SetDefault(logger)

	loadConfig()

	// Update the logger based on the configured log level
	logLevel := getLogLevelFromEnv(slog.LevelInfo) // Default to info level
	logger = createLoggerWithLevel(logLevel)

	// Example usage of the logger with the new log level
	logger.Info("Logger initialized with dynamic log level")

	// Connect to PostgreSQL
	pgConnStr := config.DB
	if err := connectToPostgreSQL(pgConnStr); err != nil {
		log.Fatalf("Error connecting to PostgreSQL: %v", err)
	}

	// Initialize MQTT options
	opts := mqtt.NewClientOptions()
	opts.AddBroker(config.MQTTBroker)
	opts.SetUsername(config.MQTTUser)
	opts.SetPassword(config.MQTTPassword)
	opts.SetAutoReconnect(true)
	opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
		logger.Error("Connection lost", "error", err)
	})
	opts.SetOnConnectHandler(func(client mqtt.Client) {
		topic := config.MQTTTopic
		logger.Info("Connected to MQTT broker, subscribing to topic...", "topic", topic)
		if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil {
			logger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error())
		}
	})

	// Connect to MQTT broker
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}

	// Keep the program running
	select {}
}

// getLogLevelFromEnv maps the configured log level string (config.LogLevel) to a slog.Level,
// falling back to defaultLevel when the value is empty or not recognised.
func getLogLevelFromEnv(defaultLevel slog.Level) slog.Level {
	logLevelStr := config.LogLevel
	switch logLevelStr {
	case "DEBUG":
		return slog.LevelDebug
	case "INFO":
		return slog.LevelInfo
	case "WARN":
		return slog.LevelWarn
	case "ERROR":
		return slog.LevelError
	default:
		return defaultLevel
	}
}

// createLoggerWithLevel creates a new JSON logger that writes to stdout at the specified log level.
func createLoggerWithLevel(level slog.Level) *slog.Logger {
	return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: level,
	}))
}

// updateFieldIfChanged ensures that meter readings that haven't been updated aren't written to the database, in order to save storage space.
// If they haven't changed, they are set to nil, meaning they will be inserted as NULL into the database.
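// For example, with *previousValue == 100, a current value of 100 yields (nil, false),
// while a current value of 101 updates *previousValue and yields (currentValue, true).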
func updateFieldIfChanged(currentValue *int, previousValue *int) (*int, bool) {
	if currentValue != nil && *currentValue == *previousValue {
		return nil, false // No change
	} else if currentValue != nil {
		*previousValue = *currentValue // Update the previous value to the current one
		return currentValue, true      // Change occurred
	}
	return currentValue, false // Return the original value if it's nil, indicating no change
}

// safeDerefInt handles potential nil pointers to avoid a runtime panic.
// Log messages will display the actual integer values if the pointers are not nil,
// or "nil" if the pointers are nil.
func safeDerefInt(ptr *int) string {
	if ptr != nil {
		return fmt.Sprintf("%d", *ptr) // Dereference the pointer to get the value
	}
	return "nil" // Return a string indicating the value is nil
}
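
// Previous values used by updateFieldIfChanged to detect whether a metric has
// changed since the last MQTT message; unchanged metrics are stored as NULL.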
var prevDt1, prevDt2, prevRt1, prevRt2, prevG, prevF, prevFl int
var prevSa1, prevSa2, prevSa3, prevSw1, prevSw2, prevSw3 int
var prevD, prevR int
var prevD1, prevD2, prevD3, prevR1, prevR2, prevR3 int
var prevGBe, prevCAQDBe, prevCMMQDBe, prevTMMQDBe int
var prevDLu, prevRLu int
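
// mqttMessageHandler parses an incoming P1 payload, nils out metrics that have not
// changed since the previous message and inserts the reading into PostgreSQL.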
func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
	// Parse JSON payload
	var payload Payload
	err := json.Unmarshal(msg.Payload(), &payload)
	if err != nil {
		logger.Error("Error parsing MQTT payload", "error", err)
		return
	}

	// Parse timestamp to time.Time
	timestamp, err := parseDSMRTimestamp(payload.T)
	if err != nil {
		logger.Error("Error parsing timestamp", "error", err)
		return
	}
2024-02-18 17:56:06 +01:00
|
|
|
|
var changed bool // Flag to track if any value changed
|
|
|
|
|
|
|
|
|
|
// Update each field, directly updating `changed` if any change is detected
|
|
|
|
|
var tempChanged bool // Used to capture the change status for each field
|
2024-02-22 12:18:11 +01:00
|
|
|
|
// Electricity meter readings
|
2024-02-18 17:56:06 +01:00
|
|
|
|
payload.Dt1, tempChanged = updateFieldIfChanged(payload.Dt1, &prevDt1)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
|
|
|
|
payload.Dt2, tempChanged = updateFieldIfChanged(payload.Dt2, &prevDt2)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
|
|
|
|
payload.Rt1, tempChanged = updateFieldIfChanged(payload.Rt1, &prevRt1)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
|
|
|
|
payload.Rt2, tempChanged = updateFieldIfChanged(payload.Rt2, &prevRt2)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
2024-02-22 12:18:11 +01:00
|
|
|
|
// Faults
|
|
|
|
|
payload.F, tempChanged = updateFieldIfChanged(payload.F, &prevF)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
|
|
|
|
payload.Fl, tempChanged = updateFieldIfChanged(payload.Fl, &prevFl)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
2024-07-26 22:02:47 +02:00
|
|
|
|
// Sags
|
|
|
|
|
payload.Sa1, tempChanged = updateFieldIfChanged(payload.Sa1, &prevSa1)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.Sa2, tempChanged = updateFieldIfChanged(payload.Sa2, &prevSa2)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.Sa3, tempChanged = updateFieldIfChanged(payload.Sa3, &prevSa3)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
|
|
|
|
// Swells
|
|
|
|
|
payload.Sw1, tempChanged = updateFieldIfChanged(payload.Sw1, &prevSw1)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.Sw2, tempChanged = updateFieldIfChanged(payload.Sw2, &prevSw2)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.Sw3, tempChanged = updateFieldIfChanged(payload.Sw3, &prevSw3)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
2024-02-22 12:18:11 +01:00
|
|
|
|
// Gas
|
2024-02-18 17:56:06 +01:00
|
|
|
|
payload.G, tempChanged = updateFieldIfChanged(payload.G, &prevG)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
2024-02-22 12:18:11 +01:00
|
|
|
|
// D, R
|
|
|
|
|
payload.D, tempChanged = updateFieldIfChanged(payload.D, &prevD)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.R, tempChanged = updateFieldIfChanged(payload.R, &prevR)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
|
|
|
|
// D1-3
|
|
|
|
|
payload.D1, tempChanged = updateFieldIfChanged(payload.D1, &prevD1)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.D2, tempChanged = updateFieldIfChanged(payload.D2, &prevD2)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.D3, tempChanged = updateFieldIfChanged(payload.D3, &prevD3)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
|
|
|
|
// R1-3
|
|
|
|
|
payload.R1, tempChanged = updateFieldIfChanged(payload.R1, &prevR1)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.R2, tempChanged = updateFieldIfChanged(payload.R2, &prevR2)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.R3, tempChanged = updateFieldIfChanged(payload.R3, &prevR3)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
2024-07-26 22:02:47 +02:00
|
|
|
|
// Belgium
|
|
|
|
|
payload.GBe, tempChanged = updateFieldIfChanged(payload.GBe, &prevGBe)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.CAQDBe, tempChanged = updateFieldIfChanged(payload.CAQDBe, &prevCAQDBe)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.CMMQDBe, tempChanged = updateFieldIfChanged(payload.CMMQDBe, &prevCMMQDBe)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.TMMQDBe, tempChanged = updateFieldIfChanged(payload.TMMQDBe, &prevTMMQDBe)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
|
|
|
|
// Luxembourg
|
|
|
|
|
payload.DLu, tempChanged = updateFieldIfChanged(payload.DLu, &prevDLu)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
payload.RLu, tempChanged = updateFieldIfChanged(payload.RLu, &prevRLu)
|
|
|
|
|
changed = changed || tempChanged
|
|
|
|
|
|
2024-02-18 17:56:06 +01:00
|
|
|
|
// If any value has changed, log all the relevant values
|
|
|
|
|
if changed {
|
2024-02-20 16:19:01 +01:00
|
|
|
|
logger.Debug("Values changed",
|
|
|
|
|
"dt1", safeDerefInt(payload.Dt1),
|
|
|
|
|
"dt2", safeDerefInt(payload.Dt2),
|
|
|
|
|
"rt1", safeDerefInt(payload.Rt1),
|
|
|
|
|
"rt2", safeDerefInt(payload.Rt2),
|
2024-02-22 12:18:11 +01:00
|
|
|
|
|
|
|
|
|
"d", safeDerefInt(payload.D),
|
|
|
|
|
"r", safeDerefInt(payload.R),
|
|
|
|
|
|
|
|
|
|
"f", safeDerefInt(payload.F),
|
|
|
|
|
"fl", safeDerefInt(payload.Fl),
|
2024-07-26 22:02:47 +02:00
|
|
|
|
|
|
|
|
|
"sa1", safeDerefInt(payload.Sa1),
|
|
|
|
|
"sa2", safeDerefInt(payload.Sa2),
|
|
|
|
|
"sa3", safeDerefInt(payload.Sa3),
|
|
|
|
|
|
|
|
|
|
"sw1", safeDerefInt(payload.Sw1),
|
|
|
|
|
"sw2", safeDerefInt(payload.Sw2),
|
|
|
|
|
"sw3", safeDerefInt(payload.Sw3),
|
|
|
|
|
|
2024-02-22 12:18:11 +01:00
|
|
|
|
"g", safeDerefInt(payload.G),
|
|
|
|
|
|
|
|
|
|
"d1", safeDerefInt(payload.D1),
|
|
|
|
|
"d2", safeDerefInt(payload.D2),
|
|
|
|
|
"d3", safeDerefInt(payload.D3),
|
|
|
|
|
|
|
|
|
|
"r1", safeDerefInt(payload.R1),
|
|
|
|
|
"r2", safeDerefInt(payload.R2),
|
|
|
|
|
"r3", safeDerefInt(payload.R3),
|
2024-07-26 22:02:47 +02:00
|
|
|
|
|
|
|
|
|
"gbe", safeDerefInt(payload.GBe),
|
|
|
|
|
"caqdbe", safeDerefInt(payload.CAQDBe),
|
|
|
|
|
"cmmqdbe", safeDerefInt(payload.CMMQDBe),
|
|
|
|
|
"tmmqdbe", safeDerefInt(payload.TMMQDBe),
|
|
|
|
|
|
|
|
|
|
"dlu", safeDerefInt(payload.DLu),
|
|
|
|
|
"rlu", safeDerefInt(payload.RLu),
|
2024-02-22 12:18:11 +01:00
|
|
|
|
)
|
2024-02-18 17:56:06 +01:00
|
|
|
|
}
|
2023-12-05 17:37:50 +01:00
|
|
|
|
// Insert data into PostgreSQL
|
|
|
|
|
err = insertData(timestamp, payload)
|
2024-02-20 16:44:57 +01:00
|
|
|
|
logger.Debug("Inserting values",
|
|
|
|
|
"t", payload.T,
|
|
|
|
|
"dt1", payload.Dt1,
|
|
|
|
|
"dt2", payload.Dt2,
|
|
|
|
|
"rt1", payload.Rt1,
|
|
|
|
|
"rt2", payload.Rt2,
|
|
|
|
|
"d", payload.D,
|
|
|
|
|
"r", payload.R,
|
|
|
|
|
"f", payload.F,
|
|
|
|
|
"fl", payload.Fl,
|
|
|
|
|
"g", payload.G,
|
|
|
|
|
"v1", payload.V1,
|
|
|
|
|
"v2", payload.V2,
|
|
|
|
|
"v3", payload.V3,
|
|
|
|
|
"c1", payload.C1,
|
|
|
|
|
"c2", payload.C2,
|
|
|
|
|
"c3", payload.C3,
|
|
|
|
|
"d1", payload.D1,
|
|
|
|
|
"d2", payload.D2,
|
|
|
|
|
"d3", payload.D3,
|
|
|
|
|
"r1", payload.R1,
|
|
|
|
|
"r2", payload.R2,
|
2024-07-26 22:02:47 +02:00
|
|
|
|
"r3", payload.R3,
|
|
|
|
|
|
|
|
|
|
"gbe", payload.GBe,
|
|
|
|
|
"caqdbe", payload.CAQDBe,
|
|
|
|
|
"cmmqdbe", payload.CMMQDBe,
|
|
|
|
|
"tmmqdbe", payload.TMMQDBe,
|
|
|
|
|
|
|
|
|
|
"dlu", payload.DLu,
|
|
|
|
|
"rlu", payload.RLu,
|
|
|
|
|
)
|
|
|
|
|
|

	if err != nil {
		logger.Error("Error inserting data into PostgreSQL", "error", err)
	}
}

// parseDSMRTimestamp parses the timestamp as emitted by the P1 meter
// (YYMMDDhhmmssX, where X is S or W for summer- or wintertime).
//
// More information:
// https://github.com/matthijskooijman/arduino-dsmr/blob/master/README.md
// https://srolija.medium.com/gos-summer-time-localization-issues-4c8ab702806b
// https://github.com/thomasvnl/P1DSMRReader-ESPHome
// https://pkg.go.dev/github.com/mijnverbruik/dsmr#Timestamp.Position
// https://pkg.go.dev/github.com/mijnverbruik/dsmr#Timestamp
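// For example, "240726220247S" is parsed as 2024-07-26 22:02:47 in the
// Europe/Amsterdam time zone.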
func parseDSMRTimestamp(t string) (time.Time, error) {
	// Guard against malformed input to avoid an index-out-of-range panic below
	if len(t) < 12 {
		return time.Time{}, fmt.Errorf("invalid DSMR timestamp %q", t)
	}

	// Extract values from timestamp string
	year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0')
	hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0')

	// Load location for "Europe/Amsterdam" time zone
	loc, err := time.LoadLocation("Europe/Amsterdam")
	if err != nil {
		return time.Time{}, err
	}

	// Create and return the timestamp
	return time.Date(year, month, day, hour, min, sec, 0, loc), nil
}
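
// insertData writes a single reading to the p1 table; fields that are nil pointers
// are stored as NULL.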
func insertData(timestamp time.Time, payload Payload) error {
	// Prepare SQL statement
	stmt := `
		INSERT INTO p1 (
			timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2,
			delivery_all, returning_all,
			failures, long_failures,
			sags_l1, sags_l2, sags_l3,
			swells_l1, swells_l2, swells_l3,
			gas,
			voltage_l1, voltage_l2, voltage_l3,
			current_l1, current_l2, current_l3,
			delivery_l1, delivery_l2, delivery_l3,
			returning_l1, returning_l2, returning_l3
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28)
	`
	_, err := db.Exec(
		stmt,
		timestamp,
		payload.Dt1, payload.Dt2,
		payload.Rt1, payload.Rt2,
		payload.D, payload.R,
		payload.F, payload.Fl,
		payload.Sa1, payload.Sa2, payload.Sa3,
		payload.Sw1, payload.Sw2, payload.Sw3,
		payload.G,
		payload.V1, payload.V2, payload.V3,
		payload.C1, payload.C2, payload.C3,
		payload.D1, payload.D2, payload.D3,
		payload.R1, payload.R2, payload.R3,
	)
	return err
}
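
// connectToPostgreSQL retries until a database connection can be established, then runs
// the embedded migrations and, when enabled, converts the p1 table into a TimescaleDB hypertable.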
func connectToPostgreSQL(pgConnStr string) error {
	// Connect to PostgreSQL
	var err error
	for {
		db, err = sql.Open("postgres", pgConnStr)
		if err == nil {
			// sql.Open only validates its arguments; Ping checks that the database is actually reachable
			err = db.Ping()
			if err == nil {
				break // Successfully connected
			}
		}

		logger.Error("Error connecting to PostgreSQL", "error", err)
		time.Sleep(5 * time.Second) // Retry after 5 seconds
	}

	// Perform DB migrations
	err = migrateFS(db, migrations.FS, ".")
	if err != nil {
		log.Fatal(err)
	}

	if config.TimescaleDB {
		// Enable TimescaleDB
		_, err = db.Exec(`
			SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
		`)
		if err != nil {
			log.Fatalf("Error creating TimescaleDB hypertable: %v", err)
		}
	}

	return nil
}
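
// migrate applies all pending goose migrations in dir to db.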
func migrate(db *sql.DB, dir string) error {
	err := goose.SetDialect("postgres")
	if err != nil {
		return fmt.Errorf("migrate: %w", err)
	}
	err = goose.Up(db, dir)
	if err != nil {
		return fmt.Errorf("migrate: %w", err)
	}
	return nil
}
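
// migrateFS runs the migrations contained in migrationFS by temporarily setting it
// as goose's base filesystem.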
func migrateFS(db *sql.DB, migrationFS fs.FS, dir string) error {
	// In case the dir is an empty string, they probably meant the current directory and goose wants a period for that.
	if dir == "" {
		dir = "."
	}
	goose.SetBaseFS(migrationFS)
	defer func() {
		// Ensure that we remove the FS on the off chance some other part of our app uses goose for migrations and doesn't want to use our FS.
		goose.SetBaseFS(nil)
	}()
	return migrate(db, dir)
}