// TODO: Remove inverters_DC Name and periodically / on request query name only once. // TODO: Storage optimisation: Map inverter serial to shorter serial. Use that for referring. // TODO: Use username and password provided using Basic Authentication. // TODO: Make the timestamp in log a foreign key to other DBs so timescaleDB hypertables can be created. // Make it unique so foreign key constraints can be used. // Create separate table for inverter_id, dc_number, name and store it there. // Do the same for string names. // Idea: Make a full admin / config GUI and only configure through this utility. // Idea: Gather settings only on start-up. package main import ( "database/sql" "encoding/json" "fmt" "log" "net/http" "os" "time" _ "time/tzdata" "github.com/gorilla/websocket" _ "github.com/lib/pq" ) type VUD struct { V float64 `json:"v"` U string `json:"u"` D int `json:"d"` } type InverterAC struct { Power VUD `json:"Power"` Voltage VUD `json:"Voltage"` Current VUD `json:"Current"` Frequency VUD `json:"Frequency"` PowerFactor VUD `json:"PowerFactor"` ReactivePower VUD `json:"ReactivePower"` } type InverterDC struct { Name struct { U string `json:"u"` } Power VUD Voltage VUD Current VUD YieldDay VUD YieldTotal VUD Irradiation struct { V float64 `json:"v"` U string `json:"u"` D int `json:"d"` Max int `json:"max"` } } type InverterINV struct { Temperature VUD `json:"Temperature"` Efficiency VUD `json:"Efficiency"` PowerDC VUD `json:"Power DC"` YieldDay VUD `json:"YieldDay"` YieldTotal VUD `json:"YieldTotal"` } // Inverter struct type Inverter struct { Serial string `json:"serial"` Name string `json:"name"` Producing bool `json:"producing"` LimitRelative float64 `json:"limit_relative"` LimitAbsolute float64 `json:"limit_absolute"` AC map[string]InverterAC `json:"AC"` DC map[string]InverterDC `json:"DC"` Events int `json:"events"` PollEnabled bool `json:"poll_enabled"` Reachable bool `json:"reachable"` DataAge int `json:"data_age"` INV map[string]InverterINV `json:"INV"` 
} type Total struct { Power VUD `json:"Power"` YieldDay VUD `json:"YieldDay"` YieldTotal VUD `json:"YieldTotal"` } type Hints struct { TimeSync bool `json:"time_sync"` RadioProblem bool `json:"radio_problem"` DefaultPassword bool `json:"default_password"` } type LiveData struct { Inverters []Inverter `json:"inverters"` Total Total `json:"total"` Hints Hints `json:"hints"` } // Events struct type Event struct { MessageID int `json:"message_id"` Message string `json:"message"` StartTime int `json:"start_time"` EndTime int `json:"end_time"` } type EventsResponse struct { Count int `json:"count"` Events []Event `json:"events"` } // Inverter settings structs type ChannelSettings struct { Name string `json:"name"` MaxPower int `json:"max_power"` YieldTotalOffset int `json:"yield_total_offset"` } type InverterSettings struct { ID int `json:"id"` Name string `json:"name"` Order int `json:"order"` Serial string `json:"serial"` PollEnable bool `json:"poll_enable"` PollEnableNight bool `json:"poll_enable_night"` CommandEnable bool `json:"command_enable"` CommandEnableNight bool `json:"command_enable_night"` ReachableThreshold int `json:"reachable_threshold"` ZeroRuntime bool `json:"zero_runtime"` ZeroDay bool `json:"zero_day"` Type string `json:"type"` Channels []ChannelSettings `json:"channel"` } type InverterSettingsData struct { Inverters []InverterSettings `json:"inverter"` } // Main program func main() { dbConnStr := (os.Getenv("DB_URL")) // Connect to PostgreSQL db, err := sql.Open("postgres", dbConnStr) if err != nil { log.Fatal(err) } defer db.Close() // Create tables if they don't exist createTables(db) // Get WebSocket URL from environment variable RemoteURL := os.Getenv("REMOTE_URL") wsURL := "ws://" + RemoteURL + "/livedata" if wsURL == "" { log.Fatal("WEBSOCKET_URL environment variable is not set.") } // Establish WebSocket connection c, _, err := websocket.DefaultDialer.Dial(wsURL, nil) if err != nil { log.Fatal(err) } defer c.Close() // Subscribe to the 
WebSocket feed if err := c.WriteMessage(websocket.TextMessage, []byte(`{"subscribe": "livedata"}`)); err != nil { log.Fatal(err) } // Start listening for WebSocket messages go func() { for { _, message, err := c.ReadMessage() if err != nil { log.Println(err) return } // Handle the received JSON message handleMessage(message, db) } }() // go func() { // updateInverterConfig(db) // } // Keep the program running select {} } func handleMessage(message []byte, db *sql.DB) { var liveData LiveData // Parse the JSON message into the LiveData struct if err := json.Unmarshal(message, &liveData); err != nil { log.Println("Error decoding JSON:", err) return } // Check conditions for recording data for _, inverter := range liveData.Inverters { if inverter.Events > 0 && inverter.Events > getPreviousEventsCount(db, inverter.Serial) { // Query the endpoint for events events, err := queryEventsEndpoint(inverter.Serial) if err != nil { log.Println("Error querying events endpoint:", err) continue } // Insert events data into the events table insertEvents(db, inverter.Serial, events) } if inverter.DataAge == 0 && inverter.Reachable { // Insert data into PostgreSQL tables insertLiveData(db, inverter, liveData.Total, liveData.Hints) fmt.Println("inserting data") } } } func createTables(db *sql.DB) { // Execute SQL statements to create tables if they don't exist createTableSQL := ` CREATE TABLE IF NOT EXISTS log ( id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ UNIQUE DEFAULT CURRENT_TIMESTAMP, power NUMERIC, yieldday NUMERIC, yieldtotal NUMERIC, time_sync BOOL, radio_problem BOOL, default_password BOOL ); CREATE TABLE IF NOT EXISTS inverters ( id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, FOREIGN KEY (timestamp) REFERENCES log(timestamp), inverter_serial BIGINT, name TEXT, producing BOOL, limit_relative NUMERIC, limit_absolute NUMERIC ); CREATE TABLE IF NOT EXISTS inverters_ac ( id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, FOREIGN KEY (timestamp) REFERENCES log(timestamp), inverter_serial 
BIGINT, ac_number INT, power NUMERIC, voltage NUMERIC, current NUMERIC, frequency NUMERIC, powerfactor NUMERIC, reactivepower NUMERIC ); CREATE TABLE IF NOT EXISTS inverters_dc ( id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, FOREIGN KEY (timestamp) REFERENCES log(timestamp), inverter_serial BIGINT, dc_number INT, name TEXT, power NUMERIC, voltage NUMERIC, current NUMERIC, yieldday NUMERIC, yieldtotal NUMERIC, irradiation NUMERIC ); CREATE TABLE IF NOT EXISTS inverters_inv ( id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ, FOREIGN KEY (timestamp) REFERENCES log(timestamp), inverter_serial BIGINT, temperature NUMERIC, power_dc NUMERIC, yieldday NUMERIC, yieldtotal NUMERIC, efficiency NUMERIC ); CREATE TABLE IF NOT EXISTS events ( id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, inverter_serial BIGINT, message_id INT, message TEXT, start_time INT, end_time INT ); CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON log (timestamp); CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters (timestamp); CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_ac (timestamp); CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_dc (timestamp); CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_inv (timestamp); CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON events (timestamp); ` _, err := db.Exec(createTableSQL) if err != nil { log.Fatal("Error creating tables: ", err) } timescaleEnabled := os.Getenv("TIMESCALEDB_ENABLED") enableTimescaleDB := ` CREATE EXTENSION IF NOT EXISTS timescaledb; SELECT create_hypertable('log', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters_ac', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters_dc', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('inverters_inv', 'timestamp', 
if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('events', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); ` if timescaleEnabled == "true" { _, err := db.Exec(enableTimescaleDB) if err != nil { log.Fatal("Error enabling TimescaleDB: ", err) } } } func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) { timeZone := os.Getenv("TZ") loc, _ := time.LoadLocation(timeZone) timestamp := time.Now().In(loc) // Insert data into log table _, err := db.Exec(` INSERT INTO log (timestamp, power, yieldday, yieldtotal, time_sync, radio_problem, default_password) VALUES ($1, $2, $3, $4, $5, $6, $7); `, timestamp, total.Power.V, total.YieldDay.V, total.YieldTotal.V, hints.TimeSync, hints.RadioProblem, hints.DefaultPassword) if err != nil { log.Println("Error inserting into log table:", err) return } // Get the log ID of the inserted record var logID int err = db.QueryRow("SELECT id FROM log WHERE timestamp = $1", timestamp).Scan(&logID) if err != nil { log.Println("Error getting log ID:", err) return } // Insert data into inverters table _, err = db.Exec(` INSERT INTO inverters (timestamp, inverter_serial, name, producing, limit_relative, limit_absolute) VALUES ($1, $2, $3, $4, $5, $6); `, timestamp, inverter.Serial, inverter.Name, inverter.Producing, inverter.LimitRelative, inverter.LimitAbsolute) if err != nil { log.Println("Error inserting into inverters table:", err) return } // Insert data into inverters_ac table for acNumber, acData := range inverter.AC { _, err := db.Exec(` INSERT INTO inverters_ac (timestamp, inverter_serial, ac_number, power, voltage, current, frequency, powerfactor, reactivepower, efficiency) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9); `, timestamp, inverter.Serial, acNumber, acData.Power.V, acData.Voltage.V, acData.Current.V, acData.Frequency.V, acData.PowerFactor.V, acData.ReactivePower.V) if err != nil { log.Printf("Error inserting into inverters_ac table for AC %s: %v\n", acNumber, err) } } // 
Insert data into inverters_dc table for dcNumber, dcData := range inverter.DC { _, err := db.Exec(` INSERT INTO inverters_dc (timestamp, inverter_serial, dc_number, name, power, voltage, current, yieldday, yieldtotal, irradiation) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10); `, timestamp, inverter.Serial, dcNumber, dcData.Name.U, dcData.Power.V, dcData.Voltage.V, dcData.Current.V, dcData.YieldDay.V, dcData.YieldTotal.V, dcData.Irradiation.V) if err != nil { log.Printf("Error inserting into inverters_dc table for DC %s: %v\n", dcNumber, err) } } // Insert data into inverters_inv table for invNumber, invData := range inverter.INV { _, err := db.Exec(` INSERT INTO inverters_inv (timestamp, inverter_serial, temperature, efficiency, power_dc, yieldday, yieldtotal) VALUES ($1, $2, $3, $4, $5, $6, $7); `, timestamp, inverter.Serial, invData.Temperature.V, invData.Efficiency.V, invData.PowerDC.V, invData.YieldDay.V, invData.YieldTotal.V) if err != nil { log.Printf("Error inserting into inverters_inv table for INV %s: %v\n", invNumber, err) } } } func queryEventsEndpoint(inverterSerial string) (*EventsResponse, error) { remoteURL := os.Getenv("REMOTE_URL") endpoint := fmt.Sprintf("http://"+remoteURL+"/api/eventlog/status?inv=%s", inverterSerial) resp, err := http.Get(endpoint) if err != nil { return nil, err } defer resp.Body.Close() var eventsResponse EventsResponse if err := json.NewDecoder(resp.Body).Decode(&eventsResponse); err != nil { return nil, err } return &eventsResponse, nil } func getPreviousEventsCount(db *sql.DB, inverterSerial string) int { var count int err := db.QueryRow("SELECT COUNT(*) FROM events WHERE inverter_serial = $1", inverterSerial).Scan(&count) if err != nil && err != sql.ErrNoRows { log.Println("Error querying previous events count:", err) } return count } func insertEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { timestamp := time.Now() for _, event := range events.Events { // Insert events data into the events table 
_, err := db.Exec(` INSERT INTO events (timestamp, inverter_serial, message_id, message, start_time, end_time) VALUES ($1, $2, $3, $4, $5, $6); `, timestamp, inverterSerial, event.MessageID, event.Message, event.StartTime, event.EndTime) if err != nil { log.Println("Error inserting into events table:", err) } } if len(events.Events) > 0 && events.Events[0].EndTime == 0 { // If end_time is 0, schedule a job to update the corresponding message row every 30 minutes go func() { for { time.Sleep(30 * time.Minute) updatedEvents, err := queryEventsEndpoint(inverterSerial) if err != nil { log.Println("Error querying events endpoint for updates:", err) continue } // Update the corresponding message row updateEvents(db, inverterSerial, updatedEvents) } }() } } func updateEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { for _, event := range events.Events { // Update events data in the events table _, err := db.Exec(` UPDATE events SET end_time = $1 WHERE inverter_serial = $2 AND start_time = $3 AND end_time = 0; `, event.EndTime, inverterSerial, event.StartTime) if err != nil { log.Println("Error updating events table:", err) } } } // TODO: finish this function. 
// func updateInverterConfig(db *sql.DB) {
// 	// Periodically query the /api/inverter/list
// 	for {
// 		updatedInverterConfig, err := queryConfigEndpoint()
// 		if err != nil {
// 			log.Println("Error querying events endpoint for updates:", err)
// 			continue
// 		}
// 		// Update the corresponding message row
// 		updateEvents(db, inverterSerial, updatedInverterConfig)
// 		time.Sleep(60 * time.Minute)
// 	}
// }

// func queryConfigEndpoint() (*InverterSettingsData, error) {
// 	remoteURL := os.Getenv("REMOTE_URL")
// 	endpoint := fmt.Sprintf("http://" + remoteURL + "/api/inverter/list")
// 	resp, err := http.Get(endpoint)
// 	if err != nil {
// 		return nil, err
// 	}
// 	defer resp.Body.Close()
// 	var inverterSettingsResponse InverterSettingsData
// 	if err := json.NewDecoder(resp.Body).Decode(&inverterSettingsResponse); err != nil {
// 		return nil, err
// 	}
// 	return &inverterSettingsResponse, nil
// }