opendtu-logger/main.go

572 lines
18 KiB
Go

// TODO: Storage optimisation: Map inverter serial to shorter serial. Use that for referring.
// TODO: Use username and password provided using Basic Authentication.
// TODO: Record Inverter struct data only on-change.
// Idea: Make a full admin / config GUI and only configure through this utility.
// Idea: Gather settings only on start-up.
// TODO: Only update meter readings such as yieldday, yieldtotal on-change.
// TODO: Implement proper DB migrations.
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"time"
_ "time/tzdata"
"github.com/gorilla/websocket"
_ "github.com/lib/pq"
)
// VUD contains three variables used for most metrics sent by OpenDTU:
// Value, unit and decimal point accuracy.
//
// Of these, only V is written to the database by insertLiveData; U and D are
// decoded from the JSON payload but not persisted.
type VUD struct {
V float64 `json:"v"` // Value
U string `json:"u"` // Unit
D int `json:"d"` // Decimals
}
// InverterAC stores AC generation metrics per inverter.
//
// Inverter.AC maps a JSON object key to one InverterAC; insertLiveData stores
// that key in the ac_number column and the V component of every metric below
// in opendtu_inverters_ac.
type InverterAC struct {
Power VUD `json:"Power"`
Voltage VUD `json:"Voltage"`
Current VUD `json:"Current"`
Frequency VUD `json:"Frequency"`
PowerFactor VUD `json:"PowerFactor"`
ReactivePower VUD `json:"ReactivePower"`
}
// InverterDC stores DC generation metrics per string (a string is usually 1 solar panel).
//
// Every field carries an explicit JSON tag, like the sibling InverterAC and
// InverterINV structs, instead of relying on encoding/json's implicit
// case-insensitive matching of Go field names. Decoding accepts exactly the
// same payloads as before.
//
// Inverter.DC maps a JSON object key to one InverterDC; insertLiveData stores
// that key in the dc_number column, Name.U in the name column and the V
// component of each metric in opendtu_inverters_dc.
type InverterDC struct {
	// Name carries the string's label in its "u" member.
	Name struct {
		U string `json:"u"`
	} `json:"name"`
	Power      VUD `json:"Power"`
	Voltage    VUD `json:"Voltage"`
	Current    VUD `json:"Current"`
	YieldDay   VUD `json:"YieldDay"`
	YieldTotal VUD `json:"YieldTotal"`
	// Irradiation has the same members as VUD plus an additional "max" member.
	Irradiation struct {
		V   float64 `json:"v"`
		U   string  `json:"u"`
		D   int     `json:"d"`
		Max int     `json:"max"`
	} `json:"Irradiation"`
}
// InverterINV stores aggregated metrics for each inverter
//
// Inverter.INV maps a JSON object key to one InverterINV; insertLiveData
// writes the V component of each metric to opendtu_inverters_inv (the map key
// itself is not stored).
type InverterINV struct {
Temperature VUD `json:"Temperature"`
Efficiency VUD `json:"Efficiency"`
// Note the space in the JSON key.
PowerDC VUD `json:"Power DC"`
YieldDay VUD `json:"YieldDay"`
YieldTotal VUD `json:"YieldTotal"`
}
// Inverter is one element of the "inverters" array in a live-data message.
//
// handleMessage uses DataAge, Events and Reachable to decide what to record:
// nothing is recorded unless DataAge is 0, events are fetched when Events
// exceeds the number of event rows already stored, and live data is inserted
// only when Reachable is true.
type Inverter struct {
Serial string `json:"serial"`
Name string `json:"name"`
Producing bool `json:"producing"`
LimitRelative float64 `json:"limit_relative"`
LimitAbsolute float64 `json:"limit_absolute"`
// AC, DC and INV are JSON objects; their keys are kept as strings.
AC map[string]InverterAC `json:"AC"`
DC map[string]InverterDC `json:"DC"`
Events int `json:"events"`
PollEnabled bool `json:"poll_enabled"`
Reachable bool `json:"reachable"`
DataAge int `json:"data_age"`
INV map[string]InverterINV `json:"INV"`
}
// Total is the "total" object of a live-data message. insertLiveData writes
// the V component of each field to the opendtu_log table.
type Total struct {
Power VUD `json:"Power"`
YieldDay VUD `json:"YieldDay"`
YieldTotal VUD `json:"YieldTotal"`
}
// Hints is the "hints" object of a live-data message. insertLiveData writes
// all three flags to the opendtu_hints table.
type Hints struct {
TimeSync bool `json:"time_sync"`
RadioProblem bool `json:"radio_problem"`
DefaultPassword bool `json:"default_password"`
}
// LiveData is the top-level shape that handleMessage decodes every WebSocket
// message into.
type LiveData struct {
Inverters []Inverter `json:"inverters"`
Total Total `json:"total"`
Hints Hints `json:"hints"`
}
// Event is one entry of the "events" array returned by the event-log endpoint
// (see queryEventsEndpoint).
//
// insertEvents stores MessageID, Message, StartTime and EndTime. An EndTime of
// 0 is treated by insertEvents and updateEvents as "not ended yet".
type Event struct {
MessageID int `json:"message_id"`
Message string `json:"message"`
// NOTE(review): the unit/epoch of StartTime and EndTime is not visible in
// this file — confirm against the OpenDTU API before converting them.
StartTime int `json:"start_time"`
EndTime int `json:"end_time"`
// StartTimestamp and EndTimestamp have no JSON tag and are not assigned by
// any code in this file; createTables adds matching start_timestamp and
// end_timestamp columns to opendtu_events.
StartTimestamp time.Time
EndTimestamp time.Time
}
// EventsResponse is the JSON document that queryEventsEndpoint decodes from
// the /api/eventlog/status endpoint.
type EventsResponse struct {
Count int `json:"count"`
Events []Event `json:"events"`
}
// Inverter settings structs

