From 5801910be22dc3f1b7e8797fd927c1b215237f53 Mon Sep 17 00:00:00 2001 From: Pieter Hollander Date: Tue, 5 Dec 2023 17:37:50 +0100 Subject: [PATCH] First commit. --- .gitea/workflows/docker.yml | 58 ++++++++++ .gitea/workflows/go.yml | 28 +++++ .gitignore | 72 +++++++++++++ chatgpt prompt.md | 53 +++++++++ compose.timescaledb.grafana.yml | 44 ++++++++ example.env | 6 ++ go.mod | 15 +++ go.sum | 12 +++ main.go | 185 ++++++++++++++++++++++++++++++++ 9 files changed, 473 insertions(+) create mode 100644 .gitea/workflows/docker.yml create mode 100644 .gitea/workflows/go.yml create mode 100644 .gitignore create mode 100644 chatgpt prompt.md create mode 100644 compose.timescaledb.grafana.yml create mode 100644 example.env create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go diff --git a/.gitea/workflows/docker.yml b/.gitea/workflows/docker.yml new file mode 100644 index 0000000..0a3cbda --- /dev/null +++ b/.gitea/workflows/docker.yml @@ -0,0 +1,58 @@ +name: Build Docker image +run-name: ${{ gitea.actor }} is building a new image 🚀 +on: + # schedule: + # - cron: "0 10 * * *" + push: + branches: + - "**" + tags: + - "v*.*.*" + pull_request: + branches: + - "main" + +jobs: + build: + runs-on: ubuntu-docker + steps: + - name: echo job info + run: echo "🎉 This job was automatically triggered by a ${{ gitea.event_name }} event and running on a ${{ runner.os }} repo:branch:${{ gitea.repository }}:${{ gitea.ref }}." + - name: Check out repository code + uses: actions/checkout@v4 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Generate image tags + # https://docs.docker.com/build/ci/github-actions/manage-tags-labels/ + id: meta + uses: docker/metadata-action@v5 + with: + # list of Docker images to use as base name for tags + images: | + git.hollander.online/${{ gitea.repository_owner }}/${{ gitea.event.repository.name }} + # generate Docker tags based on the following events/attributes + tags: | + type=schedule + type=ref,event=branch + type=ref,event=pr + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + type=semver,pattern={{major}} + type=sha + - name: Login to registry + if: gitea.event_name != 'pull_request' + uses: docker/login-action@v3 + with: + registry: git.hollander.online + username: ${{ gitea.repository_owner }} + password: ${{ secrets.CI_PACKAGES_RW }} + - name: Build and push + uses: docker/build-push-action@v5 + with: + context: . + push: ${{ gitea.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + platforms: linux/arm64,linux/amd64 + - name: Cleanup old images + run: docker system prune -f \ No newline at end of file diff --git a/.gitea/workflows/go.yml b/.gitea/workflows/go.yml new file mode 100644 index 0000000..3402a8e --- /dev/null +++ b/.gitea/workflows/go.yml @@ -0,0 +1,28 @@ +# # https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go +# # https://github.com/goreleaser/goreleaser-action + +# name: Release Go package +# on: [push] + +# jobs: + +# build: +# runs-on: ubuntu-docker +# strategy: +# matrix: +# go-version: [ '1.21' ] +# steps: +# - uses: actions/checkout@v4 +# # - name: Setup Go ${{ matrix.go-version }} +# # uses: actions/setup-go@v4 +# # with: +# # go-version: ${{ matrix.go-version }} +# # # You can test your matrix by printing the current Go version +# - name: Display Go version +# run: go version +# - name: Run GoReleaser +# uses: goreleaser/goreleaser-action@master +# with: +# version: latest +# args: release --rm-dist + \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a9aab3f --- /dev/null +++ b/.gitignore @@ -0,0 +1,72 @@ +# .env files +.env + +# Nano temporary files +*.swp + +# ---> macOS +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +# ---> Linux +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +# ---> Go +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +# ESPhome +.esphome \ No newline at end of file diff --git a/chatgpt prompt.md b/chatgpt prompt.md new file mode 100644 index 0000000..ace44a4 --- /dev/null +++ b/chatgpt prompt.md @@ -0,0 +1,53 @@ +# ChatGPT prompt + +Please write a golang program that subscribes to an mqtt topic which outputs the following payload + +```json +{"t":"231205164749W","dt1":830959,"dt2":729319,"rt1":33727,"rt2":111841,"d":224,"r":0,"f":18,"fl":17,"g":426077,"v1":219,"v2":227,"v3":223,"c1":0,"c2":0,"c3":0,"d1":84,"d2":50,"d3":90,"r1":0,"r2":0,"r3":0} +``` + +T is the Timestamp in Timezone Europe/Amsterdam and format YYMMDDhhmmss. The letter at the end of the timestamp can be either "W" for Winter or "S" for Summer and indicates daylight savings time. + +The data should be inserted into a configurable postgres database with structure + +```sql +CREATE TABLE p1 ( +timestamp TIMESTAMPTZ, +delivered_tariff1 INT, +delivered_tariff2 INT, +returned_tariff1 INT, +returned_tariff2 INT, +delivery_all INT, +returning_all INT, +failures INT, +long_failures INT, +gas INT, +voltage_l1 INT, +voltage_l2 INT, +voltage_l3 INT, +current_l1 INT, +current_l2 INT, +current_l3 INT, +delivery_l1 INT, +delivery_l2 INT, +delivery_l3 INT, +returning_l1 INT, +returning_l2 INT, +returning_l3 INT +); +CREATE EXTENSION IF NOT EXISTS timescaledb; +SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE); +``` + +The connections should be configured using the following environment variables + +```conf +MQTT_BROKER=tls://mqtt.example.com:8883 +MQTT_TOPIC=p1/# +MQTT_USERNAME=your_mqtt_username +MQTT_PASSWORD=your_mqtt_password + +PG_DB='host=localhost port=5432 user=p1 password=secret-replace dbname=p1 sslmode=disable' +``` + +The program should be usable in production and should automatically recover on database or mqtt service interruptions. diff --git a/compose.timescaledb.grafana.yml b/compose.timescaledb.grafana.yml new file mode 100644 index 0000000..6a1c078 --- /dev/null +++ b/compose.timescaledb.grafana.yml @@ -0,0 +1,44 @@ +version: '3.8' +services: + timescaledb: + image: timescale/timescaledb:latest-pg15 + environment: + POSTGRES_USER: ${PG_USER} + POSTGRES_PASSWORD: ${PG_PASSWORD} + POSTGRES_DB: ${PG_DB} + ports: + - "5433:5433" + networks: + - internal + volumes: + - /etc/timezone:/etc/timezone:ro + - /etc/localtime:/etc/localtime:ro + - /opt/p1-logger/database:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${PG_USER}"] + interval: 5s + timeout: 5s + retries: 20 + + p1-logger: + image: git.hollander.online/energy/p1-logger:main + environment: + MQTT_BROKER: ${MQTT_BROKER} + MQTT_TOPIC: ${MQTT_TOPIC} + MQTT_USERNAME: ${MQTT_USERNAME} + MQTT_PASSWORD: ${MQTT_PASSWORD} + PG_DB: ${PG_DB} + depends_on: + timescaledb: + condition: service_healthy + networks: + - internal + volumes: + - /etc/timezone:/etc/timezone:ro + - /etc/localtime:/etc/localtime:ro + +networks: + internal: + attachable: true + proxy: + external: {} diff --git a/example.env b/example.env new file mode 100644 index 0000000..630a1bc --- /dev/null +++ b/example.env @@ -0,0 +1,6 @@ +MQTT_BROKER=tls://mqtt.example.org:8883 +MQTT_TOPIC=p1/metrics +MQTT_USERNAME=your_username +MQTT_PASSWORD=your_password + +PG_DB='host=localhost port=5432 user=postgres password=secret-replace dbname=p1 sslmode=disable' diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..89a4322 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module git.hollander.online/energy/p1-logger + +go 1.21.4 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/lib/pq v1.10.9 +) + +require ( + github.com/gorilla/websocket v1.5.0 // indirect + github.com/joho/godotenv v1.5.1 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.1.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e1c9c1a --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..77ed3a4 --- /dev/null +++ b/main.go @@ -0,0 +1,185 @@ +package main + +import ( + "database/sql" + "encoding/json" + "log" + "os" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/joho/godotenv" + _ "github.com/lib/pq" +) + +type Payload struct { + T string `json:"t"` + Dt1 int `json:"dt1"` + Dt2 int `json:"dt2"` + Rt1 int `json:"rt1"` + Rt2 int `json:"rt2"` + D int `json:"d"` + R int `json:"r"` + F int `json:"f"` + Fl int `json:"fl"` + G int `json:"g"` + V1 int `json:"v1"` + V2 int `json:"v2"` + V3 int `json:"v3"` + C1 int `json:"c1"` + C2 int `json:"c2"` + C3 int `json:"c3"` + D1 int `json:"d1"` + D2 int `json:"d2"` + D3 int `json:"d3"` + R1 int `json:"r1"` + R2 int `json:"r2"` + R3 int `json:"r3"` +} + +var db *sql.DB + +func main() { + // Load environment variables from .env file if it exists + err := godotenv.Load() + if err != nil { + log.Println("Error loading .env file:", err) + } + + // Connect to PostgreSQL + pgConnStr := os.Getenv("PG_DB") + db, err = sql.Open("postgres", pgConnStr) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + // Enable TimescaleDB + _, err = db.Exec(` + CREATE EXTENSION IF NOT EXISTS timescaledb; + `) + if err != nil { + log.Fatal("Error creating TimescaleDB extension:", err) + } + + // Create table if not exists + _, err = db.Exec(` + CREATE TABLE IF NOT EXISTS p1 ( + timestamp TIMESTAMPTZ, + delivered_tariff1 INT, + delivered_tariff2 INT, + returned_tariff1 INT, + returned_tariff2 INT, + delivery_all INT, + returning_all INT, + failures INT, + long_failures INT, + gas INT, + voltage_l1 INT, + voltage_l2 INT, + voltage_l3 INT, + current_l1 INT, + current_l2 INT, + current_l3 INT, + delivery_l1 INT, + delivery_l2 INT, + delivery_l3 INT, + returning_l1 INT, + returning_l2 INT, + returning_l3 INT + ); + SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE); + `) + if err != nil { + log.Fatal("Error creating table:", err) + } + + // Initialize MQTT options + opts := mqtt.NewClientOptions() + opts.AddBroker(os.Getenv("MQTT_BROKER")) + opts.SetUsername(os.Getenv("MQTT_USERNAME")) + opts.SetPassword(os.Getenv("MQTT_PASSWORD")) + + // Connect to MQTT broker + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + + // Subscribe to MQTT topic + topic := os.Getenv("MQTT_TOPIC") + if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + + // Keep the program running + select {} +} + +func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { + // Parse JSON payload + var payload Payload + err := json.Unmarshal(msg.Payload(), &payload) + if err != nil { + log.Println("Error parsing MQTT payload:", err) + return + } + + // Parse timestamp to time.Time + timestamp, err := parseTimestamp(payload.T) + if err != nil { + log.Println("Error parsing timestamp:", err) + return + } + + // Insert data into PostgreSQL + err = insertData(timestamp, payload) + if err != nil { + log.Println("Error inserting data into PostgreSQL:", err) + } +} + +func parseTimestamp(t string) (time.Time, error) { + + // Extract values from timestamp string + year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0') + hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0') + + // Load location for "Europe/Amsterdam" time zone + loc, err := time.LoadLocation("Europe/Amsterdam") + if err != nil { + return time.Time{}, err + } + + // Create and return the timestamp + return time.Date(year, month, day, hour, min, sec, 0, loc), nil + +} + +func insertData(timestamp time.Time, payload Payload) error { + // Prepare SQL statement + stmt := ` + INSERT INTO p1 ( + timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2, + delivery_all, returning_all, failures, long_failures, gas, + voltage_l1, voltage_l2, voltage_l3, + current_l1, current_l2, current_l3, + delivery_l1, delivery_l2, delivery_l3, + returning_l1, returning_l2, returning_l3 + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22) + ` + _, err := db.Exec( + stmt, + timestamp, + payload.Dt1, payload.Dt2, + payload.Rt1, payload.Rt2, + payload.D, payload.R, + payload.F, payload.Fl, + payload.G, + payload.V1, payload.V2, payload.V3, + payload.C1, payload.C2, payload.C3, + payload.D1, payload.D2, payload.D3, + payload.R1, payload.R2, payload.R3, + ) + return err +}