This commit is contained in:
parent
f62f85e788
commit
de11154b96
1 changed files with 504 additions and 0 deletions
504
main.go
Normal file
504
main.go
Normal file
|
@ -0,0 +1,504 @@
|
|||
// TODO: Remove inverters_DC Name and periodically / on request query name only once.
|
||||
// TODO: Storage optimisation: Map inverter serial to shorter serial. Use that for referring.
|
||||
// TODO: Use username and password provided using Basic Authentication.
|
||||
// TODO: Make the timestamp in log a foreign key to other DBs so timescaleDB hypertables can be created.
|
||||
// Make it unique so foreign key constraints can be used.
|
||||
// Create separate table for inverter_id, dc_number, name and store it there.
|
||||
// Do the same for string names.
|
||||
// Idea: Make a full admin / config GUI and only configure through this utility.
|
||||
// Idea: Gather settings only on start-up.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
_ "time/tzdata"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
type VUD struct {
|
||||
V float64 `json:"v"`
|
||||
U string `json:"u"`
|
||||
D int `json:"d"`
|
||||
}
|
||||
|
||||
type InverterAC struct {
|
||||
Power VUD `json:"Power"`
|
||||
Voltage VUD `json:"Voltage"`
|
||||
Current VUD `json:"Current"`
|
||||
Frequency VUD `json:"Frequency"`
|
||||
PowerFactor VUD `json:"PowerFactor"`
|
||||
ReactivePower VUD `json:"ReactivePower"`
|
||||
}
|
||||
|
||||
type InverterDC struct {
|
||||
Name struct {
|
||||
U string `json:"u"`
|
||||
}
|
||||
Power VUD
|
||||
Voltage VUD
|
||||
Current VUD
|
||||
YieldDay VUD
|
||||
YieldTotal VUD
|
||||
Irradiation struct {
|
||||
V float64 `json:"v"`
|
||||
U string `json:"u"`
|
||||
D int `json:"d"`
|
||||
Max int `json:"max"`
|
||||
}
|
||||
}
|
||||
|
||||
type InverterINV struct {
|
||||
Temperature VUD `json:"Temperature"`
|
||||
Efficiency VUD `json:"Efficiency"`
|
||||
PowerDC VUD `json:"Power DC"`
|
||||
YieldDay VUD `json:"YieldDay"`
|
||||
YieldTotal VUD `json:"YieldTotal"`
|
||||
}
|
||||
|
||||
// Inverter struct
|
||||
type Inverter struct {
|
||||
Serial string `json:"serial"`
|
||||
Name string `json:"name"`
|
||||
Producing bool `json:"producing"`
|
||||
LimitRelative float64 `json:"limit_relative"`
|
||||
LimitAbsolute float64 `json:"limit_absolute"`
|
||||
AC map[string]InverterAC `json:"AC"`
|
||||
DC map[string]InverterDC `json:"DC"`
|
||||
Events int `json:"events"`
|
||||
PollEnabled bool `json:"poll_enabled"`
|
||||
Reachable bool `json:"reachable"`
|
||||
DataAge int `json:"data_age"`
|
||||
INV map[string]InverterINV `json:"INV"`
|
||||
}
|
||||
|
||||
type Total struct {
|
||||
Power VUD `json:"Power"`
|
||||
YieldDay VUD `json:"YieldDay"`
|
||||
YieldTotal VUD `json:"YieldTotal"`
|
||||
}
|
||||
|
||||
type Hints struct {
|
||||
TimeSync bool `json:"time_sync"`
|
||||
RadioProblem bool `json:"radio_problem"`
|
||||
DefaultPassword bool `json:"default_password"`
|
||||
}
|
||||
|
||||
type LiveData struct {
|
||||
Inverters []Inverter `json:"inverters"`
|
||||
Total Total `json:"total"`
|
||||
Hints Hints `json:"hints"`
|
||||
}
|
||||
|
||||
// Events struct
|
||||
type Event struct {
|
||||
MessageID int `json:"message_id"`
|
||||
Message string `json:"message"`
|
||||
StartTime int `json:"start_time"`
|
||||
EndTime int `json:"end_time"`
|
||||
}
|
||||
|
||||
type EventsResponse struct {
|
||||
Count int `json:"count"`
|
||||
Events []Event `json:"events"`
|
||||
}
|
||||
|
||||
// Inverter settings structs
|
||||
type ChannelSettings struct {
|
||||
Name string `json:"name"`
|
||||
MaxPower int `json:"max_power"`
|
||||
YieldTotalOffset int `json:"yield_total_offset"`
|
||||
}
|
||||
|
||||
type InverterSettings struct {
|
||||
ID int `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Order int `json:"order"`
|
||||
Serial string `json:"serial"`
|
||||
PollEnable bool `json:"poll_enable"`
|
||||
PollEnableNight bool `json:"poll_enable_night"`
|
||||
CommandEnable bool `json:"command_enable"`
|
||||
CommandEnableNight bool `json:"command_enable_night"`
|
||||
ReachableThreshold int `json:"reachable_threshold"`
|
||||
ZeroRuntime bool `json:"zero_runtime"`
|
||||
ZeroDay bool `json:"zero_day"`
|
||||
Type string `json:"type"`
|
||||
Channels []ChannelSettings `json:"channel"`
|
||||
}
|
||||
|
||||
type InverterSettingsData struct {
|
||||
Inverters []InverterSettings `json:"inverter"`
|
||||
}
|
||||
|
||||
// Main program
|
||||
func main() {
|
||||
dbConnStr := (os.Getenv("DB_URL"))
|
||||
// Connect to PostgreSQL
|
||||
db, err := sql.Open("postgres", dbConnStr)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Create tables if they don't exist
|
||||
createTables(db)
|
||||
|
||||
// Get WebSocket URL from environment variable
|
||||
RemoteURL := os.Getenv("REMOTE_URL")
|
||||
wsURL := "ws://" + RemoteURL + "/livedata"
|
||||
if wsURL == "" {
|
||||
log.Fatal("WEBSOCKET_URL environment variable is not set.")
|
||||
}
|
||||
|
||||
// Establish WebSocket connection
|
||||
c, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
// Subscribe to the WebSocket feed
|
||||
if err := c.WriteMessage(websocket.TextMessage, []byte(`{"subscribe": "livedata"}`)); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Start listening for WebSocket messages
|
||||
go func() {
|
||||
for {
|
||||
_, message, err := c.ReadMessage()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Handle the received JSON message
|
||||
handleMessage(message, db)
|
||||
}
|
||||
}()
|
||||
|
||||
// go func() {
|
||||
// updateInverterConfig(db)
|
||||
// }
|
||||
|
||||
// Keep the program running
|
||||
select {}
|
||||
}
|
||||
|
||||
func handleMessage(message []byte, db *sql.DB) {
|
||||
var liveData LiveData
|
||||
|
||||
// Parse the JSON message into the LiveData struct
|
||||
if err := json.Unmarshal(message, &liveData); err != nil {
|
||||
log.Println("Error decoding JSON:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Check conditions for recording data
|
||||
for _, inverter := range liveData.Inverters {
|
||||
if inverter.Events > 0 && inverter.Events > getPreviousEventsCount(db, inverter.Serial) {
|
||||
// Query the endpoint for events
|
||||
events, err := queryEventsEndpoint(inverter.Serial)
|
||||
if err != nil {
|
||||
log.Println("Error querying events endpoint:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Insert events data into the events table
|
||||
insertEvents(db, inverter.Serial, events)
|
||||
}
|
||||
|
||||
if inverter.DataAge == 0 && inverter.Reachable {
|
||||
// Insert data into PostgreSQL tables
|
||||
insertLiveData(db, inverter, liveData.Total, liveData.Hints)
|
||||
fmt.Println("inserting data")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createTables(db *sql.DB) {
|
||||
// Execute SQL statements to create tables if they don't exist
|
||||
createTableSQL := `
|
||||
CREATE TABLE IF NOT EXISTS log (
|
||||
id SERIAL PRIMARY KEY,
|
||||
timestamp TIMESTAMPTZ UNIQUE DEFAULT CURRENT_TIMESTAMP,
|
||||
power NUMERIC,
|
||||
yieldday NUMERIC,
|
||||
yieldtotal NUMERIC,
|
||||
time_sync BOOL,
|
||||
radio_problem BOOL,
|
||||
default_password BOOL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS inverters (
|
||||
id SERIAL PRIMARY KEY,
|
||||
timestamp TIMESTAMPTZ,
|
||||
FOREIGN KEY (timestamp) REFERENCES log(timestamp),
|
||||
inverter_serial BIGINT,
|
||||
name TEXT,
|
||||
producing BOOL,
|
||||
limit_relative NUMERIC,
|
||||
limit_absolute NUMERIC
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS inverters_ac (
|
||||
id SERIAL PRIMARY KEY,
|
||||
timestamp TIMESTAMPTZ,
|
||||
FOREIGN KEY (timestamp) REFERENCES log(timestamp),
|
||||
inverter_serial BIGINT,
|
||||
ac_number INT,
|
||||
power NUMERIC,
|
||||
voltage NUMERIC,
|
||||
current NUMERIC,
|
||||
frequency NUMERIC,
|
||||
powerfactor NUMERIC,
|
||||
reactivepower NUMERIC
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS inverters_dc (
|
||||
id SERIAL PRIMARY KEY,
|
||||
timestamp TIMESTAMPTZ,
|
||||
FOREIGN KEY (timestamp) REFERENCES log(timestamp),
|
||||
inverter_serial BIGINT,
|
||||
dc_number INT,
|
||||
name TEXT,
|
||||
power NUMERIC,
|
||||
voltage NUMERIC,
|
||||
current NUMERIC,
|
||||
yieldday NUMERIC,
|
||||
yieldtotal NUMERIC,
|
||||
irradiation NUMERIC
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS inverters_inv (
|
||||
id SERIAL PRIMARY KEY,
|
||||
timestamp TIMESTAMPTZ,
|
||||
FOREIGN KEY (timestamp) REFERENCES log(timestamp),
|
||||
inverter_serial BIGINT,
|
||||
temperature NUMERIC,
|
||||
power_dc NUMERIC,
|
||||
yieldday NUMERIC,
|
||||
yieldtotal NUMERIC,
|
||||
efficiency NUMERIC
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
id SERIAL PRIMARY KEY,
|
||||
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
|
||||
inverter_serial BIGINT,
|
||||
message_id INT,
|
||||
message TEXT,
|
||||
start_time INT,
|
||||
end_time INT
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON log (timestamp);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters (timestamp);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_ac (timestamp);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_dc (timestamp);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON inverters_inv (timestamp);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON events (timestamp);
|
||||
`
|
||||
|
||||
_, err := db.Exec(createTableSQL)
|
||||
if err != nil {
|
||||
log.Fatal("Error creating tables: ", err)
|
||||
}
|
||||
timescaleEnabled := os.Getenv("TIMESCALEDB_ENABLED")
|
||||
|
||||
enableTimescaleDB := `
|
||||
CREATE EXTENSION IF NOT EXISTS timescaledb;
|
||||
SELECT create_hypertable('log', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
|
||||
SELECT create_hypertable('inverters', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
|
||||
SELECT create_hypertable('inverters_ac', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
|
||||
SELECT create_hypertable('inverters_dc', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
|
||||
SELECT create_hypertable('inverters_inv', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
|
||||
SELECT create_hypertable('events', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);
|
||||
`
|
||||
if timescaleEnabled == "true" {
|
||||
_, err := db.Exec(enableTimescaleDB)
|
||||
if err != nil {
|
||||
log.Fatal("Error enabling TimescaleDB: ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func insertLiveData(db *sql.DB, inverter Inverter, total Total, hints Hints) {
|
||||
timeZone := os.Getenv("TZ")
|
||||
loc, _ := time.LoadLocation(timeZone)
|
||||
timestamp := time.Now().In(loc)
|
||||
|
||||
// Insert data into log table
|
||||
_, err := db.Exec(`
|
||||
INSERT INTO log (timestamp, power, yieldday, yieldtotal, time_sync, radio_problem, default_password)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7);
|
||||
`, timestamp, total.Power.V, total.YieldDay.V, total.YieldTotal.V, hints.TimeSync, hints.RadioProblem, hints.DefaultPassword)
|
||||
if err != nil {
|
||||
log.Println("Error inserting into log table:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Get the log ID of the inserted record
|
||||
var logID int
|
||||
err = db.QueryRow("SELECT id FROM log WHERE timestamp = $1", timestamp).Scan(&logID)
|
||||
if err != nil {
|
||||
log.Println("Error getting log ID:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Insert data into inverters table
|
||||
_, err = db.Exec(`
|
||||
INSERT INTO inverters (timestamp, inverter_serial, name, producing, limit_relative, limit_absolute)
|
||||
VALUES ($1, $2, $3, $4, $5, $6);
|
||||
`, timestamp, inverter.Serial, inverter.Name, inverter.Producing, inverter.LimitRelative, inverter.LimitAbsolute)
|
||||
if err != nil {
|
||||
log.Println("Error inserting into inverters table:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Insert data into inverters_ac table
|
||||
for acNumber, acData := range inverter.AC {
|
||||
_, err := db.Exec(`
|
||||
INSERT INTO inverters_ac (timestamp, inverter_serial, ac_number, power, voltage, current, frequency, powerfactor, reactivepower, efficiency)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9);
|
||||
`, timestamp, inverter.Serial, acNumber, acData.Power.V, acData.Voltage.V, acData.Current.V, acData.Frequency.V, acData.PowerFactor.V, acData.ReactivePower.V)
|
||||
if err != nil {
|
||||
log.Printf("Error inserting into inverters_ac table for AC %s: %v\n", acNumber, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Insert data into inverters_dc table
|
||||
for dcNumber, dcData := range inverter.DC {
|
||||
_, err := db.Exec(`
|
||||
INSERT INTO inverters_dc (timestamp, inverter_serial, dc_number, name, power, voltage, current, yieldday, yieldtotal, irradiation)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);
|
||||
`, timestamp, inverter.Serial, dcNumber, dcData.Name.U, dcData.Power.V, dcData.Voltage.V, dcData.Current.V, dcData.YieldDay.V, dcData.YieldTotal.V, dcData.Irradiation.V)
|
||||
if err != nil {
|
||||
log.Printf("Error inserting into inverters_dc table for DC %s: %v\n", dcNumber, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Insert data into inverters_inv table
|
||||
for invNumber, invData := range inverter.INV {
|
||||
_, err := db.Exec(`
|
||||
INSERT INTO inverters_inv (timestamp, inverter_serial, temperature, efficiency, power_dc, yieldday, yieldtotal)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7);
|
||||
`, timestamp, inverter.Serial, invData.Temperature.V, invData.Efficiency.V, invData.PowerDC.V, invData.YieldDay.V, invData.YieldTotal.V)
|
||||
if err != nil {
|
||||
log.Printf("Error inserting into inverters_inv table for INV %s: %v\n", invNumber, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func queryEventsEndpoint(inverterSerial string) (*EventsResponse, error) {
|
||||
remoteURL := os.Getenv("REMOTE_URL")
|
||||
endpoint := fmt.Sprintf("http://"+remoteURL+"/api/eventlog/status?inv=%s", inverterSerial)
|
||||
|
||||
resp, err := http.Get(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var eventsResponse EventsResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&eventsResponse); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &eventsResponse, nil
|
||||
}
|
||||
|
||||
func getPreviousEventsCount(db *sql.DB, inverterSerial string) int {
|
||||
var count int
|
||||
err := db.QueryRow("SELECT COUNT(*) FROM events WHERE inverter_serial = $1", inverterSerial).Scan(&count)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
log.Println("Error querying previous events count:", err)
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
func insertEvents(db *sql.DB, inverterSerial string, events *EventsResponse) {
|
||||
timestamp := time.Now()
|
||||
|
||||
for _, event := range events.Events {
|
||||
// Insert events data into the events table
|
||||
_, err := db.Exec(`
|
||||
INSERT INTO events (timestamp, inverter_serial, message_id, message, start_time, end_time)
|
||||
VALUES ($1, $2, $3, $4, $5, $6);
|
||||
`, timestamp, inverterSerial, event.MessageID, event.Message, event.StartTime, event.EndTime)
|
||||
if err != nil {
|
||||
log.Println("Error inserting into events table:", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(events.Events) > 0 && events.Events[0].EndTime == 0 {
|
||||
// If end_time is 0, schedule a job to update the corresponding message row every 30 minutes
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(30 * time.Minute)
|
||||
updatedEvents, err := queryEventsEndpoint(inverterSerial)
|
||||
if err != nil {
|
||||
log.Println("Error querying events endpoint for updates:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Update the corresponding message row
|
||||
updateEvents(db, inverterSerial, updatedEvents)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func updateEvents(db *sql.DB, inverterSerial string, events *EventsResponse) {
|
||||
for _, event := range events.Events {
|
||||
// Update events data in the events table
|
||||
_, err := db.Exec(`
|
||||
UPDATE events SET end_time = $1 WHERE inverter_serial = $2 AND start_time = $3 AND end_time = 0;
|
||||
`, event.EndTime, inverterSerial, event.StartTime)
|
||||
if err != nil {
|
||||
log.Println("Error updating events table:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: finish this function.
|
||||
// func updateInverterConfig(db *sql.DB) {
|
||||
// // Periodically query the /api/inverter/list
|
||||
// for {
|
||||
// updatedInverterConfig, err := queryConfigEndpoint()
|
||||
// if err != nil {
|
||||
// log.Println("Error querying events endpoint for updates:", err)
|
||||
// continue
|
||||
// }
|
||||
|
||||
// // Update the corresponding message row
|
||||
// updateEvents(db, inverterSerial, updatedInverterConfig)
|
||||
|
||||
// time.Sleep(60 * time.Minute)
|
||||
// }
|
||||
// }
|
||||
|
||||
// func queryConfigEndpoint() (*InverterSettingsData, error) {
|
||||
// remoteURL := os.Getenv("REMOTE_URL")
|
||||
// endpoint := fmt.Sprintf("http://" + remoteURL + "/api/inverter/list")
|
||||
|
||||
// resp, err := http.Get(endpoint)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// defer resp.Body.Close()
|
||||
|
||||
// var inverterSettingsResponse InverterSettingsData
|
||||
// if err := json.NewDecoder(resp.Body).Decode(&inverterSettingsResponse); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
// return &inverterSettingsResponse, nil
|
||||
// }
|
Loading…
Reference in a new issue