// ChannelSettings is one element of InverterSettings.Channels (JSON key
// "channel"). It is referenced only through the inverter settings types,
// which are used solely by commented-out code in this file.
type ChannelSettings struct {
Name string `json:"name"`
MaxPower int `json:"max_power"`
YieldTotalOffset int `json:"yield_total_offset"`
}
// InverterSettings describes the configuration of a single inverter, one
// element of InverterSettingsData.Inverters. Only the commented-out
// queryConfigEndpoint draft in this file refers to these settings types.
type InverterSettings struct {
ID int `json:"id"`
Name string `json:"name"`
Order int `json:"order"`
Serial string `json:"serial"`
PollEnable bool `json:"poll_enable"`
PollEnableNight bool `json:"poll_enable_night"`
CommandEnable bool `json:"command_enable"`
CommandEnableNight bool `json:"command_enable_night"`
ReachableThreshold int `json:"reachable_threshold"`
ZeroRuntime bool `json:"zero_runtime"`
ZeroDay bool `json:"zero_day"`
Type string `json:"type"`
// The JSON key is the singular "channel" even though it holds a list.
Channels []ChannelSettings `json:"channel"`
}
// InverterSettingsData is the response shape the commented-out
// queryConfigEndpoint draft decodes from /api/inverter/list.
type InverterSettingsData struct {
// The JSON key is the singular "inverter" even though it holds a list.
Inverters []InverterSettings `json:"inverter"`
}
// logger is the package-wide structured logger. It emits JSON records on
// stdout using the handler's default options (nil is passed for them), and
// main installs it as the slog default.
var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
// main installs the JSON logger as the process-wide default and runs the
// collector. When the collector stops, the error is logged and the process
// exits with status 1 so that a supervisor (systemd, Docker restart policy,
// ...) can restart it instead of leaving an idle process behind.
func main() {
	// Initial logger setup
	slog.SetDefault(logger)

	if err := run(); err != nil {
		logger.Error("Fatal error", "error", err)
		os.Exit(1)
	}
}

