Implement proper DB migrations. Add support for logging electricity sags and swells. Preliminary work on supporting BE and LU specific variables.
All checks were successful
Build Docker image / build (push) Successful in 1m2s
Build Golang packages / release (push) Successful in 2m14s

This commit is contained in:
Pieter Hollander 2024-07-26 22:02:47 +02:00
parent 49aeda69e9
commit b70c6bd8e2
Signed by: pieter
SSH key fingerprint: SHA256:HbX+9cBXsop9SuvL+mELd29sK+7DehFfdVweFVDtMSg
4 changed files with 182 additions and 39 deletions

13
go.mod
View file

@ -1,15 +1,22 @@
module git.hollander.online/energy/p1-logger
go 1.21
go 1.22
require (
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/lib/pq v1.10.9
github.com/pressly/goose/v3 v3.21.1
)
require (
github.com/mfridman/interpolate v0.0.2 // indirect
github.com/sethvargo/go-retry v0.2.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
)
require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/joho/godotenv v1.5.1
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.7.0 // indirect
)

52
go.sum
View file

@ -1,12 +1,56 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY=
github.com/mfridman/interpolate v0.0.2/go.mod h1:p+7uk6oE07mpE/Ik1b8EckO0O4ZXiGAfshKBWLUM9Xg=
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pressly/goose/v3 v3.21.1 h1:5SSAKKWej8LVVzNLuT6KIvP1eFDuPvxa+B6H0w78buQ=
github.com/pressly/goose/v3 v3.21.1/go.mod h1:sqthmzV8PitchEkjecFJII//l43dLOCzfWh8pHEe+vE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI=
modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4=
modernc.org/libc v1.41.0 h1:g9YAc6BkKlgORsUWj+JwqoB1wU3o4DE3bM3yvA3k+Gk=
modernc.org/libc v1.41.0/go.mod h1:w0eszPsiXoOnoMJgrXjglgLuDy/bt5RR4y3QzUUeodY=
modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4=
modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo=
modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E=
modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E=
modernc.org/sqlite v1.29.6 h1:0lOXGrycJPptfHDuohfYgNqoe4hu+gYuN/pKgY5XjS4=
modernc.org/sqlite v1.29.6/go.mod h1:S02dvcmm7TnTRvGhv8IGYyLnIt7AS2KPaB1F/71p75U=
modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA=
modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

150
main.go
View file

