From bfffaa233d11c37bb05f94b836bfa376092c6322 Mon Sep 17 00:00:00 2001 From: Pieter Hollander Date: Wed, 21 Feb 2024 18:56:29 +0100 Subject: [PATCH] Cleanup TODO, add documentation, add slog, change SQL for TimescaleDB compatibility. Split hints into hints table. --- main.go | 150 ++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 92 insertions(+), 58 deletions(-) diff --git a/main.go b/main.go index 1b4f925..743fff7 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,11 @@ -// TODO: Remove inverters_DC Name and periodically / on request query name only once. // TODO: Storage optimisation: Map inverter serial to shorter serial. Use that for referring. // TODO: Use username and password provided using Basic Authentication. -// TODO: Make the timestamp in log a foreign key to other DBs so timescaleDB hypertables can be created. -// Make it unique so foreign key constraints can be used. -// Create separate table for inverter_id, dc_number, name and store it there. -// Do the same for string names. +// TODO: Record Inverter struct data only on-change. // Idea: Make a full admin / config GUI and only configure through this utility. // Idea: Gather settings only on start-up. - +// TODO: handleMessage: Support older data age than 0, due to new OpenDTU WebSocket implementation. +// TODO: Only update meter readings such as yieldday, yieldtotal on-change. +// TODO: DB migrations. package main import ( @@ -15,6 +13,7 @@ import ( "encoding/json" "fmt" "log" + "log/slog" "net/http" "os" "time" @@ -24,12 +23,15 @@ import ( _ "github.com/lib/pq" ) +// VUD contains three variables used for most metrics sent by OpenDTU: +// Value, unit and decimal point accuracy. type VUD struct { - V float64 `json:"v"` - U string `json:"u"` - D int `json:"d"` + V float64 `json:"v"` // Value + U string `json:"u"` // Unit + D int `json:"d"` // Decimals } +// InverterAC stores AC generation metrics per inverter. type InverterAC struct { Power VUD `json:"Power"` Voltage VUD `json:"Voltage"` @@ -39,6 +41,7 @@ type InverterAC struct { ReactivePower VUD `json:"ReactivePower"` } +// InverterDC stores DC generation metrics per string (a string is usually 1 solar panel) type InverterDC struct { Name struct { U string `json:"u"` @@ -56,6 +59,7 @@ type InverterDC struct { } } +// InverterINV stores aggregated metrics for each inverter type InverterINV struct { Temperature VUD `json:"Temperature"` Efficiency VUD `json:"Efficiency"` @@ -138,8 +142,13 @@ type InverterSettingsData struct { Inverters []InverterSettings `json:"inverter"` } +var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil)) + // Main program func main() { + // Initial logger setup + slog.SetDefault(logger) + dbConnStr := (os.Getenv("DB_URL")) // Connect to PostgreSQL db, err := sql.Open("postgres", dbConnStr) @@ -175,7 +184,7 @@ func main() { for { _, message, err := c.ReadMessage() if err != nil { - log.Println(err) + logger.Error("Error reading WebSocket message", "error", err) return } @@ -197,7 +206,7 @@ func handleMessage(message []byte, db *sql.DB) { // Parse the JSON message into the LiveData struct if err := json.Unmarshal(message, &liveData); err != nil { - log.Println("Error decoding JSON:", err) + logger.Error("Error decoding JSON", "error", err) return } @@ -207,7 +216,7 @@ func handleMessage(message []byte, db *sql.DB) { // Query the endpoint for events events, err := queryEventsEndpoint(inverter.Serial) if err != nil { - log.Println("Error querying events endpoint:", err) + logger.Error("Error querying events endpoint", "error", err) continue } @@ -218,30 +227,31 @@ func handleMessage(message []byte, db *sql.DB) { if inverter.DataAge == 0 && inverter.Reachable { // Insert data into PostgreSQL tables insertLiveData(db, inverter, liveData.Total, liveData.Hints) - fmt.Println("inserting data") + logger.Debug("Inserting data") } } } func createTables(db *sql.DB) { // Execute SQL statements to create tables if they don't exist + // inverter_serial is TEXT as some non-Hoymiles inverters use non-numeric serial numbers. + // An additional advantage is that it makes plotting in Grafana easier. + // TODO: Foreign keys commented out as TimescaleDB hypertables don't support them. createTableSQL := ` - CREATE TABLE IF NOT EXISTS log ( - id SERIAL PRIMARY KEY, + CREATE TABLE IF NOT EXISTS dtu_log ( + -- id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ UNIQUE DEFAULT CURRENT_TIMESTAMP, + -- timestamp TIMESTAMPTZ, power NUMERIC, yieldday NUMERIC, - yieldtotal NUMERIC, - time_sync BOOL, - radio_problem BOOL, - default_password BOOL + yieldtotal NUMERIC ); CREATE TABLE IF NOT EXISTS inverters ( - id SERIAL PRIMARY KEY, + -- id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, - FOREIGN KEY (timestamp) REFERENCES log(timestamp), - inverter_serial BIGINT, + -- FOREIGN KEY (timestamp) REFERENCES dtu_log(timestamp), + inverter_serial TEXT, name TEXT, producing BOOL, limit_relative NUMERIC, @@ -249,10 +259,10 @@ func createTables(db *sql.DB) { ); CREATE TABLE IF NOT EXISTS inverters_ac ( - id SERIAL PRIMARY KEY, + -- id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, - FOREIGN KEY (timestamp) REFERENCES log(timestamp), - inverter_serial BIGINT, + -- FOREIGN KEY (timestamp) REFERENCES dtu_log(timestamp), + inverter_serial TEXT, ac_number INT, power NUMERIC, voltage NUMERIC, @@ -263,10 +273,10 @@ func createTables(db *sql.DB) { ); CREATE TABLE IF NOT EXISTS inverters_dc ( - id SERIAL PRIMARY KEY, + -- id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, - FOREIGN KEY (timestamp) REFERENCES log(timestamp), - inverter_serial BIGINT, + -- FOREIGN KEY (timestamp) REFERENCES dtu_log(timestamp), + inverter_serial TEXT, dc_number INT, name TEXT, power NUMERIC, @@ -278,10 +288,10 @@ func createTables(db *sql.DB) { ); CREATE TABLE IF NOT EXISTS inverters_inv ( - id SERIAL PRIMARY KEY, + -- id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, - FOREIGN KEY (timestamp) REFERENCES log(timestamp), - inverter_serial BIGINT, + -- FOREIGN KEY (timestamp) REFERENCES dtu_log(timestamp), + inverter_serial TEXT, temperature NUMERIC, power_dc NUMERIC, yieldday NUMERIC, @@ -290,21 +300,33 @@ func createTables(db *sql.DB) { ); CREATE TABLE IF NOT EXISTS events ( - id SERIAL PRIMARY KEY, + -- id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - inverter_serial BIGINT, + inverter_serial TEXT, message_id INT, message TEXT, start_time INT, end_time INT ); + + CREATE TABLE IF NOT EXISTS dtu_hints ( + -- id SERIAL PRIMARY KEY, + timestamp TIMESTAMPTZ, + -- FOREIGN KEY (timestamp) REFERENCES dtu_log(timestamp), + time_sync BOOL, + radio_problem BOOL, + default_password BOOL + ); - CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON log (timestamp); - CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters (timestamp); - CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_ac (timestamp); - CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_dc (timestamp); - CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_inv (timestamp); - CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON events (timestamp); + -- UNIQUE here doesn't work, as e.g. one timestamp may be valid for multiple string data. + CREATE INDEX IF NOT EXISTS dtu_log_timestamp_idx ON dtu_log (timestamp); + CREATE INDEX IF NOT EXISTS inverters_timestamp_idx ON inverters (timestamp); + CREATE INDEX IF NOT EXISTS inverters_ac_timestamp_idx ON inverters_ac (timestamp); + CREATE INDEX IF NOT EXISTS inverters_dc_timestamp_idx ON inverters_dc (timestamp); + CREATE INDEX IF NOT EXISTS inverters_inv_timestamp_idx ON inverters_inv (timestamp); + CREATE INDEX IF NOT EXISTS events_timestamp_idx ON events (timestamp); + CREATE INDEX IF NOT EXISTS dtu_hints_timestamp_idx ON dtu_hints (timestamp); + ` _, err := db.Exec(createTableSQL) @@ -314,13 +336,14 @@ func createTables(db *sql.DB) { timescaleEnabled := os.Getenv("TIMESCALEDB_ENABLED") enableTimescaleDB := ` - CREATE EXTENSION IF NOT EXISTS timescaledb; - SELECT create_hypertable('log', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); + -- CREATE EXTENSION IF NOT EXISTS timescaledb; + SELECT create_hypertable('dtu_log', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters_ac', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters_dc', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters_inv', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('events', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); + SELECT create_hypertable('dtu_hints', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); ` if timescaleEnabled == "true" { _, err := db.Exec(enableTimescaleDB) @@ -337,21 +360,22 @@ func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) { // Insert data into log table _, err := db.Exec(` - INSERT INTO log (timestamp, power, yieldday, yieldtotal, time_sync, radio_problem, default_password) - VALUES ($1, $2, $3, $4, $5, $6, $7); - `, timestamp, total.Power.V, total.YieldDay.V, total.YieldTotal.V, hints.TimeSync, hints.RadioProblem, hints.DefaultPassword) + INSERT INTO dtu_log (timestamp, power, yieldday, yieldtotal) + VALUES ($1, $2, $3, $4); + `, timestamp, total.Power.V, total.YieldDay.V, total.YieldTotal.V) if err != nil { - log.Println("Error inserting into log table:", err) + logger.Error("Error inserting into log table", "error", err) return } // Get the log ID of the inserted record - var logID int - err = db.QueryRow("SELECT id FROM log WHERE timestamp = $1", timestamp).Scan(&logID) - if err != nil { - log.Println("Error getting log ID:", err) - return - } + // NOT IN USE: TimescaleDB doesn't support it. + // var logID int + // err = db.QueryRow("SELECT id FROM dtu_log WHERE timestamp = $1", timestamp).Scan(&logID) + // if err != nil { + // logger.Error("Error getting dtu_log ID", "error", err) + // return + // } // Insert data into inverters table _, err = db.Exec(` @@ -359,18 +383,18 @@ func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) { VALUES ($1, $2, $3, $4, $5, $6); `, timestamp, inverter.Serial, inverter.Name, inverter.Producing, inverter.LimitRelative, inverter.LimitAbsolute) if err != nil { - log.Println("Error inserting into inverters table:", err) + logger.Error("Error inserting into inverters table", "error", err) return } // Insert data into inverters_ac table for acNumber, acData := range inverter.AC { _, err := db.Exec(` - INSERT INTO inverters_ac (timestamp, inverter_serial, ac_number, power, voltage, current, frequency, powerfactor, reactivepower, efficiency) + INSERT INTO inverters_ac (timestamp, inverter_serial, ac_number, power, voltage, current, frequency, powerfactor, reactivepower) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9); `, timestamp, inverter.Serial, acNumber, acData.Power.V, acData.Voltage.V, acData.Current.V, acData.Frequency.V, acData.PowerFactor.V, acData.ReactivePower.V) if err != nil { - log.Printf("Error inserting into inverters_ac table for AC %s: %v\n", acNumber, err) + logger.Error("Error inserting into inverters_ac table for AC", "error", "acNumber", acNumber, err) } } @@ -395,6 +419,16 @@ func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) { log.Printf("Error inserting into inverters_inv table for INV %s: %v\n", invNumber, err) } } + // Insert data into hints table + _, err = db.Exec(` + INSERT INTO dtu_hints (timestamp, time_sync, radio_problem, default_password) + VALUES ($1, $2, $3, $4); + `, timestamp, hints.TimeSync, hints.RadioProblem, hints.DefaultPassword) + if err != nil { + logger.Error("Error inserting into log table", "error", err) + return + } + } func queryEventsEndpoint(inverterSerial string) (*EventsResponse, error) { @@ -419,7 +453,7 @@ func getPreviousEventsCount(db *sql.DB, inverterSerial string) int { var count int err := db.QueryRow("SELECT COUNT(*) FROM events WHERE inverter_serial = $1", inverterSerial).Scan(&count) if err != nil && err != sql.ErrNoRows { - log.Println("Error querying previous events count:", err) + logger.Error("Error querying previous events count", "error", err) } return count } @@ -434,7 +468,7 @@ func insertEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { VALUES ($1, $2, $3, $4, $5, $6); `, timestamp, inverterSerial, event.MessageID, event.Message, event.StartTime, event.EndTime) if err != nil { - log.Println("Error inserting into events table:", err) + logger.Error("Error inserting into events table", "error", err) } } @@ -445,7 +479,7 @@ func insertEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { time.Sleep(30 * time.Minute) updatedEvents, err := queryEventsEndpoint(inverterSerial) if err != nil { - log.Println("Error querying events endpoint for updates:", err) + logger.Error("Error querying events endpoint for updates", "error", err) continue } @@ -463,7 +497,7 @@ func updateEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { UPDATE events SET end_time = $1 WHERE inverter_serial = $2 AND start_time = $3 AND end_time = 0; `, event.EndTime, inverterSerial, event.StartTime) if err != nil { - log.Println("Error updating events table:", err) + logger.Error("Error updating events table", "error", err) } } }