// run connects to PostgreSQL (DB_URL) and to the OpenDTU live-data WebSocket
// (ws://$REMOTE_URL/livedata), then feeds every received message to
// handleMessage until reading from the WebSocket fails.
//
// It always returns a non-nil error describing why the collector stopped.
// Returning instead of calling log.Fatal lets the deferred Close calls run.
func run() error {
	// Connect to PostgreSQL
	db, err := sql.Open("postgres", os.Getenv("DB_URL"))
	if err != nil {
		return fmt.Errorf("opening database: %w", err)
	}
	defer db.Close()

	// Create tables if they don't exist
	createTables(db)

	// The host must be validated before the scheme and path are added;
	// the assembled URL can never be empty.
	remoteURL := os.Getenv("REMOTE_URL")
	if remoteURL == "" {
		return fmt.Errorf("REMOTE_URL environment variable is not set")
	}
	wsURL := "ws://" + remoteURL + "/livedata"

	// Establish WebSocket connection
	c, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
	if err != nil {
		return fmt.Errorf("connecting to %s: %w", wsURL, err)
	}
	defer c.Close()

	// Subscribe to the WebSocket feed
	if err := c.WriteMessage(websocket.TextMessage, []byte(`{"subscribe": "livedata"}`)); err != nil {
		return fmt.Errorf("subscribing to live data: %w", err)
	}

	// TODO: start updateInverterConfig(db) in a goroutine once it is finished.

	// Process messages until the connection breaks.
	for {
		_, message, err := c.ReadMessage()
		if err != nil {
			return fmt.Errorf("reading WebSocket message: %w", err)
		}
		// Handle the received JSON message
		handleMessage(message, db)
	}
}
// handleMessage decodes one live-data WebSocket message and records its
// contents.
//
// For every inverter whose reading is fresh (DataAge == 0):
//   - if it reports more events than are already stored (see
//     getPreviousEventsCount), the event log is fetched and inserted;
//   - if it is reachable, its live data, the totals and the hints are inserted.
//
// A failure to fetch events is logged but no longer prevents the inverter's
// live data from being recorded: the two are independent.
// Messages that are not valid JSON are logged and dropped.
func handleMessage(message []byte, db *sql.DB) {
	var liveData LiveData

	// Parse the JSON message into the LiveData struct
	if err := json.Unmarshal(message, &liveData); err != nil {
		logger.Error("Error decoding JSON", "error", err)
		return
	}

	for _, inverter := range liveData.Inverters {
		// Stale readings are skipped entirely.
		if inverter.DataAge != 0 {
			continue
		}

		// Record events data. The Events > 0 test avoids a database round
		// trip when the inverter reports no events at all.
		if inverter.Events > 0 && inverter.Events > getPreviousEventsCount(db, inverter.Serial) {
			events, err := queryEventsEndpoint(inverter.Serial)
			if err != nil {
				logger.Error("Error querying events endpoint", "error", err, "inverter_serial", inverter.Serial)
			} else {
				// Insert events data into the opendtu_events table
				insertEvents(db, inverter.Serial, events)
			}
		}

		// Record inverter data
		if inverter.Reachable {
			logger.Debug("Inserting data")
			insertLiveData(db, inverter, liveData.Total, liveData.Hints)
		}
	}
}
// createTables makes sure the database schema exists and, when the
// TIMESCALEDB_ENABLED environment variable equals "true", converts every
// table into a TimescaleDB hypertable partitioned on its timestamp column.
//
// Both steps are idempotent (IF NOT EXISTS / if_not_exists). Any failure
// terminates the process through log.Fatal.
//
// Schema notes:
//   - inverter_serial is TEXT as some non-Hoymiles inverters use non-numeric
//     serial numbers; it also makes plotting in Grafana easier.
//   - TODO: foreign keys are commented out as TimescaleDB hypertables don't
//     support them.
//   - The DO block adds start_timestamp / end_timestamp to opendtu_events
//     when those columns are missing.
func createTables(db *sql.DB) {
	const schemaSQL = `
CREATE TABLE IF NOT EXISTS opendtu_log (
timestamp TIMESTAMPTZ UNIQUE DEFAULT CURRENT_TIMESTAMP,
power NUMERIC,
yieldday NUMERIC,
yieldtotal NUMERIC
);
CREATE TABLE IF NOT EXISTS opendtu_inverters (
timestamp TIMESTAMPTZ,
-- FOREIGN KEY (timestamp) REFERENCES opendtu_log(timestamp),
inverter_serial TEXT,
name TEXT,
producing BOOL,
limit_relative NUMERIC,
limit_absolute NUMERIC
);
CREATE TABLE IF NOT EXISTS opendtu_inverters_ac (
timestamp TIMESTAMPTZ,
-- FOREIGN KEY (timestamp) REFERENCES opendtu_log(timestamp),
inverter_serial TEXT,
ac_number INT,
power NUMERIC,
voltage NUMERIC,
current NUMERIC,
frequency NUMERIC,
powerfactor NUMERIC,
reactivepower NUMERIC
);
CREATE TABLE IF NOT EXISTS opendtu_inverters_dc (
-- id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ,
-- FOREIGN KEY (timestamp) REFERENCES opendtu_log(timestamp),
inverter_serial TEXT,
dc_number INT,
name TEXT,
power NUMERIC,
voltage NUMERIC,
current NUMERIC,
yieldday NUMERIC,
yieldtotal NUMERIC,
irradiation NUMERIC
);
CREATE TABLE IF NOT EXISTS opendtu_inverters_inv (
-- id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ,
-- FOREIGN KEY (timestamp) REFERENCES opendtu_log(timestamp),
inverter_serial TEXT,
temperature NUMERIC,
power_dc NUMERIC,
yieldday NUMERIC,
yieldtotal NUMERIC,
efficiency NUMERIC
);
CREATE TABLE IF NOT EXISTS opendtu_events (
-- id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
inverter_serial TEXT,
message_id INT,
message TEXT,
start_time INT,
end_time INT
);
DO $$
BEGIN
-- Check if start_timestamp column exists
IF NOT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name='opendtu_events'
AND column_name='start_timestamp') THEN
-- Add start_timestamp column
ALTER TABLE opendtu_events
ADD COLUMN start_timestamp TIMESTAMPTZ;
END IF;
-- Check if end_timestamp column exists
IF NOT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name='opendtu_events'
AND column_name='end_timestamp') THEN
-- Add end_timestamp column
ALTER TABLE opendtu_events
ADD COLUMN end_timestamp TIMESTAMPTZ;
END IF;
END $$;
CREATE TABLE IF NOT EXISTS opendtu_hints (
-- id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ,
-- FOREIGN KEY (timestamp) REFERENCES opendtu_log(timestamp),
time_sync BOOL,
radio_problem BOOL,
default_password BOOL
);
CREATE INDEX IF NOT EXISTS opendtu_log_timestamp_idx ON opendtu_log (timestamp);
CREATE INDEX IF NOT EXISTS opendtu_inverters_timestamp_idx ON opendtu_inverters (timestamp);
CREATE INDEX IF NOT EXISTS opendtu_inverters_ac_timestamp_idx ON opendtu_inverters_ac (timestamp);
CREATE INDEX IF NOT EXISTS opendtu_inverters_dc_timestamp_idx ON opendtu_inverters_dc (timestamp);
CREATE INDEX IF NOT EXISTS opendtu_inverters_inv_timestamp_idx ON opendtu_inverters_inv (timestamp);
CREATE INDEX IF NOT EXISTS opendtu_events_timestamp_idx ON opendtu_events (timestamp);
CREATE INDEX IF NOT EXISTS opendtu_hints_timestamp_idx ON opendtu_hints (timestamp);
`

	const hypertableSQL = `
-- CREATE EXTENSION IF NOT EXISTS timescaledb;
SELECT create_hypertable('opendtu_log', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('opendtu_inverters', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('opendtu_inverters_ac', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('opendtu_inverters_dc', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('opendtu_inverters_inv', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('opendtu_events', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
SELECT create_hypertable('opendtu_hints', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
`

	// Step 1: plain PostgreSQL schema.
	if _, err := db.Exec(schemaSQL); err != nil {
		log.Fatal("Error creating tables: ", err)
	}

	// Step 2: optional TimescaleDB conversion.
	if os.Getenv("TIMESCALEDB_ENABLED") != "true" {
		return
	}
	if _, err := db.Exec(hypertableSQL); err != nil {
		log.Fatal("Error enabling TimescaleDB: ", err)
	}
}
// insertLiveData stores one inverter's readings together with the system
// totals and hints, all stamped with the same timestamp.
//
// Rows are written, in order, to opendtu_log, opendtu_inverters,
// opendtu_inverters_ac, opendtu_inverters_dc, opendtu_inverters_inv and
// opendtu_hints. A failure on opendtu_log or opendtu_inverters aborts the
// call; failures on the per-channel tables and on the hints table are logged
// and the remaining inserts still run. No transaction is used, so a failure
// can leave a partial set of rows for the timestamp.
//
// The timestamp is the current time in the zone named by the TZ environment
// variable. If that zone cannot be loaded, UTC is used instead of panicking
// on a nil location.
//
// NOTE: handleMessage calls this once per inverter, so the totals and hints
// are written once per inverter per message.
func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) {
	timestamp := time.Now()
	timeZone := os.Getenv("TZ")
	if loc, err := time.LoadLocation(timeZone); err != nil {
		logger.Warn("Error loading time zone, falling back to UTC", "error", err, "tz", timeZone)
		timestamp = timestamp.UTC()
	} else {
		timestamp = timestamp.In(loc)
	}

	// Insert data into log table
	_, err := db.Exec(`
		INSERT INTO opendtu_log (timestamp, power, yieldday, yieldtotal)
		VALUES ($1, $2, $3, $4);
	`, timestamp, total.Power.V, total.YieldDay.V, total.YieldTotal.V)
	if err != nil {
		logger.Error("Error inserting into log table", "error", err)
		return
	}

	// Insert data into opendtu_inverters table
	_, err = db.Exec(`
		INSERT INTO opendtu_inverters (timestamp, inverter_serial, name, producing, limit_relative, limit_absolute)
		VALUES ($1, $2, $3, $4, $5, $6);
	`, timestamp, inverter.Serial, inverter.Name, inverter.Producing, inverter.LimitRelative, inverter.LimitAbsolute)
	if err != nil {
		logger.Error("Error inserting into opendtu_inverters table", "error", err)
		return
	}

	// Insert data into opendtu_inverters_ac table
	for acNumber, acData := range inverter.AC {
		_, err := db.Exec(`
			INSERT INTO opendtu_inverters_ac (timestamp, inverter_serial, ac_number, power, voltage, current, frequency, powerfactor, reactivepower)
			VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9);
		`, timestamp, inverter.Serial, acNumber, acData.Power.V, acData.Voltage.V, acData.Current.V, acData.Frequency.V, acData.PowerFactor.V, acData.ReactivePower.V)
		if err != nil {
			logger.Error("Error inserting into opendtu_inverters_ac table", "error", err, "acNumber", acNumber)
		}
	}

	// Insert data into opendtu_inverters_dc table
	for dcNumber, dcData := range inverter.DC {
		_, err := db.Exec(`
			INSERT INTO opendtu_inverters_dc (timestamp, inverter_serial, dc_number, name, power, voltage, current, yieldday, yieldtotal, irradiation)
			VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);
		`, timestamp, inverter.Serial, dcNumber, dcData.Name.U, dcData.Power.V, dcData.Voltage.V, dcData.Current.V, dcData.YieldDay.V, dcData.YieldTotal.V, dcData.Irradiation.V)
		if err != nil {
			logger.Error("Error inserting into opendtu_inverters_dc table", "error", err, "dcNumber", dcNumber)
		}
	}

	// Insert data into opendtu_inverters_inv table
	for invNumber, invData := range inverter.INV {
		_, err := db.Exec(`
			INSERT INTO opendtu_inverters_inv (timestamp, inverter_serial, temperature, efficiency, power_dc, yieldday, yieldtotal)
			VALUES ($1, $2, $3, $4, $5, $6, $7);
		`, timestamp, inverter.Serial, invData.Temperature.V, invData.Efficiency.V, invData.PowerDC.V, invData.YieldDay.V, invData.YieldTotal.V)
		if err != nil {
			logger.Error("Error inserting into opendtu_inverters_inv table", "error", err, "invNumber", invNumber)
		}
	}

	// Insert data into hints table
	_, err = db.Exec(`
		INSERT INTO opendtu_hints (timestamp, time_sync, radio_problem, default_password)
		VALUES ($1, $2, $3, $4);
	`, timestamp, hints.TimeSync, hints.RadioProblem, hints.DefaultPassword)
	if err != nil {
		logger.Error("Error inserting into opendtu_hints table", "error", err)
	}
}
// queryEventsEndpoint fetches the event log of one inverter from
// http://$REMOTE_URL/api/eventlog/status?inv=<serial> and decodes it.
//
// The serial is query-escaped, and REMOTE_URL is concatenated rather than
// placed in a format string, so a '%' in either value cannot corrupt the URL.
// The request is bounded by a timeout because this function is called from
// the WebSocket message loop; a hung request would otherwise stall it.
//
// It returns an error if the request cannot be built or sent, if the response
// status is not 200 OK, or if the body is not valid JSON for EventsResponse.
func queryEventsEndpoint(inverterSerial string) (*EventsResponse, error) {
	const requestTimeout = 10 * time.Second

	endpoint := "http://" + os.Getenv("REMOTE_URL") + "/api/eventlog/status"
	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("building events request: %w", err)
	}
	query := req.URL.Query()
	query.Set("inv", inverterSerial)
	req.URL.RawQuery = query.Encode()

	// A zero-Transport client uses http.DefaultTransport, so connections are
	// still pooled across calls.
	client := http.Client{Timeout: requestTimeout}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("events endpoint returned status %s", resp.Status)
	}

	var eventsResponse EventsResponse
	if err := json.NewDecoder(resp.Body).Decode(&eventsResponse); err != nil {
		return nil, fmt.Errorf("decoding events response: %w", err)
	}
	return &eventsResponse, nil
}
// getPreviousEventsCount returns how many rows opendtu_events holds for the
// given inverter within today's counting window. If the query fails, the
// error is logged and 0 is returned.
//
// Background:
//   - The events counter reported by OpenDTU resets every day. This assumes
//     that the inverters from which the events are pulled are reset every
//     day, during the night.
//   - OpenDTU must be set to "Clear Eventlog at midnight" for each inverter:
//     "Inverter settings" -> "pencil" -> "Advanced".
//   - To account for possible time drifts, the first and last 10 minutes of
//     the day are excluded from the window.
//
// Longest day NL: sun up 4:16, sun down 22:50
// Shortest day NL: sun up 8:44, sun down 16:25
func getPreviousEventsCount(db *sql.DB, inverterSerial string) int {
	const countQuery = `
SELECT COUNT(*)
FROM opendtu_events
WHERE inverter_serial = $1
AND timestamp >= CURRENT_DATE + INTERVAL '10 minutes'
AND timestamp < CURRENT_DATE + INTERVAL '23 hours 50 minutes'
`

	var stored int
	row := db.QueryRow(countQuery, inverterSerial)
	switch scanErr := row.Scan(&stored); scanErr {
	case nil, sql.ErrNoRows:
		// Nothing to report.
	default:
		logger.Error("Error querying previous events count", "error", scanErr)
	}
	return stored
}
// insertEvents writes every event in events to opendtu_events, stamped with
// the current time, and logs (but does not abort on) individual insert
// failures.
//
// If any of the events is still open (EndTime == 0), a background poller is
// started. Every 10 minutes it re-queries the event log and lets updateEvents
// fill in the end times. It stops as soon as a response contains no open
// event. Previously it ran forever and a new one was started on each call,
// so goroutines piled up.
//
// NOTE(review): every event returned by the endpoint is inserted, including
// ones that an earlier call may already have stored — this looks like it can
// produce duplicate rows; confirm before relying on row counts.
func insertEvents(db *sql.DB, inverterSerial string, events *EventsResponse) {
	timestamp := time.Now()
	for _, event := range events.Events {
		// Insert events data into the events table
		_, err := db.Exec(`
			INSERT INTO opendtu_events (timestamp, inverter_serial, message_id, message, start_time, end_time)
			VALUES ($1, $2, $3, $4, $5, $6);
		`, timestamp, inverterSerial, event.MessageID, event.Message, event.StartTime, event.EndTime)
		if err != nil {
			logger.Error("Error inserting into opendtu_events table", "error", err)
		}
	}

	if !hasOpenEvent(events) {
		return
	}

	go func() {
		for {
			time.Sleep(10 * time.Minute)
			updatedEvents, err := queryEventsEndpoint(inverterSerial)
			if err != nil {
				// Transient failure: keep polling.
				logger.Error("Error querying events endpoint for updates", "error", err)
				continue
			}
			// Update the corresponding message rows
			updateEvents(db, inverterSerial, updatedEvents)
			if !hasOpenEvent(updatedEvents) {
				return
			}
		}
	}()
}

