commit 5801910be22dc3f1b7e8797fd927c1b215237f53 Author: Pieter Hollander Date: Tue Dec 5 17:37:50 2023 +0100 First commit. diff --git a/.gitea/workflows/docker.yml b/.gitea/workflows/docker.yml new file mode 100644 index 0000000..0a3cbda --- /dev/null +++ b/.gitea/workflows/docker.yml @@ -0,0 +1,58 @@ +name: Build Docker image +run-name: ${{ gitea.actor }} is building a new image 🚀 +on: + # schedule: + # - cron: "0 10 * * *" + push: + branches: + - "**" + tags: + - "v*.*.*" + pull_request: + branches: + - "main" + +jobs: + build: + runs-on: ubuntu-docker + steps: + - name: echo job info + run: echo "🎉 This job was automatically triggered by a ${{ gitea.event_name }} event and running on a ${{ runner.os }} repo:branch:${{ gitea.repository }}:${{ gitea.ref }}." + - name: Check out repository code + uses: actions/checkout@v4 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Generate image tags + # https://docs.docker.com/build/ci/github-actions/manage-tags-labels/ + id: meta + uses: docker/metadata-action@v5 + with: + # list of Docker images to use as base name for tags + images: | + git.hollander.online/${{ gitea.repository_owner }}/${{ gitea.event.repository.name }} + # generate Docker tags based on the following events/attributes + tags: | + type=schedule + type=ref,event=branch + type=ref,event=pr + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + type=semver,pattern={{major}} + type=sha + - name: Login to registry + if: gitea.event_name != 'pull_request' + uses: docker/login-action@v3 + with: + registry: git.hollander.online + username: ${{ gitea.repository_owner }} + password: ${{ secrets.CI_PACKAGES_RW }} + - name: Build and push + uses: docker/build-push-action@v5 + with: + context: . + push: ${{ gitea.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + platforms: linux/arm64,linux/amd64 + - name: Cleanup old images + run: docker system prune -f \ No newline at end of file diff --git a/.gitea/workflows/go.yml b/.gitea/workflows/go.yml new file mode 100644 index 0000000..3402a8e --- /dev/null +++ b/.gitea/workflows/go.yml @@ -0,0 +1,28 @@ +# # https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go +# # https://github.com/goreleaser/goreleaser-action + +# name: Release Go package +# on: [push] + +# jobs: + +# build: +# runs-on: ubuntu-docker +# strategy: +# matrix: +# go-version: [ '1.21' ] +# steps: +# - uses: actions/checkout@v4 +# # - name: Setup Go ${{ matrix.go-version }} +# # uses: actions/setup-go@v4 +# # with: +# # go-version: ${{ matrix.go-version }} +# # # You can test your matrix by printing the current Go version +# - name: Display Go version +# run: go version +# - name: Run GoReleaser +# uses: goreleaser/goreleaser-action@master +# with: +# version: latest +# args: release --rm-dist + \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a9aab3f --- /dev/null +++ b/.gitignore @@ -0,0 +1,72 @@ +# .env files +.env + +# Nano temporary files +*.swp + +# ---> macOS +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +# ---> Linux +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +# ---> Go +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +# ESPhome +.esphome \ No newline at end of file diff --git a/chatgpt prompt.md b/chatgpt prompt.md new file mode 100644 index 0000000..ace44a4 --- /dev/null +++ b/chatgpt prompt.md @@ -0,0 +1,53 @@ +# ChatGPT prompt + +Please write a golang program that subscribes to an mqtt topic which outputs the following payload + +```json +{"t":"231205164749W","dt1":830959,"dt2":729319,"rt1":33727,"rt2":111841,"d":224,"r":0,"f":18,"fl":17,"g":426077,"v1":219,"v2":227,"v3":223,"c1":0,"c2":0,"c3":0,"d1":84,"d2":50,"d3":90,"r1":0,"r2":0,"r3":0} +``` + +T is the Timestamp in Timezone Europe/Amsterdam and format YYMMDDhhmmss. The letter at the end of the timestamp can be either "W" for Winter or "S" for Summer and indicates daylight savings time. + +The data should be inserted into a configurable postgres database with structure + +```sql +CREATE TABLE p1 ( +timestamp TIMESTAMPTZ, +delivered_tariff1 INT, +delivered_tariff2 INT, +returned_tariff1 INT, +returned_tariff2 INT, +delivery_all INT, +returning_all INT, +failures INT, +long_failures INT, +gas INT, +voltage_l1 INT, +voltage_l2 INT, +voltage_l3 INT, +current_l1 INT, +current_l2 INT, +current_l3 INT, +delivery_l1 INT, +delivery_l2 INT, +delivery_l3 INT, +returning_l1 INT, +returning_l2 INT, +returning_l3 INT +); +CREATE EXTENSION IF NOT EXISTS timescaledb; +SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE); +``` + +The connections should be configured using the following environment variables + +```conf +MQTT_BROKER=tls://mqtt.example.com:8883 +MQTT_TOPIC=p1/# +MQTT_USERNAME=your_mqtt_username +MQTT_PASSWORD=your_mqtt_password + +PG_DB='host=localhost port=5432 user=p1 password=secret-replace dbname=p1 sslmode=disable' +``` + +The program should be usable in production and should automatically recover on database or mqtt service interruptions. diff --git a/compose.timescaledb.grafana.yml b/compose.timescaledb.grafana.yml new file mode 100644 index 0000000..6a1c078 --- /dev/null +++ b/compose.timescaledb.grafana.yml @@ -0,0 +1,44 @@ +version: '3.8' +services: + timescaledb: + image: timescale/timescaledb:latest-pg15 + environment: + POSTGRES_USER: ${PG_USER} + POSTGRES_PASSWORD: ${PG_PASSWORD} + POSTGRES_DB: ${PG_DB} + ports: + - "5433:5433" + networks: + - internal + volumes: + - /etc/timezone:/etc/timezone:ro + - /etc/localtime:/etc/localtime:ro + - /opt/p1-logger/database:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${PG_USER}"] + interval: 5s + timeout: 5s + retries: 20 + + p1-logger: + image: git.hollander.online/energy/p1-logger:main + environment: + MQTT_BROKER: ${MQTT_BROKER} + MQTT_TOPIC: ${MQTT_TOPIC} + MQTT_USERNAME: ${MQTT_USERNAME} + MQTT_PASSWORD: ${MQTT_PASSWORD} + PG_DB: ${PG_DB} + depends_on: + timescaledb: + condition: service_healthy + networks: + - internal + volumes: + - /etc/timezone:/etc/timezone:ro + - /etc/localtime:/etc/localtime:ro + +networks: + internal: + attachable: true + proxy: + external: {} diff --git a/example.env b/example.env new file mode 100644 index 0000000..630a1bc --- /dev/null +++ b/example.env @@ -0,0 +1,6 @@ +MQTT_BROKER=tls://mqtt.example.org:8883 +MQTT_TOPIC=p1/metrics +MQTT_USERNAME=your_username +MQTT_PASSWORD=your_password + +PG_DB='host=localhost port=5432 user=postgres password=secret-replace dbname=p1 sslmode=disable' diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..89a4322 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module git.hollander.online/energy/p1-logger + +go 1.21.4 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/lib/pq v1.10.9 +) + +require ( + github.com/gorilla/websocket v1.5.0 // indirect + github.com/joho/godotenv v1.5.1 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.1.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e1c9c1a --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..77ed3a4 --- /dev/null +++ b/main.go @@ -0,0 +1,185 @@ +package main + +import ( + "database/sql" + "encoding/json" + "log" + "os" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/joho/godotenv" + _ "github.com/lib/pq" +) + +type Payload struct { + T string `json:"t"` + Dt1 int `json:"dt1"` + Dt2 int `json:"dt2"` + Rt1 int `json:"rt1"` + Rt2 int `json:"rt2"` + D int `json:"d"` + R int `json:"r"` + F int `json:"f"` + Fl int `json:"fl"` + G int `json:"g"` + V1 int `json:"v1"` + V2 int `json:"v2"` + V3 int `json:"v3"` + C1 int `json:"c1"` + C2 int `json:"c2"` + C3 int `json:"c3"` + D1 int `json:"d1"` + D2 int `json:"d2"` + D3 int `json:"d3"` + R1 int `json:"r1"` + R2 int `json:"r2"` + R3 int `json:"r3"` +} + +var db *sql.DB + +func main() { + // Load environment variables from .env file if it exists + err := godotenv.Load() + if err != nil { + log.Println("Error loading .env file:", err) + } + + // Connect to PostgreSQL + pgConnStr := os.Getenv("PG_DB") + db, err = sql.Open("postgres", pgConnStr) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + // Enable TimescaleDB + _, err = db.Exec(` + CREATE EXTENSION IF NOT EXISTS timescaledb; + `) + if err != nil { + log.Fatal("Error creating TimescaleDB extension:", err) + } + + // Create table if not exists + _, err = db.Exec(` + CREATE TABLE IF NOT EXISTS p1 ( + timestamp TIMESTAMPTZ, + delivered_tariff1 INT, + delivered_tariff2 INT, + returned_tariff1 INT, + returned_tariff2 INT, + delivery_all INT, + returning_all INT, + failures INT, + long_failures INT, + gas INT, + voltage_l1 INT, + voltage_l2 INT, + voltage_l3 INT, + current_l1 INT, + current_l2 INT, + current_l3 INT, + delivery_l1 INT, + delivery_l2 INT, + delivery_l3 INT, + returning_l1 INT, + returning_l2 INT, + returning_l3 INT + ); + SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE); + `) + if err != nil { + log.Fatal("Error creating table:", err) + } + + // Initialize MQTT options + opts := mqtt.NewClientOptions() + opts.AddBroker(os.Getenv("MQTT_BROKER")) + opts.SetUsername(os.Getenv("MQTT_USERNAME")) + opts.SetPassword(os.Getenv("MQTT_PASSWORD")) + + // Connect to MQTT broker + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + + // Subscribe to MQTT topic + topic := os.Getenv("MQTT_TOPIC") + if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + + // Keep the program running + select {} +} + +func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { + // Parse JSON payload + var payload Payload + err := json.Unmarshal(msg.Payload(), &payload) + if err != nil { + log.Println("Error parsing MQTT payload:", err) + return + } + + // Parse timestamp to time.Time + timestamp, err := parseTimestamp(payload.T) + if err != nil { + log.Println("Error parsing timestamp:", err) + return + } + + // Insert data into PostgreSQL + err = insertData(timestamp, payload) + if err != nil { + log.Println("Error inserting data into PostgreSQL:", err) + } +} + +func parseTimestamp(t string) (time.Time, error) { + + // Extract values from timestamp string + year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0') + hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0') + + // Load location for "Europe/Amsterdam" time zone + loc, err := time.LoadLocation("Europe/Amsterdam") + if err != nil { + return time.Time{}, err + } + + // Create and return the timestamp + return time.Date(year, month, day, hour, min, sec, 0, loc), nil + +} + +func insertData(timestamp time.Time, payload Payload) error { + // Prepare SQL statement + stmt := ` + INSERT INTO p1 ( + timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2, + delivery_all, returning_all, failures, long_failures, gas, + voltage_l1, voltage_l2, voltage_l3, + current_l1, current_l2, current_l3, + delivery_l1, delivery_l2, delivery_l3, + returning_l1, returning_l2, returning_l3 + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22) + ` + _, err := db.Exec( + stmt, + timestamp, + payload.Dt1, payload.Dt2, + payload.Rt1, payload.Rt2, + payload.D, payload.R, + payload.F, payload.Fl, + payload.G, + payload.V1, payload.V2, payload.V3, + payload.C1, payload.C2, payload.C3, + payload.D1, payload.D2, payload.D3, + payload.R1, payload.R2, payload.R3, + ) + return err +}