diff --git a/main.go b/main.go new file mode 100644 index 0000000..1b4f925 --- /dev/null +++ b/main.go @@ -0,0 +1,504 @@ +// TODO: Remove inverters_DC Name and periodically / on request query name only once. +// TODO: Storage optimisation: Map inverter serial to shorter serial. Use that for referring. +// TODO: Use username and password provided using Basic Authentication. +// TODO: Make the timestamp in log a foreign key to other DBs so timescaleDB hypertables can be created. +// Make it unique so foreign key constraints can be used. +// Create separate table for inverter_id, dc_number, name and store it there. +// Do the same for string names. +// Idea: Make a full admin / config GUI and only configure through this utility. +// Idea: Gather settings only on start-up. + +package main + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "time" + _ "time/tzdata" + + "github.com/gorilla/websocket" + _ "github.com/lib/pq" +) + +type VUD struct { + V float64 `json:"v"` + U string `json:"u"` + D int `json:"d"` +} + +type InverterAC struct { + Power VUD `json:"Power"` + Voltage VUD `json:"Voltage"` + Current VUD `json:"Current"` + Frequency VUD `json:"Frequency"` + PowerFactor VUD `json:"PowerFactor"` + ReactivePower VUD `json:"ReactivePower"` +} + +type InverterDC struct { + Name struct { + U string `json:"u"` + } + Power VUD + Voltage VUD + Current VUD + YieldDay VUD + YieldTotal VUD + Irradiation struct { + V float64 `json:"v"` + U string `json:"u"` + D int `json:"d"` + Max int `json:"max"` + } +} + +type InverterINV struct { + Temperature VUD `json:"Temperature"` + Efficiency VUD `json:"Efficiency"` + PowerDC VUD `json:"Power DC"` + YieldDay VUD `json:"YieldDay"` + YieldTotal VUD `json:"YieldTotal"` +} + +// Inverter struct +type Inverter struct { + Serial string `json:"serial"` + Name string `json:"name"` + Producing bool `json:"producing"` + LimitRelative float64 `json:"limit_relative"` + LimitAbsolute float64 `json:"limit_absolute"` + AC map[string]InverterAC `json:"AC"` + DC map[string]InverterDC `json:"DC"` + Events int `json:"events"` + PollEnabled bool `json:"poll_enabled"` + Reachable bool `json:"reachable"` + DataAge int `json:"data_age"` + INV map[string]InverterINV `json:"INV"` +} + +type Total struct { + Power VUD `json:"Power"` + YieldDay VUD `json:"YieldDay"` + YieldTotal VUD `json:"YieldTotal"` +} + +type Hints struct { + TimeSync bool `json:"time_sync"` + RadioProblem bool `json:"radio_problem"` + DefaultPassword bool `json:"default_password"` +} + +type LiveData struct { + Inverters []Inverter `json:"inverters"` + Total Total `json:"total"` + Hints Hints `json:"hints"` +} + +// Events struct +type Event struct { + MessageID int `json:"message_id"` + Message string `json:"message"` + StartTime int `json:"start_time"` + EndTime int `json:"end_time"` +} + +type EventsResponse struct { + Count int `json:"count"` + Events []Event `json:"events"` +} + +// Inverter settings structs +type ChannelSettings struct { + Name string `json:"name"` + MaxPower int `json:"max_power"` + YieldTotalOffset int `json:"yield_total_offset"` +} + +type InverterSettings struct { + ID int `json:"id"` + Name string `json:"name"` + Order int `json:"order"` + Serial string `json:"serial"` + PollEnable bool `json:"poll_enable"` + PollEnableNight bool `json:"poll_enable_night"` + CommandEnable bool `json:"command_enable"` + CommandEnableNight bool `json:"command_enable_night"` + ReachableThreshold int `json:"reachable_threshold"` + ZeroRuntime bool `json:"zero_runtime"` + ZeroDay bool `json:"zero_day"` + Type string `json:"type"` + Channels []ChannelSettings `json:"channel"` +} + +type InverterSettingsData struct { + Inverters []InverterSettings `json:"inverter"` +} + +// Main program +func main() { + dbConnStr := (os.Getenv("DB_URL")) + // Connect to PostgreSQL + db, err := sql.Open("postgres", dbConnStr) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + // Create tables if they don't exist + createTables(db) + + // Get WebSocket URL from environment variable + RemoteURL := os.Getenv("REMOTE_URL") + wsURL := "ws://" + RemoteURL + "/livedata" + if wsURL == "" { + log.Fatal("WEBSOCKET_URL environment variable is not set.") + } + + // Establish WebSocket connection + c, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + log.Fatal(err) + } + defer c.Close() + + // Subscribe to the WebSocket feed + if err := c.WriteMessage(websocket.TextMessage, []byte(`{"subscribe": "livedata"}`)); err != nil { + log.Fatal(err) + } + + // Start listening for WebSocket messages + go func() { + for { + _, message, err := c.ReadMessage() + if err != nil { + log.Println(err) + return + } + + // Handle the received JSON message + handleMessage(message, db) + } + }() + + // go func() { + // updateInverterConfig(db) + // } + + // Keep the program running + select {} +} + +func handleMessage(message []byte, db *sql.DB) { + var liveData LiveData + + // Parse the JSON message into the LiveData struct + if err := json.Unmarshal(message, &liveData); err != nil { + log.Println("Error decoding JSON:", err) + return + } + + // Check conditions for recording data + for _, inverter := range liveData.Inverters { + if inverter.Events > 0 && inverter.Events > getPreviousEventsCount(db, inverter.Serial) { + // Query the endpoint for events + events, err := queryEventsEndpoint(inverter.Serial) + if err != nil { + log.Println("Error querying events endpoint:", err) + continue + } + + // Insert events data into the events table + insertEvents(db, inverter.Serial, events) + } + + if inverter.DataAge == 0 && inverter.Reachable { + // Insert data into PostgreSQL tables + insertLiveData(db, inverter, liveData.Total, liveData.Hints) + fmt.Println("inserting data") + } + } +} + +func createTables(db *sql.DB) { + // Execute SQL statements to create tables if they don't exist + createTableSQL := ` + CREATE TABLE IF NOT EXISTS log ( + id SERIAL PRIMARY KEY, + timestamp TIMESTAMPTZ UNIQUE DEFAULT CURRENT_TIMESTAMP, + power NUMERIC, + yieldday NUMERIC, + yieldtotal NUMERIC, + time_sync BOOL, + radio_problem BOOL, + default_password BOOL + ); + + CREATE TABLE IF NOT EXISTS inverters ( + id SERIAL PRIMARY KEY, + timestamp TIMESTAMPTZ, + FOREIGN KEY (timestamp) REFERENCES log(timestamp), + inverter_serial BIGINT, + name TEXT, + producing BOOL, + limit_relative NUMERIC, + limit_absolute NUMERIC + ); + + CREATE TABLE IF NOT EXISTS inverters_ac ( + id SERIAL PRIMARY KEY, + timestamp TIMESTAMPTZ, + FOREIGN KEY (timestamp) REFERENCES log(timestamp), + inverter_serial BIGINT, + ac_number INT, + power NUMERIC, + voltage NUMERIC, + current NUMERIC, + frequency NUMERIC, + powerfactor NUMERIC, + reactivepower NUMERIC + ); + + CREATE TABLE IF NOT EXISTS inverters_dc ( + id SERIAL PRIMARY KEY, + timestamp TIMESTAMPTZ, + FOREIGN KEY (timestamp) REFERENCES log(timestamp), + inverter_serial BIGINT, + dc_number INT, + name TEXT, + power NUMERIC, + voltage NUMERIC, + current NUMERIC, + yieldday NUMERIC, + yieldtotal NUMERIC, + irradiation NUMERIC + ); + + CREATE TABLE IF NOT EXISTS inverters_inv ( + id SERIAL PRIMARY KEY, + timestamp TIMESTAMPTZ, + FOREIGN KEY (timestamp) REFERENCES log(timestamp), + inverter_serial BIGINT, + temperature NUMERIC, + power_dc NUMERIC, + yieldday NUMERIC, + yieldtotal NUMERIC, + efficiency NUMERIC + ); + + CREATE TABLE IF NOT EXISTS events ( + id SERIAL PRIMARY KEY, + timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + inverter_serial BIGINT, + message_id INT, + message TEXT, + start_time INT, + end_time INT + ); + + CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON log (timestamp); + CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters (timestamp); + CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_ac (timestamp); + CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_dc (timestamp); + CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_inv (timestamp); + CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON events (timestamp); + ` + + _, err := db.Exec(createTableSQL) + if err != nil { + log.Fatal("Error creating tables: ", err) + } + timescaleEnabled := os.Getenv("TIMESCALEDB_ENABLED") + + enableTimescaleDB := ` + CREATE EXTENSION IF NOT EXISTS timescaledb; + SELECT create_hypertable('log', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); + SELECT create_hypertable('inverters', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); + SELECT create_hypertable('inverters_ac', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); + SELECT create_hypertable('inverters_dc', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); + SELECT create_hypertable('inverters_inv', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); + SELECT create_hypertable('events', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE); + ` + if timescaleEnabled == "true" { + _, err := db.Exec(enableTimescaleDB) + if err != nil { + log.Fatal("Error enabling TimescaleDB: ", err) + } + } +} + +func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) { + timeZone := os.Getenv("TZ") + loc, _ := time.LoadLocation(timeZone) + timestamp := time.Now().In(loc) + + // Insert data into log table + _, err := db.Exec(` + INSERT INTO log (timestamp, power, yieldday, yieldtotal, time_sync, radio_problem, default_password) + VALUES ($1, $2, $3, $4, $5, $6, $7); + `, timestamp, total.Power.V, total.YieldDay.V, total.YieldTotal.V, hints.TimeSync, hints.RadioProblem, hints.DefaultPassword) + if err != nil { + log.Println("Error inserting into log table:", err) + return + } + + // Get the log ID of the inserted record + var logID int + err = db.QueryRow("SELECT id FROM log WHERE timestamp = $1", timestamp).Scan(&logID) + if err != nil { + log.Println("Error getting log ID:", err) + return + } + + // Insert data into inverters table + _, err = db.Exec(` + INSERT INTO inverters (timestamp, inverter_serial, name, producing, limit_relative, limit_absolute) + VALUES ($1, $2, $3, $4, $5, $6); + `, timestamp, inverter.Serial, inverter.Name, inverter.Producing, inverter.LimitRelative, inverter.LimitAbsolute) + if err != nil { + log.Println("Error inserting into inverters table:", err) + return + } + + // Insert data into inverters_ac table + for acNumber, acData := range inverter.AC { + _, err := db.Exec(` + INSERT INTO inverters_ac (timestamp, inverter_serial, ac_number, power, voltage, current, frequency, powerfactor, reactivepower, efficiency) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9); + `, timestamp, inverter.Serial, acNumber, acData.Power.V, acData.Voltage.V, acData.Current.V, acData.Frequency.V, acData.PowerFactor.V, acData.ReactivePower.V) + if err != nil { + log.Printf("Error inserting into inverters_ac table for AC %s: %v\n", acNumber, err) + } + } + + // Insert data into inverters_dc table + for dcNumber, dcData := range inverter.DC { + _, err := db.Exec(` + INSERT INTO inverters_dc (timestamp, inverter_serial, dc_number, name, power, voltage, current, yieldday, yieldtotal, irradiation) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10); + `, timestamp, inverter.Serial, dcNumber, dcData.Name.U, dcData.Power.V, dcData.Voltage.V, dcData.Current.V, dcData.YieldDay.V, dcData.YieldTotal.V, dcData.Irradiation.V) + if err != nil { + log.Printf("Error inserting into inverters_dc table for DC %s: %v\n", dcNumber, err) + } + } + + // Insert data into inverters_inv table + for invNumber, invData := range inverter.INV { + _, err := db.Exec(` + INSERT INTO inverters_inv (timestamp, inverter_serial, temperature, efficiency, power_dc, yieldday, yieldtotal) + VALUES ($1, $2, $3, $4, $5, $6, $7); + `, timestamp, inverter.Serial, invData.Temperature.V, invData.Efficiency.V, invData.PowerDC.V, invData.YieldDay.V, invData.YieldTotal.V) + if err != nil { + log.Printf("Error inserting into inverters_inv table for INV %s: %v\n", invNumber, err) + } + } +} + +func queryEventsEndpoint(inverterSerial string) (*EventsResponse, error) { + remoteURL := os.Getenv("REMOTE_URL") + endpoint := fmt.Sprintf("http://"+remoteURL+"/api/eventlog/status?inv=%s", inverterSerial) + + resp, err := http.Get(endpoint) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var eventsResponse EventsResponse + if err := json.NewDecoder(resp.Body).Decode(&eventsResponse); err != nil { + return nil, err + } + + return &eventsResponse, nil +} + +func getPreviousEventsCount(db *sql.DB, inverterSerial string) int { + var count int + err := db.QueryRow("SELECT COUNT(*) FROM events WHERE inverter_serial = $1", inverterSerial).Scan(&count) + if err != nil && err != sql.ErrNoRows { + log.Println("Error querying previous events count:", err) + } + return count +} + +func insertEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { + timestamp := time.Now() + + for _, event := range events.Events { + // Insert events data into the events table + _, err := db.Exec(` + INSERT INTO events (timestamp, inverter_serial, message_id, message, start_time, end_time) + VALUES ($1, $2, $3, $4, $5, $6); + `, timestamp, inverterSerial, event.MessageID, event.Message, event.StartTime, event.EndTime) + if err != nil { + log.Println("Error inserting into events table:", err) + } + } + + if len(events.Events) > 0 && events.Events[0].EndTime == 0 { + // If end_time is 0, schedule a job to update the corresponding message row every 30 minutes + go func() { + for { + time.Sleep(30 * time.Minute) + updatedEvents, err := queryEventsEndpoint(inverterSerial) + if err != nil { + log.Println("Error querying events endpoint for updates:", err) + continue + } + + // Update the corresponding message row + updateEvents(db, inverterSerial, updatedEvents) + } + }() + } +} + +func updateEvents(db *sql.DB, inverterSerial string, events *EventsResponse) { + for _, event := range events.Events { + // Update events data in the events table + _, err := db.Exec(` + UPDATE events SET end_time = $1 WHERE inverter_serial = $2 AND start_time = $3 AND end_time = 0; + `, event.EndTime, inverterSerial, event.StartTime) + if err != nil { + log.Println("Error updating events table:", err) + } + } +} + +// TODO: finish this function. +// func updateInverterConfig(db *sql.DB) { +// // Periodically query the /api/inverter/list +// for { +// updatedInverterConfig, err := queryConfigEndpoint() +// if err != nil { +// log.Println("Error querying events endpoint for updates:", err) +// continue +// } + +// // Update the corresponding message row +// updateEvents(db, inverterSerial, updatedInverterConfig) + +// time.Sleep(60 * time.Minute) +// } +// } + +// func queryConfigEndpoint() (*InverterSettingsData, error) { +// remoteURL := os.Getenv("REMOTE_URL") +// endpoint := fmt.Sprintf("http://" + remoteURL + "/api/inverter/list") + +// resp, err := http.Get(endpoint) +// if err != nil { +// return nil, err +// } +// defer resp.Body.Close() + +// var inverterSettingsResponse InverterSettingsData +// if err := json.NewDecoder(resp.Body).Decode(&inverterSettingsResponse); err != nil { +// return nil, err +// } + +// return &inverterSettingsResponse, nil +// }