// hasOpenEvent reports whether at least one event has no end time yet.
func hasOpenEvent(events *EventsResponse) bool {
	for _, event := range events.Events {
		if event.EndTime == 0 {
			return true
		}
	}
	return false
}
// updateEvents records the end time of events that have finished since they
// were inserted. For each event in events that has a non-zero EndTime, it
// sets end_time on the rows of the given inverter with the same start_time
// whose end_time is still 0.
//
// Events that are still open (EndTime == 0) are skipped: updating an
// end_time of 0 to 0 changes nothing and only costs a round trip.
// Update failures are logged and do not stop the remaining updates.
func updateEvents(db *sql.DB, inverterSerial string, events *EventsResponse) {
	for _, event := range events.Events {
		if event.EndTime == 0 {
			continue
		}
		// Update events data in the opendtu_events table
		_, err := db.Exec(`
			UPDATE opendtu_events SET end_time = $1 WHERE inverter_serial = $2 AND start_time = $3 AND end_time = 0;
		`, event.EndTime, inverterSerial, event.StartTime)
		if err != nil {
			logger.Error("Error updating opendtu_events table", "error", err)
		}
	}
}
// TODO: finish this function.
// func updateInverterConfig(db *sql.DB) {
// // Periodically query the /api/inverter/list
// for {
// updatedInverterConfig, err := queryConfigEndpoint()
// if err != nil {
// log.Println("Error querying events endpoint for updates:", err)
// continue
// }
// // Update the corresponding message row
// updateEvents(db, inverterSerial, updatedInverterConfig)
// time.Sleep(60 * time.Minute)
// }
// }
// func queryConfigEndpoint() (*InverterSettingsData, error) {
// remoteURL := os.Getenv("REMOTE_URL")
// endpoint := fmt.Sprintf("http://" + remoteURL + "/api/inverter/list")
// resp, err := http.Get(endpoint)
// if err != nil {
// return nil, err
// }
// defer resp.Body.Close()
// var inverterSettingsResponse InverterSettingsData
// if err := json.NewDecoder(resp.Body).Decode(&inverterSettingsResponse); err != nil {
// return nil, err
// }
// return &inverterSettingsResponse, nil
// }