// TODO: Storage optimisation: Map inverter serial to shorter serial. Use that for referring. // TODO: Use username and password provided using Basic Authentication. // TODO: Record Inverter struct data only on-change. // Idea: Make a full admin / config GUI and only configure through this utility. // Idea: Gather settings only on start-up. // TODO: Only update meter readings such as yieldday, yieldtotal on-change. // TODO: Implement proper DB migrations. package main import ( "database/sql" "encoding/json" "fmt" "log" "log/slog" "net/http" "os" "time" _ "time/tzdata" "github.com/gorilla/websocket" _ "github.com/lib/pq" ) // VUD contains three variables used for most metrics sent by OpenDTU: // Value, unit and decimal point accuracy. type VUD struct { V float64 `json:"v"` // Value U string `json:"u"` // Unit D int `json:"d"` // Decimals } // InverterAC stores AC generation metrics per inverter. type InverterAC struct { Power VUD `json:"Power"` Voltage VUD `json:"Voltage"` Current VUD `json:"Current"` Frequency VUD `json:"Frequency"` PowerFactor VUD `json:"PowerFactor"` ReactivePower VUD `json:"ReactivePower"` } // InverterDC stores DC generation metrics per string (a string is usually 1 solar panel) type InverterDC struct { Name struct { U string `json:"u"` } Power VUD Voltage VUD Current VUD YieldDay VUD YieldTotal VUD Irradiation struct { V float64 `json:"v"` U string `json:"u"` D int `json:"d"` Max int `json:"max"` } } // InverterINV stores aggregated metrics for each inverter type InverterINV struct { Temperature VUD `json:"Temperature"` Efficiency VUD `json:"Efficiency"` PowerDC VUD `json:"Power DC"` YieldDay VUD `json:"YieldDay"` YieldTotal VUD `json:"YieldTotal"` } // Inverter struct type Inverter struct { Serial string `json:"serial"` Name string `json:"name"` Producing bool `json:"producing"` LimitRelative float64 `json:"limit_relative"` LimitAbsolute float64 `json:"limit_absolute"` AC map[string]InverterAC `json:"AC"` DC map[string]InverterDC `json:"DC"` Events int `json:"events"` PollEnabled bool `json:"poll_enabled"` Reachable bool `json:"reachable"` DataAge int `json:"data_age"` INV map[string]InverterINV `json:"INV"` } type Total struct { Power VUD `json:"Power"` YieldDay VUD `json:"YieldDay"` YieldTotal VUD `json:"YieldTotal"` } type Hints struct { TimeSync bool `json:"time_sync"` RadioProblem bool `json:"radio_problem"` DefaultPassword bool `json:"default_password"` } type LiveData struct { Inverters []Inverter `json:"inverters"` Total Total `json:"total"` Hints Hints `json:"hints"` } // Events struct type Event struct { MessageID int `json:"message_id"` Message string `json:"message"` StartTime int `json:"start_time"` EndTime int `json:"end_time"` StartTimestamp time.Time EndTimestamp time.Time } type EventsResponse struct { Count int `json:"count"` Events []Event `json:"events"` } // Inverter settings structs type ChannelSettings struct { Name string `json:"name"` MaxPower int `json:"max_power"` YieldTotalOffset int `json:"yield_total_offset"` } type InverterSettings struct { ID int `json:"id"` Name string `json:"name"` Order int `json:"order"` Serial string `json:"serial"` PollEnable bool `json:"poll_enable"` PollEnableNight bool `json:"poll_enable_night"` CommandEnable bool `json:"command_enable"` CommandEnableNight bool `json:"command_enable_night"` ReachableThreshold int `json:"reachable_threshold"` ZeroRuntime bool `json:"zero_runtime"` ZeroDay bool `json:"zero_day"` Type string `json:"type"` Channels []ChannelSettings `json:"channel"` } type InverterSettingsData struct { Inverters []InverterSettings `json:"inverter"` } var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil)) // Main program func main() { // Initial logger setup slog.SetDefault(logger) dbConnStr := (os.Getenv("DB_URL")) // Connect to PostgreSQL db, err := sql.Open("postgres", dbConnStr) if err != nil { log.Fatal(err) } defer db.Close() // Create tables if they don't exist createTables(db) // Get WebSocket URL from environment variable RemoteURL := os.Getenv("REMOTE_URL") wsURL := "ws://" + RemoteURL + "/livedata" if wsURL == "" { log.Fatal("WEBSOCKET_URL environment variable is not set.") } // Establish WebSocket connection c, _, err := websocket.DefaultDialer.Dial(wsURL, nil) if err != nil { log.Fatal(err) } defer c.Close() // Subscribe to the WebSocket feed if err := c.WriteMessage(websocket.TextMessage, []byte(`{"subscribe": "livedata"}`)); err != nil { log.Fatal(err) } // Start listening for WebSocket messages go func() { for { _, message, err := c.ReadMessage() if err != nil { logger.Error("Error reading WebSocket message", "error", err) return } // Handle the received JSON message handleMessage(message, db) } }() // go func() { // updateInverterConfig(db) // } // Keep the program running select {} } func handleMessage(message []byte, db *sql.DB) { var liveData LiveData // Parse the JSON message into the LiveData struct if err := json.Unmarshal(message, &liveData); err != nil { logger.Error("Error decoding JSON", "error", err) return } // Check conditions for recording data for _, inverter := range liveData.Inverters { // Record events data if inverter.DataAge == 0 && inverter.Events > 0 && inverter.Events > getPreviousEventsCount(db, inverter.Serial) { // Query the endpoint for events events, err := queryEventsEndpoint(inverter.Serial) if err != nil { logger.Error("Error querying events endpoint", "error", err) continue } // Insert events data into the opendtu_events table insertEvents(db, inverter.Serial, events) } // Record inverter data if inverter.DataAge == 0 && inverter.Reachable { // Insert data into PostgreSQL tables insertLiveData(db, inverter, liveData.Total, liveData.Hints) logger.Debug("Inserting data") } } } func createTables(db *sql.DB) { // Execute SQL statements to create tables if they don't exist // inverter_serial is TEXT as some non-Hoymiles inverters use non-numeric serial numbers. // An additional advantage is that it makes plotting in Grafana easier. // TODO: Foreign keys commented out as TimescaleDB hypertables don't support them. createTableSQL := ` CREATE TABLE IF NOT EXISTS opendtu_log ( timestamp TIMESTAMPTZ UNIQUE DEFAULT CURRENT_TIMESTAMP, power NUMERIC, yieldday NUMERIC, yieldtotal NUMERIC ); CREATE TABLE IF NOT EXISTS opendtu_inverters ( timestamp TIMESTAMPTZ, -- FOREIGN KEY (timestamp) REFERENCES opendtu_log(timestamp), inverter_serial TEXT, name TEXT, producing BOOL, limit_relative NUMERIC, limit_absolute NUMERIC ); CREATE TABLE IF NOT EXISTS opendtu_inverters_ac ( timestamp TIMESTAMPTZ, -- FOREIGN KEY (timestamp) REFERENCES opendtu_log(timestamp), inverter_serial TEXT, ac_number INT, power NUMERIC, voltage NUMERIC, current NUMERIC, frequency NUMERIC, powerfactor NUMERIC, reactivepower NUMERIC ); CREATE TABLE IF NOT EXISTS opendtu_inverters_dc ( -- id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, -- FOREIGN KEY (timestamp) REFERENCES opendtu_log(timestamp), inverter_serial TEXT, dc_number INT, name TEXT, power NUMERIC, voltage NUMERIC, current NUMERIC, yieldday NUMERIC, yieldtotal NUMERIC, irradiation NUMERIC ); CREATE TABLE IF NOT EXISTS opendtu_inverters_inv ( -- id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, -- FOREIGN KEY (timestamp) REFERENCES opendtu_log(timestamp), inverter_serial TEXT, temperature NUMERIC, power_dc NUMERIC, yieldday NUMERIC, yieldtotal NUMERIC, efficiency NUMERIC ); CREATE TABLE IF NOT EXISTS opendtu_events ( -- id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, inverter_serial TEXT, message_id INT, message TEXT, start_time INT, end_time INT ); DO $$ BEGIN -- Check if start_timestamp column exists IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='opendtu_events' AND column_name='start_timestamp') THEN -- Add start_timestamp column ALTER TABLE opendtu_events ADD COLUMN start_timestamp TIMESTAMPTZ; END IF; -- Check if end_timestamp column exists IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='opendtu_events' AND column_name='end_timestamp') THEN -- Add end_timestamp column ALTER TABLE opendtu_events ADD COLUMN end_timestamp TIMESTAMPTZ; END IF; END $$; CREATE TABLE IF NOT EXISTS opendtu_hints ( -- id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, -- FOREIGN KEY (timestamp) REFERENCES opendtu_log(timestamp), time_sync BOOL, radio_problem BOOL, default_password BOOL ); CREATE INDEX IF NOT EXISTS opendtu_log_timestamp_idx ON opendtu_log (timestamp); CREATE INDEX IF NOT EXISTS opendtu_inverters_timestamp_idx ON opendtu_inverters (timestamp); CREATE INDEX IF NOT EXISTS opendtu_inverters_ac_timestamp_idx ON opendtu_inverters_ac (timestamp); CREATE INDEX IF NOT EXISTS opendtu_inverters_dc_timestamp_idx ON opendtu_inverters_dc (timestamp); CREATE INDEX IF NOT EXISTS opendtu_inverters_inv_timestamp_idx ON opendtu_inverters_inv (timestamp); CREATE INDEX IF NOT EXISTS opendtu_events_timestamp_idx ON opendtu_events (timestamp); CREATE INDEX IF NOT EXISTS opendtu_hints_timestamp_idx ON opendtu_hints (timestamp); ` _, err := db.Exec(createTableSQL) if err != nil { log.Fatal("Error creating tables: ", err) } timescaleEnabled := os.Getenv("TIMESCALEDB_ENABLED") enableTimescaleDB := ` -- CREATE EXTENSION IF NOT EXISTS timescaledb; SELECT create_hypertable('opendtu_log', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_inverters', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_inverters_ac', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_inverters_dc', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_inverters_inv', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_events', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_hints', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); ` if timescaleEnabled == "true" { _, err := db.Exec(enableTimescaleDB) if err != nil { log.Fatal("Error enabling TimescaleDB: ", err) } } } func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) { timeZone := os.Getenv("TZ") loc, _ := time.LoadLocation(timeZone) timestamp := time.Now().In(loc) // Insert data into log table _, err := db.Exec(` INSERT INTO opendtu_log (timestamp, power, yieldday, yieldtotal) VALUES ($1, $2, $3, $4); `, timestamp, total.Power.V, total.YieldDay.V, total.YieldTotal.V) if err != nil { logger.Error("Error inserting into log table", "error", err) return } // Get the log ID of the inserted record // NOT IN USE: TimescaleDB doesn't support it. // var logID int // err = db.QueryRow("SELECT id FROM opendtu_log WHERE timestamp = $1", timestamp).Scan(&logID) // if err != nil { // logger.Error("Error getting opendtu_log ID", "error", err) // return // } // Insert data into opendtu_inverters table _, err = db.Exec(` INSERT INTO opendtu_inverters (timestamp, inverter_serial, name, producing, limit_relative, limit_absolute) VALUES ($1, $2, $3, $4, $5, $6); `, timestamp, inverter.Serial, inverter.Name, inverter.Producing, inverter.LimitRelative, inverter.LimitAbsolute) if err != nil { logger.Error("Error inserting into opendtu_inverters table", "error", err) return } // Insert data into opendtu_inverters_ac table for acNumber, acData := range inverter.AC { _, err := db.Exec(` INSERT INTO opendtu_inverters_ac (timestamp, inverter_serial, ac_number, power, voltage, current, frequency, powerfactor, reactivepower) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9); `, timestamp, inverter.Serial, acNumber, acData.Power.V, acData.Voltage.V, acData.Current.V, acData.Frequency.V, acData.PowerFactor.V, acData.ReactivePower.V) if err != nil { logger.Error("Error inserting into opendtu_inverters_ac table for AC", "error", "acNumber", acNumber, err) } } // Insert data into opendtu_inverters_dc table for dcNumber, dcData := range inverter.DC { _, err := db.Exec(` INSERT INTO opendtu_inverters_dc (timestamp, inverter_serial, dc_number, name, power, voltage, current, yieldday, yieldtotal, irradiation) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10); `, timestamp, inverter.Serial, dcNumber, dcData.Name.U, dcData.Power.V, dcData.Voltage.V, dcData.Current.V, dcData.YieldDay.V, dcData.YieldTotal.V, dcData.Irradiation.V) if err != nil { log.Printf("Error inserting into opendtu_inverters_dc table for DC %s: %v\n", dcNumber, err) } } // Insert data into opendtu_inverters_inv table for invNumber, invData := range inverter.INV { _, err := db.Exec(` INSERT INTO opendtu_inverters_inv (timestamp, inverter_serial, temperature, efficiency, power_dc, yieldday, yieldtotal) VALUES ($1, $2, $3, $4, $5, $6, $7); `, timestamp, inverter.Serial, invData.Temperature.V, invData.Efficiency.V, invData.PowerDC.V, invData.YieldDay.V, invData.YieldTotal.V) if err != nil { log.Printf("Error inserting into opendtu_inverters_inv table for INV %s: %v\n", invNumber, err) } } // Insert data into hints table _, err = db.Exec(` INSERT INTO opendtu_hints (timestamp, time_sync, radio_problem, default_password) VALUES ($1, $2, $3, $4); `, timestamp, hints.TimeSync, hints.RadioProblem, hints.DefaultPassword) if err != nil { logger.Error("Error inserting into log table", "error", err) return } } func queryEventsEndpoint(inverterSerial string) (*EventsResponse, error) { remoteURL := os.Getenv("REMOTE_URL") endpoint := fmt.Sprintf("http://"+remoteURL+"/api/eventlog/status?inv=%s", inverterSerial) resp, err := http.Get(endpoint) if err != nil { return nil, err } defer resp.Body.Close() var eventsResponse EventsResponse if err := json.NewDecoder(resp.Body).Decode(&eventsResponse); err != nil { return nil, err } return &eventsResponse, nil } // The events counter reported by OpenDTU resets every day. // However, this assumes that the inverters from which the events are pulled are reset every day, during the night. // Additionally, this function requires OpenDTU to be set to "Clear Eventlog at midnight" for each inverter. // "Clear Eventlog at midnight" should be set to ON in "Inverter settings" -> "pencil" -> "Advanced". // To account for possible time drifts, the first and last 10 minutes of the day are excluded. // Longest day NL: sun up 4:16, sun down 22:50 // Shortest day NL: sun up 8:44, sun down 16:25 func getPreviousEventsCount(db *sql.DB, inverterSerial string) int { var count int err := db.QueryRow(` SELECT COUNT(*) FROM opendtu_events WHERE inverter_serial = $1 AND timestamp >= CURRENT_DATE + INTERVAL '10 minutes' AND timestamp < CURRENT_DATE + INTERVAL '23 hours 50 minutes' `, inverterSerial).Scan(&count) if err != nil && err != sql.ErrNoRows { logger.Error("Error querying previous events count", "error", err) } return count } func insertEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { timestamp := time.Now() for _, event := range events.Events { // Insert events data into the events table _, err := db.Exec(` INSERT INTO opendtu_events (timestamp, inverter_serial, message_id, message, start_time, end_time) VALUES ($1, $2, $3, $4, $5, $6); `, timestamp, inverterSerial, event.MessageID, event.Message, event.StartTime, event.EndTime) if err != nil { logger.Error("Error inserting into opendtu_events table", "error", err) } } if len(events.Events) > 0 && events.Events[0].EndTime == 0 { // If end_time is 0, schedule a job to update the corresponding message row every 10 minutes go func() { for { time.Sleep(10 * time.Minute) updatedEvents, err := queryEventsEndpoint(inverterSerial) if err != nil { logger.Error("Error querying events endpoint for updates", "error", err) continue } // Update the corresponding message row updateEvents(db, inverterSerial, updatedEvents) } }() } } func updateEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { for _, event := range events.Events { // Update events data in the opendtu_events table _, err := db.Exec(` UPDATE opendtu_events SET end_time = $1 WHERE inverter_serial = $2 AND start_time = $3 AND end_time = 0; `, event.EndTime, inverterSerial, event.StartTime) if err != nil { logger.Error("Error updating opendtu_events table", "error", err) } } } // TODO: finish this function. // func updateInverterConfig(db *sql.DB) { // // Periodically query the /api/inverter/list // for { // updatedInverterConfig, err := queryConfigEndpoint() // if err != nil { // log.Println("Error querying events endpoint for updates:", err) // continue // } // // Update the corresponding message row // updateEvents(db, inverterSerial, updatedInverterConfig) // time.Sleep(60 * time.Minute) // } // } // func queryConfigEndpoint() (*InverterSettingsData, error) { // remoteURL := os.Getenv("REMOTE_URL") // endpoint := fmt.Sprintf("http://" + remoteURL + "/api/inverter/list") // resp, err := http.Get(endpoint) // if err != nil { // return nil, err // } // defer resp.Body.Close() // var inverterSettingsResponse InverterSettingsData // if err := json.NewDecoder(resp.Body).Decode(&inverterSettingsResponse); err != nil { // return nil, err // } // return &inverterSettingsResponse, nil // }