// TODO: Storage optimisation: map each inverter serial to a shorter serial and use that for referring.
// TODO: Record Inverter struct data only on change.
// Idea: Make a full admin / config GUI and only configure through this utility.
// Idea: Gather settings only on start-up.
// TODO: Only update meter readings such as yieldday and yieldtotal on change.
// TODO: Add a health check endpoint, potentially log to it.
// TODO: Add support for monitoring multiple OpenDTUs at once.

package main

import (
	"database/sql"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io/fs"
	"log"
	"log/slog"
	"net/http"
	"os"
	"strconv"
	"time"
	// Blank import: embeds a copy of the timezone database in the binary so
	// time.LoadLocation works without tzdata installed on the host.
	_ "time/tzdata"

	"git.hollander.online/energy/opendtu-logger/migrations"
	"github.com/gorilla/websocket"
	// Blank import: presumably registers the "postgres" driver that is
	// passed to sql.Open in main.
	_ "github.com/lib/pq"
	"github.com/pressly/goose/v3"
)

// VUD contains three variables used for most metrics sent by OpenDTU:
// Value, unit and decimal point accuracy.
type VUD struct {
	V float64 `json:"v"` // Value
	U string `json:"u"` // Unit
	D int `json:"d"` // Decimals
}

// InverterAC stores AC generation metrics per inverter.
type InverterAC struct { Power VUD `json:"Power"` Voltage VUD `json:"Voltage"` Current VUD `json:"Current"` Frequency VUD `json:"Frequency"` PowerFactor VUD `json:"PowerFactor"` ReactivePower VUD `json:"ReactivePower"` } // InverterDC stores DC generation metrics per string (a string is usually 1 solar panel) type InverterDC struct { Name struct { U string `json:"u"` } Power VUD Voltage VUD Current VUD YieldDay VUD YieldTotal VUD Irradiation struct { V float64 `json:"v"` U string `json:"u"` D int `json:"d"` Max int `json:"max"` } } // InverterINV stores aggregated metrics for each inverter type InverterINV struct { Temperature VUD `json:"Temperature"` Efficiency VUD `json:"Efficiency"` PowerDC VUD `json:"Power DC"` YieldDay VUD `json:"YieldDay"` YieldTotal VUD `json:"YieldTotal"` } // Inverter struct type Inverter struct { Serial string `json:"serial"` Name string `json:"name"` Producing bool `json:"producing"` LimitRelative float64 `json:"limit_relative"` LimitAbsolute float64 `json:"limit_absolute"` AC map[string]InverterAC `json:"AC"` DC map[string]InverterDC `json:"DC"` Events int `json:"events"` PollEnabled bool `json:"poll_enabled"` Reachable bool `json:"reachable"` DataAge int `json:"data_age"` INV map[string]InverterINV `json:"INV"` } type Total struct { Power VUD `json:"Power"` YieldDay VUD `json:"YieldDay"` YieldTotal VUD `json:"YieldTotal"` } type Hints struct { TimeSync bool `json:"time_sync"` RadioProblem bool `json:"radio_problem"` DefaultPassword bool `json:"default_password"` } type LiveData struct { Inverters []Inverter `json:"inverters"` Total Total `json:"total"` Hints Hints `json:"hints"` } // Events struct type Event struct { MessageID int `json:"message_id"` Message string `json:"message"` StartTime int `json:"start_time"` EndTime int `json:"end_time"` StartTimestamp time.Time EndTimestamp time.Time } type EventsResponse struct { Count int `json:"count"` Events []Event `json:"events"` } // Inverter settings structs type ChannelSettings struct 
{ Name string `json:"name"` MaxPower int `json:"max_power"` YieldTotalOffset int `json:"yield_total_offset"` } type InverterSettings struct { ID int `json:"id"` Name string `json:"name"` Order int `json:"order"` Serial string `json:"serial"` PollEnable bool `json:"poll_enable"` PollEnableNight bool `json:"poll_enable_night"` CommandEnable bool `json:"command_enable"` CommandEnableNight bool `json:"command_enable_night"` ReachableThreshold int `json:"reachable_threshold"` ZeroRuntime bool `json:"zero_runtime"` ZeroDay bool `json:"zero_day"` Type string `json:"type"` Channels []ChannelSettings `json:"channel"` } type InverterSettingsData struct { Inverters []InverterSettings `json:"inverter"` } // Config settings struct type Config struct { DB string `json:"db"` OpenDTUAddress string `json:"opendtu_address"` OpenDTUAuth bool `json:"opendtu_auth"` OpenDTUUser string `json:"opendtu_username"` OpenDTUPassword string `json:"opendtu_password"` TimescaleDB bool `json:"timescaledb"` TZ string `json:"tz"` LogLevel string `json:"log_level"` } var logger = slog.New(slog.NewJSONHandler(os.Stdout, nil)) var config Config // LoadConfig attempts to read the configuration from options.json // If it fails, it falls back to using environment variables func loadConfig() Config { configFilePath := os.Getenv("CONFIG_FILE") if configFilePath == "" { configFilePath = "/data/options.json" } data, err := os.ReadFile(configFilePath) if err == nil { // Successfully read the file, parse the JSON err = json.Unmarshal(data, &config) if err != nil { log.Fatalf("Error parsing config file: %v", err) } if config.DB == "" { log.Fatal("db connection settings are not set") } if config.OpenDTUAddress == "" { log.Fatal("opendtu_address is not set") } if config.OpenDTUAuth { if config.OpenDTUUser == "" { log.Fatal("opendtu_username is not set, while opendtu_auth is set to enabled. 
Set opendtu_auth to false or set username") } if config.OpenDTUPassword == "" { log.Fatal("opendtu_password is not set, while opendtu_auth is set to enabled. Set opendtu_auth to false or set password") } } } else { logger.Info("JSON config file not found. Falling back to environment variables.") // Fallback to environment variables config.DB = os.Getenv("DB_URL") if config.DB == "" { log.Fatal("DB_URL environment variable is not set.") } config.OpenDTUAddress = os.Getenv("OPENDTU_ADDRESS") if config.OpenDTUAddress == "" { log.Fatal("OPENDTU_ADDRESS environment variable is not set.") } openDTUAuthStr := os.Getenv("OPENDTU_AUTH") if openDTUAuthStr != "" { openDTUAuth, err := strconv.ParseBool(openDTUAuthStr) if err != nil { log.Fatalf("Error parsing OPENDTU_AUTH: %v", err) } config.OpenDTUAuth = openDTUAuth } if config.OpenDTUAuth { config.OpenDTUUser = os.Getenv("OPENDTU_USERNAME") if config.OpenDTUUser == "" { log.Fatal("OPENDTU_USERNAME environment variable is not set.") } config.OpenDTUPassword = os.Getenv("OPENDTU_PASSWORD") if config.OpenDTUPassword == "" { log.Fatal("OPENDTU_PASSWORD environment variable is not set.") } } timescaleDBStr := os.Getenv("TIMESCALEDB_ENABLED") if timescaleDBStr != "" { timescaleDB, err := strconv.ParseBool(timescaleDBStr) if err != nil { log.Fatalf("Error parsing TIMESCALEDB_ENABLED: %v", err) } config.TimescaleDB = timescaleDB } config.TZ = os.Getenv("TZ") config.LogLevel = os.Getenv("LOG_LEVEL") } _, err = time.LoadLocation(config.TZ) if err != nil { logger.Warn("invalid timezone") } return config } // Helper function to map environment variable to slog.Level func getLogLevel(defaultLevel slog.Level) slog.Level { logLevelStr := config.LogLevel switch logLevelStr { case "DEBUG": return slog.LevelDebug case "INFO": return slog.LevelInfo case "WARN": return slog.LevelWarn case "ERROR": return slog.LevelError default: return defaultLevel } } // Function to create a new logger with a specified log level func 
createLoggerWithLevel(level slog.Level) *slog.Logger { return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: level, })) } // Main program func main() { // Initial logger setup slog.SetDefault(logger) // Load the configuration config := loadConfig() // Set the logLevel logLevel := getLogLevel(slog.LevelInfo) // Default to info level logger = createLoggerWithLevel(logLevel) dbConnStr := config.DB // Connect to PostgreSQL db, err := sql.Open("postgres", dbConnStr) if err != nil { log.Fatal(err) } defer db.Close() // Create tables if they don't exist migrateDB(db) // Create WebSocket URL from config variable wsURL := "ws://" + config.OpenDTUAddress + "/livedata" logger.Debug(wsURL) // Create headers with optional Basic Auth headers := http.Header{} if config.OpenDTUAuth { headers.Set("Authorization", basicAuth(config.OpenDTUUser, config.OpenDTUPassword)) } // Establish WebSocket connection c, _, err := websocket.DefaultDialer.Dial(wsURL, headers) if err != nil { log.Fatal(err) } defer c.Close() // Subscribe to the WebSocket feed if err := c.WriteMessage(websocket.TextMessage, []byte(`{"subscribe": "livedata"}`)); err != nil { log.Fatal(err) } logger.Info("OpenDTU Logger has been successfully initialised. 
Starting data recording...") // Start listening for WebSocket messages go func() { for { _, message, err := c.ReadMessage() if err != nil { logger.Error("Error reading WebSocket message", "error", err) return } // Handle the received JSON message handleMessage(message, db) } }() // go func() { // updateInverterConfig(db) // } // Keep the program running select {} } func handleMessage(message []byte, db *sql.DB) { var liveData LiveData // Parse the JSON message into the LiveData struct if err := json.Unmarshal(message, &liveData); err != nil { logger.Error("Error decoding JSON", "error", err) return } // Check conditions for recording data for _, inverter := range liveData.Inverters { // Record events data // Optional debugging code: // log.Println("Events data:", "age", inverter.DataAge, "event number", inverter.Events, "previous event count", getPreviousEventsCount(db, inverter.Serial)) if inverter.DataAge == 0 && inverter.Events > 0 && inverter.Events > getPreviousEventsCount(db, inverter.Serial) { // Query the endpoint for events events, err := queryEventsEndpoint(inverter.Serial) if err != nil { logger.Error("Error querying events endpoint", "error", err) continue } // Insert events data into the opendtu_events table insertEvents(db, inverter.Serial, events) } // Record inverter data if inverter.DataAge == 0 && inverter.Reachable { // Insert data into PostgreSQL tables insertLiveData(db, inverter, liveData.Total, liveData.Hints) logger.Debug("Inserting data") } } } func migrateDB(db *sql.DB) { // TODO: Foreign keys commented out as TimescaleDB hypertables don't support them. 
// Perform DB migrations err := migrateFS(db, migrations.FS, ".") if err != nil { log.Fatal("Error performing database migrations: ", err) } timescaleEnabled := config.TimescaleDB enableTimescaleDB := ` -- CREATE EXTENSION IF NOT EXISTS timescaledb; SELECT create_hypertable('opendtu_log', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_inverters', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_inverters_ac', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_inverters_dc', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_inverters_inv', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_events', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('opendtu_hints', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); ` if timescaleEnabled { _, err := db.Exec(enableTimescaleDB) if err != nil { log.Fatal("Error enabling TimescaleDB: ", err) } } } func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) { timeZone := config.TZ loc, _ := time.LoadLocation(timeZone) timestamp := time.Now().In(loc) // Insert data into log table _, err := db.Exec(` INSERT INTO opendtu_log (timestamp, power, yieldday, yieldtotal) VALUES ($1, $2, $3, $4); `, timestamp, total.Power.V, total.YieldDay.V, total.YieldTotal.V) if err != nil { logger.Error("Error inserting into log table", "error", err) return } // Get the log ID of the inserted record // NOT IN USE: TimescaleDB doesn't support it. 
// var logID int // err = db.QueryRow("SELECT id FROM opendtu_log WHERE timestamp = $1", timestamp).Scan(&logID) // if err != nil { // logger.Error("Error getting opendtu_log ID", "error", err) // return // } // Insert data into opendtu_inverters table _, err = db.Exec(` INSERT INTO opendtu_inverters (timestamp, inverter_serial, name, producing, limit_relative, limit_absolute) VALUES ($1, $2, $3, $4, $5, $6); `, timestamp, inverter.Serial, inverter.Name, inverter.Producing, inverter.LimitRelative, inverter.LimitAbsolute) if err != nil { logger.Error("Error inserting into opendtu_inverters table", "error", err) return } // Insert data into opendtu_inverters_ac table for acNumber, acData := range inverter.AC { _, err := db.Exec(` INSERT INTO opendtu_inverters_ac (timestamp, inverter_serial, ac_number, power, voltage, current, frequency, powerfactor, reactivepower) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9); `, timestamp, inverter.Serial, acNumber, acData.Power.V, acData.Voltage.V, acData.Current.V, acData.Frequency.V, acData.PowerFactor.V, acData.ReactivePower.V) if err != nil { logger.Error("Error inserting into opendtu_inverters_ac table for AC", "error", "acNumber", acNumber, err) } } // Insert data into opendtu_inverters_dc table for dcNumber, dcData := range inverter.DC { _, err := db.Exec(` INSERT INTO opendtu_inverters_dc (timestamp, inverter_serial, dc_number, name, power, voltage, current, yieldday, yieldtotal, irradiation) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10); `, timestamp, inverter.Serial, dcNumber, dcData.Name.U, dcData.Power.V, dcData.Voltage.V, dcData.Current.V, dcData.YieldDay.V, dcData.YieldTotal.V, dcData.Irradiation.V) if err != nil { log.Printf("Error inserting into opendtu_inverters_dc table for DC %s: %v\n", dcNumber, err) } } // Insert data into opendtu_inverters_inv table for invNumber, invData := range inverter.INV { _, err := db.Exec(` INSERT INTO opendtu_inverters_inv (timestamp, inverter_serial, temperature, efficiency, 
power_dc, yieldday, yieldtotal) VALUES ($1, $2, $3, $4, $5, $6, $7); `, timestamp, inverter.Serial, invData.Temperature.V, invData.Efficiency.V, invData.PowerDC.V, invData.YieldDay.V, invData.YieldTotal.V) if err != nil { log.Printf("Error inserting into opendtu_inverters_inv table for INV %s: %v\n", invNumber, err) } } // Insert data into hints table _, err = db.Exec(` INSERT INTO opendtu_hints (timestamp, time_sync, radio_problem, default_password) VALUES ($1, $2, $3, $4); `, timestamp, hints.TimeSync, hints.RadioProblem, hints.DefaultPassword) if err != nil { logger.Error("Error inserting into log table", "error", err) return } } func migrate(db *sql.DB, dir string) error { err := goose.SetDialect("postgres") if err != nil { return fmt.Errorf("migrate: %w", err) } err = goose.Up(db, dir) if err != nil { return fmt.Errorf("migrate: %w", err) } return nil } func migrateFS(db *sql.DB, migrationFS fs.FS, dir string) error { // In case the dir is an empty string, they probably meant the current directory and goose wants a period for that. if dir == "" { dir = "." } goose.SetBaseFS(migrationFS) defer func() { // Ensure that we remove the FS on the off chance some other part of our app uses goose for migrations and doesn't want to use our FS. 
goose.SetBaseFS(nil) }() return migrate(db, dir) } func queryEventsEndpoint(inverterSerial string) (*EventsResponse, error) { endpoint := fmt.Sprintf("http://"+config.OpenDTUAddress+"/api/eventlog/status?inv=%s", inverterSerial) // Create a new HTTP request req, err := http.NewRequest("GET", endpoint, nil) if err != nil { return nil, err } if config.OpenDTUAuth { // Add Basic Auth header req.Header.Add("Authorization", basicAuth(config.OpenDTUUser, config.OpenDTUPassword)) } // Send the request client := &http.Client{} resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() // Check for HTTP errors if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("HTTP request failed with status: %s", resp.Status) } // Decode the response var eventsResponse EventsResponse if err := json.NewDecoder(resp.Body).Decode(&eventsResponse); err != nil { return nil, err } return &eventsResponse, nil } // The events counter reported by OpenDTU resets every day. // However, this assumes that the inverters from which the events are pulled are reset every day, during the night. // Additionally, this function requires OpenDTU to be set to "Clear Eventlog at midnight" for each inverter. // "Clear Eventlog at midnight" should be set to ON in "Inverter settings" -> "pencil" -> "Advanced". // To account for possible time drifts, the first and last 10 minutes of the day are excluded. 
// // Longest day NL: sun up 4:16, sun down 22:50 // Shortest day NL: sun up 8:44, sun down 16:25 func getPreviousEventsCount(db *sql.DB, inverterSerial string) int { var count int err := db.QueryRow(` SELECT COUNT(*) FROM opendtu_events WHERE inverter_serial = $1 AND timestamp >= CURRENT_DATE + INTERVAL '10 minutes' AND timestamp < CURRENT_DATE + INTERVAL '23 hours 50 minutes' `, inverterSerial).Scan(&count) if err != nil && err != sql.ErrNoRows { logger.Error("Error querying previous events count", "error", err) } return count } func insertEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { timeZone := config.TZ loc, _ := time.LoadLocation(timeZone) timestamp := time.Now().In(loc) for _, event := range events.Events { // Insert events data into the events table _, err := db.Exec(` INSERT INTO opendtu_events (timestamp, inverter_serial, message_id, message, start_time, end_time) VALUES ($1, $2, $3, $4, $5, $6); `, timestamp, inverterSerial, event.MessageID, event.Message, event.StartTime, event.EndTime) if err != nil { logger.Error("Error inserting into opendtu_events table", "error", err) } } if len(events.Events) > 0 && events.Events[0].EndTime == 0 { // If end_time is 0, schedule a job to update the corresponding message row every 10 minutes go func() { for { time.Sleep(10 * time.Minute) updatedEvents, err := queryEventsEndpoint(inverterSerial) if err != nil { logger.Error("Error querying events endpoint for updates", "error", err) continue } // Update the corresponding message row updateEvents(db, inverterSerial, updatedEvents) } }() } } func updateEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { for _, event := range events.Events { // Update events data in the opendtu_events table _, err := db.Exec(` UPDATE opendtu_events SET end_time = $1 WHERE inverter_serial = $2 AND start_time = $3 AND end_time = 0; `, event.EndTime, inverterSerial, event.StartTime) if err != nil { logger.Error("Error updating opendtu_events table", 
"error", err) } } } // basicAuth generates the Basic Auth header value func basicAuth(username, password string) string { credentials := username + ":" + password return "Basic " + base64.StdEncoding.EncodeToString([]byte(credentials)) } // TODO: finish this function. // func updateInverterConfig(db *sql.DB) { // // Periodically query the /api/inverter/list // for { // updatedInverterConfig, err := queryConfigEndpoint() // if err != nil { // log.Println("Error querying events endpoint for updates:", err) // continue // } // // Update the corresponding message row // updateEvents(db, inverterSerial, updatedInverterConfig) // time.Sleep(60 * time.Minute) // } // } // func queryConfigEndpoint() (*InverterSettingsData, error) { // openDTUAddress := os.Getenv("OPENDTU_ADDRESS") // endpoint := fmt.Sprintf("http://" + openDTUAddress + "/api/inverter/list") // resp, err := http.Get(endpoint) // if err != nil { // return nil, err // } // defer resp.Body.Close() // var inverterSettingsResponse InverterSettingsData // if err := json.NewDecoder(resp.Body).Decode(&inverterSettingsResponse); err != nil { // return nil, err // } // return &inverterSettingsResponse, nil // }