Cleanup TODO, add documentation, add slog, change SQL for TimescaleDB compatibility. Split hints into hints table.
All checks were successful
Build Docker image / build (push) Successful in 1m11s

This commit is contained in:
Pieter Hollander 2024-02-21 18:56:29 +01:00
parent 3c9eb5d332
commit bfffaa233d
Signed by: pieter
SSH key fingerprint: SHA256:HbX+9cBXsop9SuvL+mELd29sK+7DehFfdVweFVDtMSg

150
main.go
View file

@ -1,13 +1,11 @@
// TODO: Remove inverters_DC Name and periodically / on request query name only once.
// TODO: Storage optimisation: Map inverter serial to shorter serial. Use that for referring. // TODO: Storage optimisation: Map inverter serial to shorter serial. Use that for referring.
// TODO: Use username and password provided using Basic Authentication. // TODO: Use username and password provided using Basic Authentication.
// TODO: Make the timestamp in log a foreign key to other DBs so timescaleDB hypertables can be created. // TODO: Record Inverter struct data only on-change.
// Make it unique so foreign key constraints can be used.
// Create separate table for inverter_id, dc_number, name and store it there.
// Do the same for string names.
// Idea: Make a full admin / config GUI and only configure through this utility. // Idea: Make a full admin / config GUI and only configure through this utility.
// Idea: Gather settings only on start-up. // Idea: Gather settings only on start-up.
// TODO: handleMessage: Support older data age than 0, due to new OpenDTU WebSocket implementation.
// TODO: Only update meter readings such as yieldday, yieldtotal on-change.
// TODO: DB migrations.
package main package main
import ( import (
@ -15,6 +13,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"log/slog"
"net/http" "net/http"
"os" "os"
"time" "time"
@ -24,12 +23,15 @@ import (
_ "github.com/lib/pq" _ "github.com/lib/pq"
) )
// VUD contains three variables used for most metrics sent by OpenDTU:
// Value, unit and decimal point accuracy.
type VUD struct { type VUD struct {
V float64 `json:"v"` V float64 `json:"v"` // Value
U string `json:"u"` U string `json:"u"` // Unit
D int `json:"d"` D int `json:"d"` // Decimals
} }
// InverterAC stores AC generation metrics per inverter.
type InverterAC struct { type InverterAC struct {
Power VUD `json:"Power"` Power VUD `json:"Power"`
Voltage VUD `json:"Voltage"` Voltage VUD `json:"Voltage"`
@ -39,6 +41,7 @@ type InverterAC struct {
ReactivePower VUD `json:"ReactivePower"` ReactivePower VUD `json:"ReactivePower"`
} }
// InverterDC stores DC generation metrics per string (a string is usually 1 solar panel)
type InverterDC struct { type InverterDC struct {
Name struct { Name struct {
U string `json:"u"` U string `json:"u"`
@ -56,6 +59,7 @@ type InverterDC struct {
} }
} }
// InverterINV stores aggregated metrics for each inverter
type InverterINV struct { type InverterINV struct {
Temperature VUD `json:"Temperature"` Temperature VUD `json:"Temperature"`
Efficiency VUD `json:"Efficiency"` Efficiency VUD `json:"Efficiency"`
@ -138,8 +142,13 @@ type InverterSettingsData struct {
Inverters []InverterSettings `json:"inverter"` Inverters []InverterSettings `json:"inverter"`
} }
var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
// Main program // Main program
func main() { func main() {
// Initial logger setup
slog.SetDefault(logger)
dbConnStr := (os.Getenv("DB_URL")) dbConnStr := (os.Getenv("DB_URL"))
// Connect to PostgreSQL // Connect to PostgreSQL
db, err := sql.Open("postgres", dbConnStr) db, err := sql.Open("postgres", dbConnStr)
@ -175,7 +184,7 @@ func main() {
for { for {
_, message, err := c.ReadMessage() _, message, err := c.ReadMessage()
if err != nil { if err != nil {
log.Println(err) logger.Error("Error reading WebSocket message", "error", err)
return return
} }
@ -197,7 +206,7 @@ func handleMessage(message []byte, db *sql.DB) {
// Parse the JSON message into the LiveData struct // Parse the JSON message into the LiveData struct
if err := json.Unmarshal(message, &liveData); err != nil { if err := json.Unmarshal(message, &liveData); err != nil {
log.Println("Error decoding JSON:", err) logger.Error("Error decoding JSON", "error", err)
return return
} }
@ -207,7 +216,7 @@ func handleMessage(message []byte, db *sql.DB) {
// Query the endpoint for events // Query the endpoint for events
events, err := queryEventsEndpoint(inverter.Serial) events, err := queryEventsEndpoint(inverter.Serial)
if err != nil { if err != nil {
log.Println("Error querying events endpoint:", err) logger.Error("Error querying events endpoint", "error", err)
continue continue
} }
@ -218,30 +227,31 @@ func handleMessage(message []byte, db *sql.DB) {
if inverter.DataAge == 0 && inverter.Reachable { if inverter.DataAge == 0 && inverter.Reachable {
// Insert data into PostgreSQL tables // Insert data into PostgreSQL tables
insertLiveData(db, inverter, liveData.Total, liveData.Hints) insertLiveData(db, inverter, liveData.Total, liveData.Hints)
fmt.Println("inserting data") logger.Debug("Inserting data")
} }
} }
} }
func createTables(db *sql.DB) { func createTables(db *sql.DB) {
// Execute SQL statements to create tables if they don't exist // Execute SQL statements to create tables if they don't exist
// inverter_serial is TEXT as some non-Hoymiles inverters use non-numeric serial numbers.
// An additional advantage is that it makes plotting in Grafana easier.
// TODO: Foreign keys commented out as TimescaleDB hypertables don't support them.
createTableSQL := ` createTableSQL := `
CREATE TABLE IF NOT EXISTS log ( CREATE TABLE IF NOT EXISTS dtu_log (
id SERIAL PRIMARY KEY, -- id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ UNIQUE DEFAULT CURRENT_TIMESTAMP, timestamp TIMESTAMPTZ UNIQUE DEFAULT CURRENT_TIMESTAMP,
-- timestamp TIMESTAMPTZ,
power NUMERIC, power NUMERIC,
yieldday NUMERIC, yieldday NUMERIC,
yieldtotal NUMERIC, yieldtotal NUMERIC
time_sync BOOL,
radio_problem BOOL,
default_password BOOL
); );
CREATE TABLE IF NOT EXISTS inverters ( CREATE TABLE IF NOT EXISTS inverters (
id SERIAL PRIMARY KEY, -- id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ, timestamp TIMESTAMPTZ,
FOREIGN KEY (timestamp) REFERENCES log(timestamp), -- FOREIGN KEY (timestamp) REFERENCES dtu_log(timestamp),
inverter_serial BIGINT, inverter_serial TEXT,
name TEXT, name TEXT,
producing BOOL, producing BOOL,
limit_relative NUMERIC, limit_relative NUMERIC,
@ -249,10 +259,10 @@ func createTables(db *sql.DB) {
); );
CREATE TABLE IF NOT EXISTS inverters_ac ( CREATE TABLE IF NOT EXISTS inverters_ac (
id SERIAL PRIMARY KEY, -- id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ, timestamp TIMESTAMPTZ,
FOREIGN KEY (timestamp) REFERENCES log(timestamp), -- FOREIGN KEY (timestamp) REFERENCES dtu_log(timestamp),
inverter_serial BIGINT, inverter_serial TEXT,
ac_number INT, ac_number INT,
power NUMERIC, power NUMERIC,
voltage NUMERIC, voltage NUMERIC,
@ -263,10 +273,10 @@ func createTables(db *sql.DB) {
); );
CREATE TABLE IF NOT EXISTS inverters_dc ( CREATE TABLE IF NOT EXISTS inverters_dc (
id SERIAL PRIMARY KEY, -- id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ, timestamp TIMESTAMPTZ,
FOREIGN KEY (timestamp) REFERENCES log(timestamp), -- FOREIGN KEY (timestamp) REFERENCES dtu_log(timestamp),
inverter_serial BIGINT, inverter_serial TEXT,
dc_number INT, dc_number INT,
name TEXT, name TEXT,
power NUMERIC, power NUMERIC,
@ -278,10 +288,10 @@ func createTables(db *sql.DB) {
); );
CREATE TABLE IF NOT EXISTS inverters_inv ( CREATE TABLE IF NOT EXISTS inverters_inv (
id SERIAL PRIMARY KEY, -- id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ, timestamp TIMESTAMPTZ,
FOREIGN KEY (timestamp) REFERENCES log(timestamp), -- FOREIGN KEY (timestamp) REFERENCES dtu_log(timestamp),
inverter_serial BIGINT, inverter_serial TEXT,
temperature NUMERIC, temperature NUMERIC,
power_dc NUMERIC, power_dc NUMERIC,
yieldday NUMERIC, yieldday NUMERIC,
@ -290,21 +300,33 @@ func createTables(db *sql.DB) {
); );
CREATE TABLE IF NOT EXISTS events ( CREATE TABLE IF NOT EXISTS events (
id SERIAL PRIMARY KEY, -- id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
inverter_serial BIGINT, inverter_serial TEXT,
message_id INT, message_id INT,
message TEXT, message TEXT,
start_time INT, start_time INT,
end_time INT end_time INT
); );
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON log (timestamp); CREATE TABLE IF NOT EXISTS dtu_hints (
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters (timestamp); -- id SERIAL PRIMARY KEY,
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_ac (timestamp); timestamp TIMESTAMPTZ,
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_dc (timestamp); -- FOREIGN KEY (timestamp) REFERENCES dtu_log(timestamp),
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_inv (timestamp); time_sync BOOL,
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON events (timestamp); radio_problem BOOL,
default_password BOOL
);
-- UNIQUE here doesn't work, as e.g. one timestamp may be valid for multiple string data.
CREATE INDEX IF NOT EXISTS dtu_log_timestamp_idx ON dtu_log (timestamp);
CREATE INDEX IF NOT EXISTS inverters_timestamp_idx ON inverters (timestamp);
CREATE INDEX IF NOT EXISTS inverters_ac_timestamp_idx ON inverters_ac (timestamp);
CREATE INDEX IF NOT EXISTS inverters_dc_timestamp_idx ON inverters_dc (timestamp);
CREATE INDEX IF NOT EXISTS inverters_inv_timestamp_idx ON inverters_inv (timestamp);
CREATE INDEX IF NOT EXISTS events_timestamp_idx ON events (timestamp);
CREATE INDEX IF NOT EXISTS dtu_hints_timestamp_idx ON dtu_hints (timestamp);
` `
_, err := db.Exec(createTableSQL) _, err := db.Exec(createTableSQL)
@ -314,13 +336,14 @@ func createTables(db *sql.DB) {
timescaleEnabled := os.Getenv("TIMESCALEDB_ENABLED") timescaleEnabled := os.Getenv("TIMESCALEDB_ENABLED")
enableTimescaleDB := ` enableTimescaleDB := `
CREATE EXTENSION IF NOT EXISTS timescaledb; -- CREATE EXTENSION IF NOT EXISTS timescaledb;
SELECT create_hypertable('log', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('dtu_log', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('inverters', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('inverters_ac', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters_ac', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('inverters_dc', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters_dc', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('inverters_inv', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters_inv', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('events', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('events', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('dtu_hints', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
` `
if timescaleEnabled == "true" { if timescaleEnabled == "true" {
_, err := db.Exec(enableTimescaleDB) _, err := db.Exec(enableTimescaleDB)
@ -337,21 +360,22 @@ func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) {
// Insert data into log table // Insert data into log table
_, err := db.Exec(` _, err := db.Exec(`
INSERT INTO log (timestamp, power, yieldday, yieldtotal, time_sync, radio_problem, default_password) INSERT INTO dtu_log (timestamp, power, yieldday, yieldtotal)
VALUES ($1, $2, $3, $4, $5, $6, $7); VALUES ($1, $2, $3, $4);
`, timestamp, total.Power.V, total.YieldDay.V, total.YieldTotal.V, hints.TimeSync, hints.RadioProblem, hints.DefaultPassword) `, timestamp, total.Power.V, total.YieldDay.V, total.YieldTotal.V)
if err != nil { if err != nil {
log.Println("Error inserting into log table:", err) logger.Error("Error inserting into log table", "error", err)
return return
} }
// Get the log ID of the inserted record // Get the log ID of the inserted record
var logID int // NOT IN USE: TimescaleDB doesn't support it.
err = db.QueryRow("SELECT id FROM log WHERE timestamp = $1", timestamp).Scan(&logID) // var logID int
if err != nil { // err = db.QueryRow("SELECT id FROM dtu_log WHERE timestamp = $1", timestamp).Scan(&logID)
log.Println("Error getting log ID:", err) // if err != nil {
return // logger.Error("Error getting dtu_log ID", "error", err)
} // return
// }
// Insert data into inverters table // Insert data into inverters table
_, err = db.Exec(` _, err = db.Exec(`
@ -359,18 +383,18 @@ func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) {
VALUES ($1, $2, $3, $4, $5, $6); VALUES ($1, $2, $3, $4, $5, $6);
`, timestamp, inverter.Serial, inverter.Name, inverter.Producing, inverter.LimitRelative, inverter.LimitAbsolute) `, timestamp, inverter.Serial, inverter.Name, inverter.Producing, inverter.LimitRelative, inverter.LimitAbsolute)
if err != nil { if err != nil {
log.Println("Error inserting into inverters table:", err) logger.Error("Error inserting into inverters table", "error", err)
return return
} }
// Insert data into inverters_ac table // Insert data into inverters_ac table
for acNumber, acData := range inverter.AC { for acNumber, acData := range inverter.AC {
_, err := db.Exec(` _, err := db.Exec(`
INSERT INTO inverters_ac (timestamp, inverter_serial, ac_number, power, voltage, current, frequency, powerfactor, reactivepower, efficiency) INSERT INTO inverters_ac (timestamp, inverter_serial, ac_number, power, voltage, current, frequency, powerfactor, reactivepower)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9); VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9);
`, timestamp, inverter.Serial, acNumber, acData.Power.V, acData.Voltage.V, acData.Current.V, acData.Frequency.V, acData.PowerFactor.V, acData.ReactivePower.V) `, timestamp, inverter.Serial, acNumber, acData.Power.V, acData.Voltage.V, acData.Current.V, acData.Frequency.V, acData.PowerFactor.V, acData.ReactivePower.V)
if err != nil { if err != nil {
log.Printf("Error inserting into inverters_ac table for AC %s: %v\n", acNumber, err) logger.Error("Error inserting into inverters_ac table for AC", "error", "acNumber", acNumber, err)
} }
} }
@ -395,6 +419,16 @@ func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) {
log.Printf("Error inserting into inverters_inv table for INV %s: %v\n", invNumber, err) log.Printf("Error inserting into inverters_inv table for INV %s: %v\n", invNumber, err)
} }
} }
// Insert data into hints table
_, err = db.Exec(`
INSERT INTO dtu_hints (timestamp, time_sync, radio_problem, default_password)
VALUES ($1, $2, $3, $4);
`, timestamp, hints.TimeSync, hints.RadioProblem, hints.DefaultPassword)
if err != nil {
logger.Error("Error inserting into log table", "error", err)
return
}
} }
func queryEventsEndpoint(inverterSerial string) (*EventsResponse, error) { func queryEventsEndpoint(inverterSerial string) (*EventsResponse, error) {
@ -419,7 +453,7 @@ func getPreviousEventsCount(db *sql.DB, inverterSerial string) int {
var count int var count int
err := db.QueryRow("SELECT COUNT(*) FROM events WHERE inverter_serial = $1", inverterSerial).Scan(&count) err := db.QueryRow("SELECT COUNT(*) FROM events WHERE inverter_serial = $1", inverterSerial).Scan(&count)
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
log.Println("Error querying previous events count:", err) logger.Error("Error querying previous events count", "error", err)
} }
return count return count
} }
@ -434,7 +468,7 @@ func insertEvents(db *sql.DB, inverterSerial string, events *EventsResponse) {
VALUES ($1, $2, $3, $4, $5, $6); VALUES ($1, $2, $3, $4, $5, $6);
`, timestamp, inverterSerial, event.MessageID, event.Message, event.StartTime, event.EndTime) `, timestamp, inverterSerial, event.MessageID, event.Message, event.StartTime, event.EndTime)
if err != nil { if err != nil {
log.Println("Error inserting into events table:", err) logger.Error("Error inserting into events table", "error", err)
} }
} }
@ -445,7 +479,7 @@ func insertEvents(db *sql.DB, inverterSerial string, events *EventsResponse) {
time.Sleep(30 * time.Minute) time.Sleep(30 * time.Minute)
updatedEvents, err := queryEventsEndpoint(inverterSerial) updatedEvents, err := queryEventsEndpoint(inverterSerial)
if err != nil { if err != nil {
log.Println("Error querying events endpoint for updates:", err) logger.Error("Error querying events endpoint for updates", "error", err)
continue continue
} }
@ -463,7 +497,7 @@ func updateEvents(db *sql.DB, inverterSerial string, events *EventsResponse) {
UPDATE events SET end_time = $1 WHERE inverter_serial = $2 AND start_time = $3 AND end_time = 0; UPDATE events SET end_time = $1 WHERE inverter_serial = $2 AND start_time = $3 AND end_time = 0;
`, event.EndTime, inverterSerial, event.StartTime) `, event.EndTime, inverterSerial, event.StartTime)
if err != nil { if err != nil {
log.Println("Error updating events table:", err) logger.Error("Error updating events table", "error", err)
} }
} }
} }