diff --git a/go.mod b/go.mod index 19b30ab..8973197 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,22 @@ module git.hollander.online/energy/p1-logger -go 1.21 +go 1.22 require ( github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/lib/pq v1.10.9 + github.com/pressly/goose/v3 v3.21.1 +) + +require ( + github.com/mfridman/interpolate v0.0.2 // indirect + github.com/sethvargo/go-retry v0.2.4 // indirect + go.uber.org/multierr v1.11.0 // indirect ) require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/joho/godotenv v1.5.1 - golang.org/x/net v0.8.0 // indirect - golang.org/x/sync v0.1.0 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sync v0.7.0 // indirect ) diff --git a/go.sum b/go.sum index e1c9c1a..474decb 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,56 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY= +github.com/mfridman/interpolate v0.0.2/go.mod h1:p+7uk6oE07mpE/Ik1b8EckO0O4ZXiGAfshKBWLUM9Xg= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pressly/goose/v3 v3.21.1 h1:5SSAKKWej8LVVzNLuT6KIvP1eFDuPvxa+B6H0w78buQ= +github.com/pressly/goose/v3 v3.21.1/go.mod h1:sqthmzV8PitchEkjecFJII//l43dLOCzfWh8pHEe+vE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec= +github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= +modernc.org/libc v1.41.0 h1:g9YAc6BkKlgORsUWj+JwqoB1wU3o4DE3bM3yvA3k+Gk= +modernc.org/libc v1.41.0/go.mod h1:w0eszPsiXoOnoMJgrXjglgLuDy/bt5RR4y3QzUUeodY= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E= +modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= +modernc.org/sqlite v1.29.6 h1:0lOXGrycJPptfHDuohfYgNqoe4hu+gYuN/pKgY5XjS4= +modernc.org/sqlite v1.29.6/go.mod h1:S02dvcmm7TnTRvGhv8IGYyLnIt7AS2KPaB1F/71p75U= +modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= +modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/main.go b/main.go index a106f42..02c569c 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "fmt" + "io/fs" "log" "log/slog" "os" @@ -14,6 +15,9 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/joho/godotenv" _ "github.com/lib/pq" + "github.com/pressly/goose/v3" + + "git.hollander.online/energy/p1-logger/migrations" ) // Payload struct @@ -93,6 +97,12 @@ type Payload struct { R *int `json:"r"` // Returning / exporting (W) F *int `json:"f"` // Failure (counter) Fl *int `json:"fl"` // Failure long duration (counter) + Sa1 *int `json:"sa1"` // Number of voltage sags L1 + Sa2 *int `json:"sa2"` // Number of voltage sags L2 + Sa3 *int `json:"sa3"` // Number of voltage sags L3 + Sw1 *int `json:"sw1"` // Number of voltage swells L1 + Sw2 *int `json:"sw2"` // Number of voltage swells L1 + Sw3 *int `json:"sw3"` // Number of voltage swells L1 G *int `json:"g"` // Gas meter reading (l) V1 int `json:"v1"` // Voltage L1 (V) V2 int `json:"v2"` // Voltage L2 (V) @@ -106,6 +116,14 @@ type Payload struct { R1 *int `json:"r1"` // Returning / exporting L1 (W) R2 *int `json:"r2"` // Returning / exporting L2 (W) R3 *int `json:"r3"` // Returning / exporting L3 (W) + + GBe *int `json:"gbe"` // Gas meter reading (l) (Belgium) + CAQDBe *int `json:"aeicad"` // Current Average Quarterly Demand for Peak Tariff (Belgium) + CMMQDBe *int `json:"cmmqd"` // Current Month’s Maximum Quarterly Demand for Peak Tarrif (Belgium) + TMMQDBe *int `json:"13mmqd"` // 13 Month Maximum Quarterly Demand for Peak Tarrif (Belgium). + + DLu *int `json:"dlu"` // Energy Delivered (Luxembourg) + RLu *int `json:"rlu"` // Energy Returned (Luxembourg) } type Config struct { @@ -281,8 +299,11 @@ func safeDerefInt(ptr *int) string { } var prevDt1, prevDt2, prevRt1, prevRt2, prevG, prevF, prevFl int +var prevSa1, prevSa2, prevSa3, prevSw1, prevSw2, prevSw3 int var prevD, prevR int var prevD1, prevD2, prevD3, prevR1, prevR2, prevR3 int +var prevGBe, prevCAQDBe, prevCMMQDBe, prevTMMQDBe int +var prevDLu, prevRLu int func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { // Parse JSON payload @@ -324,6 +345,22 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { payload.Fl, tempChanged = updateFieldIfChanged(payload.Fl, &prevFl) changed = changed || tempChanged + // Sags + payload.Sa1, tempChanged = updateFieldIfChanged(payload.Sa1, &prevSa1) + changed = changed || tempChanged + payload.Sa2, tempChanged = updateFieldIfChanged(payload.Sa2, &prevSa2) + changed = changed || tempChanged + payload.Sa3, tempChanged = updateFieldIfChanged(payload.Sa3, &prevSa3) + changed = changed || tempChanged + + // Swells + payload.Sw1, tempChanged = updateFieldIfChanged(payload.Sw1, &prevSw1) + changed = changed || tempChanged + payload.Sw2, tempChanged = updateFieldIfChanged(payload.Sw2, &prevSw2) + changed = changed || tempChanged + payload.Sw3, tempChanged = updateFieldIfChanged(payload.Sw3, &prevSw3) + changed = changed || tempChanged + // Gas payload.G, tempChanged = updateFieldIfChanged(payload.G, &prevG) changed = changed || tempChanged @@ -350,6 +387,22 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { payload.R3, tempChanged = updateFieldIfChanged(payload.R3, &prevR3) changed = changed || tempChanged + // Belgium + payload.GBe, tempChanged = updateFieldIfChanged(payload.GBe, &prevGBe) + changed = changed || tempChanged + payload.CAQDBe, tempChanged = updateFieldIfChanged(payload.CAQDBe, &prevCAQDBe) + changed = changed || tempChanged + payload.CMMQDBe, tempChanged = updateFieldIfChanged(payload.CMMQDBe, &prevCMMQDBe) + changed = changed || tempChanged + payload.TMMQDBe, tempChanged = updateFieldIfChanged(payload.TMMQDBe, &prevTMMQDBe) + changed = changed || tempChanged + + // Luxembourg + payload.DLu, tempChanged = updateFieldIfChanged(payload.DLu, &prevDLu) + changed = changed || tempChanged + payload.RLu, tempChanged = updateFieldIfChanged(payload.RLu, &prevRLu) + changed = changed || tempChanged + // If any value has changed, log all the relevant values if changed { logger.Debug("Values changed", @@ -363,6 +416,15 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { "f", safeDerefInt(payload.F), "fl", safeDerefInt(payload.Fl), + + "sa1", safeDerefInt(payload.Sa1), + "sa2", safeDerefInt(payload.Sa2), + "sa3", safeDerefInt(payload.Sa3), + + "sw1", safeDerefInt(payload.Sw1), + "sw2", safeDerefInt(payload.Sw2), + "sw3", safeDerefInt(payload.Sw3), + "g", safeDerefInt(payload.G), "d1", safeDerefInt(payload.D1), @@ -372,6 +434,14 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { "r1", safeDerefInt(payload.R1), "r2", safeDerefInt(payload.R2), "r3", safeDerefInt(payload.R3), + + "gbe", safeDerefInt(payload.GBe), + "caqdbe", safeDerefInt(payload.CAQDBe), + "cmmqdbe", safeDerefInt(payload.CMMQDBe), + "tmmqdbe", safeDerefInt(payload.TMMQDBe), + + "dlu", safeDerefInt(payload.DLu), + "rlu", safeDerefInt(payload.RLu), ) } // Insert data into PostgreSQL @@ -398,7 +468,17 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { "d3", payload.D3, "r1", payload.R1, "r2", payload.R2, - "r3", payload.R3) + "r3", payload.R3, + + "gbe", payload.GBe, + "caqdbe", payload.CAQDBe, + "cmmqdbe", payload.CMMQDBe, + "tmmqdbe", payload.TMMQDBe, + + "dlu", payload.DLu, + "rlu", payload.RLu, + ) + if err != nil { logger.Error("Error inserting data into PostgreSQL", "error", err) } @@ -435,12 +515,16 @@ func insertData(timestamp time.Time, payload Payload) error { stmt := ` INSERT INTO p1 ( timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2, - delivery_all, returning_all, failures, long_failures, gas, + delivery_all, returning_all, + failures, long_failures, + sags_l1, sags_l2, sags_l3, + swells_l1, swells_l2, swells_l3, + gas, voltage_l1, voltage_l2, voltage_l3, current_l1, current_l2, current_l3, delivery_l1, delivery_l2, delivery_l3, returning_l1, returning_l2, returning_l3 - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22) + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28) ` _, err := db.Exec( stmt, @@ -449,6 +533,8 @@ func insertData(timestamp time.Time, payload Payload) error { payload.Rt1, payload.Rt2, payload.D, payload.R, payload.F, payload.Fl, + payload.Sa1, payload.Sa2, payload.Sa3, + payload.Sw1, payload.Sw2, payload.Sw3, payload.G, payload.V1, payload.V2, payload.V3, payload.C1, payload.C2, payload.C3, @@ -471,36 +557,10 @@ func connectToPostgreSQL(pgConnStr string) error { time.Sleep(5 * time.Second) // Retry after 5 seconds } - // Create table if not exists - _, err = db.Exec(` - CREATE TABLE IF NOT EXISTS p1 ( - timestamp TIMESTAMPTZ, - delivered_tariff1 INT, - delivered_tariff2 INT, - returned_tariff1 INT, - returned_tariff2 INT, - delivery_all INT, - returning_all INT, - failures INT, - long_failures INT, - gas INT, - voltage_l1 INT, - voltage_l2 INT, - voltage_l3 INT, - current_l1 INT, - current_l2 INT, - current_l3 INT, - delivery_l1 INT, - delivery_l2 INT, - delivery_l3 INT, - returning_l1 INT, - returning_l2 INT, - returning_l3 INT - ); - -- CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON p1 (timestamp); - `) + // Perform DB migrations + err = MigrateFS(db, migrations.FS, ".") if err != nil { - log.Fatal("Error creating table:", err) + log.Fatal(err) } if config.TimescaleDB { @@ -516,3 +576,29 @@ func connectToPostgreSQL(pgConnStr string) error { return nil } + +func Migrate(db *sql.DB, dir string) error { + err := goose.SetDialect("postgres") + if err != nil { + return fmt.Errorf("migrate: %w", err) + } + err = goose.Up(db, dir) + if err != nil { + return fmt.Errorf("migrate: %w", err) + } + return nil + +} + +func MigrateFS(db *sql.DB, migrationFS fs.FS, dir string) error { + // In case the dir is an empty string, they probably meant the current directory and goose wants a period for that. + if dir == "" { + dir = "." + } + goose.SetBaseFS(migrationFS) + defer func() { + // Ensure that we remove the FS on the off chance some other part of our app uses goose for migrations and doesn't want to use our FS. + goose.SetBaseFS(nil) + }() + return Migrate(db, dir) +} diff --git a/migrations/fs.go b/migrations/fs.go new file mode 100644 index 0000000..91cca1c --- /dev/null +++ b/migrations/fs.go @@ -0,0 +1,6 @@ +package migrations + +import "embed" + +//go:embed *.sql +var FS embed.FS