@ -4,6 +4,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"io/fs"
"log"
"log/slog"
"os"
@ -14,6 +15,9 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/joho/godotenv"
_ "github.com/lib/pq"
"github.com/pressly/goose/v3"
"git.hollander.online/energy/p1-logger/migrations"
)
// Payload struct
@ -93,6 +97,12 @@ type Payload struct {
R *int `json:"r"` // Returning / exporting (W)
F *int `json:"f"` // Failure (counter)
Fl *int `json:"fl"` // Failure long duration (counter)
Sa1 *int `json:"sa1"` // Number of voltage sags L1
Sa2 *int `json:"sa2"` // Number of voltage sags L2
Sa3 *int `json:"sa3"` // Number of voltage sags L3
Sw1 *int `json:"sw1"` // Number of voltage swells L1
Sw2 *int `json:"sw2"` // Number of voltage swells L1
Sw3 *int `json:"sw3"` // Number of voltage swells L1
G *int `json:"g"` // Gas meter reading (l)
V1 int `json:"v1"` // Voltage L1 (V)
V2 int `json:"v2"` // Voltage L2 (V)
@ -106,6 +116,14 @@ type Payload struct {
R1 *int `json:"r1"` // Returning / exporting L1 (W)
R2 *int `json:"r2"` // Returning / exporting L2 (W)
R3 *int `json:"r3"` // Returning / exporting L3 (W)
GBe *int `json:"gbe"` // Gas meter reading (l) (Belgium)
CAQDBe *int `json:"aeicad"` // Current Average Quarterly Demand for Peak Tariff (Belgium)
CMMQDBe *int `json:"cmmqd"` // Current Months Maximum Quarterly Demand for Peak Tarrif (Belgium)
TMMQDBe *int `json:"13mmqd"` // 13 Month Maximum Quarterly Demand for Peak Tarrif (Belgium).
DLu *int `json:"dlu"` // Energy Delivered (Luxembourg)
RLu *int `json:"rlu"` // Energy Returned (Luxembourg)
}
type Config struct {
@ -281,8 +299,11 @@ func safeDerefInt(ptr *int) string {
}
var prevDt1, prevDt2, prevRt1, prevRt2, prevG, prevF, prevFl int
var prevSa1, prevSa2, prevSa3, prevSw1, prevSw2, prevSw3 int
var prevD, prevR int
var prevD1, prevD2, prevD3, prevR1, prevR2, prevR3 int
var prevGBe, prevCAQDBe, prevCMMQDBe, prevTMMQDBe int
var prevDLu, prevRLu int
func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
// Parse JSON payload
@ -324,6 +345,22 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
payload.Fl, tempChanged = updateFieldIfChanged(payload.Fl, &prevFl)
changed = changed || tempChanged
// Sags
payload.Sa1, tempChanged = updateFieldIfChanged(payload.Sa1, &prevSa1)
changed = changed || tempChanged
payload.Sa2, tempChanged = updateFieldIfChanged(payload.Sa2, &prevSa2)
changed = changed || tempChanged
payload.Sa3, tempChanged = updateFieldIfChanged(payload.Sa3, &prevSa3)
changed = changed || tempChanged
// Swells
payload.Sw1, tempChanged = updateFieldIfChanged(payload.Sw1, &prevSw1)
changed = changed || tempChanged
payload.Sw2, tempChanged = updateFieldIfChanged(payload.Sw2, &prevSw2)
changed = changed || tempChanged
payload.Sw3, tempChanged = updateFieldIfChanged(payload.Sw3, &prevSw3)
changed = changed || tempChanged
// Gas
payload.G, tempChanged = updateFieldIfChanged(payload.G, &prevG)
changed = changed || tempChanged
@ -350,6 +387,22 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
payload.R3, tempChanged = updateFieldIfChanged(payload.R3, &prevR3)
changed = changed || tempChanged
// Belgium
payload.GBe, tempChanged = updateFieldIfChanged(payload.GBe, &prevGBe)
changed = changed || tempChanged
payload.CAQDBe, tempChanged = updateFieldIfChanged(payload.CAQDBe, &prevCAQDBe)
changed = changed || tempChanged
payload.CMMQDBe, tempChanged = updateFieldIfChanged(payload.CMMQDBe, &prevCMMQDBe)
changed = changed || tempChanged
payload.TMMQDBe, tempChanged = updateFieldIfChanged(payload.TMMQDBe, &prevTMMQDBe)
changed = changed || tempChanged
// Luxembourg
payload.DLu, tempChanged = updateFieldIfChanged(payload.DLu, &prevDLu)
changed = changed || tempChanged
payload.RLu, tempChanged = updateFieldIfChanged(payload.RLu, &prevRLu)
changed = changed || tempChanged
// If any value has changed, log all the relevant values
if changed {
logger.Debug("Values changed",
@ -363,6 +416,15 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
"f", safeDerefInt(payload.F),
"fl", safeDerefInt(payload.Fl),
"sa1", safeDerefInt(payload.Sa1),
"sa2", safeDerefInt(payload.Sa2),
"sa3", safeDerefInt(payload.Sa3),
"sw1", safeDerefInt(payload.Sw1),
"sw2", safeDerefInt(payload.Sw2),
"sw3", safeDerefInt(payload.Sw3),
"g", safeDerefInt(payload.G),
"d1", safeDerefInt(payload.D1),
@ -372,6 +434,14 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
"r1", safeDerefInt(payload.R1),
"r2", safeDerefInt(payload.R2),
"r3", safeDerefInt(payload.R3),
"gbe", safeDerefInt(payload.GBe),
"caqdbe", safeDerefInt(payload.CAQDBe),
"cmmqdbe", safeDerefInt(payload.CMMQDBe),
"tmmqdbe", safeDerefInt(payload.TMMQDBe),
"dlu", safeDerefInt(payload.DLu),
"rlu", safeDerefInt(payload.RLu),
)
}
// Insert data into PostgreSQL
@ -398,7 +468,17 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
"d3", payload.D3,
"r1", payload.R1,
"r2", payload.R2,
"r3", payload.R3)
"r3", payload.R3,
"gbe", payload.GBe,
"caqdbe", payload.CAQDBe,
"cmmqdbe", payload.CMMQDBe,
"tmmqdbe", payload.TMMQDBe,
"dlu", payload.DLu,
"rlu", payload.RLu,
)
if err != nil {
logger.Error("Error inserting data into PostgreSQL", "error", err)
}
@ -435,12 +515,16 @@ func insertData(timestamp time.Time, payload Payload) error {
stmt := `
INSERT INTO p1 (
timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2,
delivery_all, returning_all, failures, long_failures, gas,
delivery_all, returning_all,
failures, long_failures,
sags_l1, sags_l2, sags_l3,
swells_l1, swells_l2, swells_l3,
gas,
voltage_l1, voltage_l2, voltage_l3,
current_l1, current_l2, current_l3,
delivery_l1, delivery_l2, delivery_l3,
returning_l1, returning_l2, returning_l3
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28)
`
_, err := db.Exec(
stmt,
@ -449,6 +533,8 @@ func insertData(timestamp time.Time, payload Payload) error {
payload.Rt1, payload.Rt2,
payload.D, payload.R,
payload.F, payload.Fl,
payload.Sa1, payload.Sa2, payload.Sa3,
payload.Sw1, payload.Sw2, payload.Sw3,
payload.G,
payload.V1, payload.V2, payload.V3,
payload.C1, payload.C2, payload.C3,
@ -471,36 +557,10 @@ func connectToPostgreSQL(pgConnStr string) error {
time.Sleep(5 * time.Second) // Retry after 5 seconds
}
// Create table if not exists
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS p1 (
timestamp TIMESTAMPTZ,
delivered_tariff1 INT,
delivered_tariff2 INT,
returned_tariff1 INT,
returned_tariff2 INT,
delivery_all INT,
returning_all INT,
failures INT,
long_failures INT,
gas INT,
voltage_l1 INT,
voltage_l2 INT,
voltage_l3 INT,
current_l1 INT,
current_l2 INT,
current_l3 INT,
delivery_l1 INT,
delivery_l2 INT,
delivery_l3 INT,
returning_l1 INT,
returning_l2 INT,
returning_l3 INT
);
-- CREATE UNIQUE INDEX IF NOT EXISTS timestamp_idx ON p1 (timestamp);
`)
// Perform DB migrations
err = MigrateFS(db, migrations.FS, ".")
if err != nil {
log.Fatal("Error creating table:", err)
log.Fatal(err)
}
if config.TimescaleDB {
@ -516,3 +576,29 @@ func connectToPostgreSQL(pgConnStr string) error {
return nil
}
func Migrate(db *sql.DB, dir string) error {
err := goose.SetDialect("postgres")
if err != nil {
return fmt.Errorf("migrate: %w", err)
}
err = goose.Up(db, dir)
if err != nil {
return fmt.Errorf("migrate: %w", err)
}
return nil
}
func MigrateFS(db *sql.DB, migrationFS fs.FS, dir string) error {
// In case the dir is an empty string, they probably meant the current directory and goose wants a period for that.
if dir == "" {
dir = "."
}
goose.SetBaseFS(migrationFS)
defer func() {
// Ensure that we remove the FS on the off chance some other part of our app uses goose for migrations and doesn't want to use our FS.
goose.SetBaseFS(nil)
}()
return Migrate(db, dir)
}

6
migrations/fs.go Normal file
View file

@ -0,0 +1,6 @@
package migrations
import "embed"
//go:embed *.sql
var FS embed.FS