package main import ( "database/sql" "encoding/json" "log" "os" "time" mqtt "github.com/eclipse/paho.mqtt.golang" _ "github.com/lib/pq" ) type Payload struct { T string `json:"t"` Dt1 int `json:"dt1"` Dt2 int `json:"dt2"` Rt1 int `json:"rt1"` Rt2 int `json:"rt2"` D int `json:"d"` R int `json:"r"` F int `json:"f"` Fl int `json:"fl"` G int `json:"g"` V1 int `json:"v1"` V2 int `json:"v2"` V3 int `json:"v3"` C1 int `json:"c1"` C2 int `json:"c2"` C3 int `json:"c3"` D1 int `json:"d1"` D2 int `json:"d2"` D3 int `json:"d3"` R1 int `json:"r1"` R2 int `json:"r2"` R3 int `json:"r3"` } var db *sql.DB func main() { // Load environment variables from .env file if it exists // err := godotenv.Load() // if err != nil { // log.Println("Error loading .env file:", err) // } // Connect to PostgreSQL pgConnStr := os.Getenv("PG_DB") db, err = sql.Open("postgres", pgConnStr) if err != nil { log.Fatal(err) } defer db.Close() // Enable TimescaleDB _, err = db.Exec(` CREATE EXTENSION IF NOT EXISTS timescaledb; `) if err != nil { log.Fatal("Error creating TimescaleDB extension:", err) } // Create table if not exists _, err = db.Exec(` CREATE TABLE IF NOT EXISTS p1 ( timestamp TIMESTAMPTZ, delivered_tariff1 INT, delivered_tariff2 INT, returned_tariff1 INT, returned_tariff2 INT, delivery_all INT, returning_all INT, failures INT, long_failures INT, gas INT, voltage_l1 INT, voltage_l2 INT, voltage_l3 INT, current_l1 INT, current_l2 INT, current_l3 INT, delivery_l1 INT, delivery_l2 INT, delivery_l3 INT, returning_l1 INT, returning_l2 INT, returning_l3 INT ); SELECT create_hypertable('p1', 'timestamp', if_not_exists => TRUE); `) if err != nil { log.Fatal("Error creating table:", err) } // Initialize MQTT options opts := mqtt.NewClientOptions() opts.AddBroker(os.Getenv("MQTT_BROKER")) opts.SetUsername(os.Getenv("MQTT_USERNAME")) opts.SetPassword(os.Getenv("MQTT_PASSWORD")) // Connect to MQTT broker client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } // Subscribe to MQTT topic topic := os.Getenv("MQTT_TOPIC") if token := client.Subscribe(topic, 0, mqttMessageHandler); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } // Keep the program running select {} } func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { // Parse JSON payload var payload Payload err := json.Unmarshal(msg.Payload(), &payload) if err != nil { log.Println("Error parsing MQTT payload:", err) return } // Parse timestamp to time.Time timestamp, err := parseTimestamp(payload.T) if err != nil { log.Println("Error parsing timestamp:", err) return } // Insert data into PostgreSQL err = insertData(timestamp, payload) if err != nil { log.Println("Error inserting data into PostgreSQL:", err) } } func parseTimestamp(t string) (time.Time, error) { // Extract values from timestamp string year, month, day := 2000+int(t[0]-'0')*10+int(t[1]-'0'), time.Month(int(t[2]-'0')*10+int(t[3]-'0')), int(t[4]-'0')*10+int(t[5]-'0') hour, min, sec := int(t[6]-'0')*10+int(t[7]-'0'), int(t[8]-'0')*10+int(t[9]-'0'), int(t[10]-'0')*10+int(t[11]-'0') // Load location for "Europe/Amsterdam" time zone loc, err := time.LoadLocation("Europe/Amsterdam") if err != nil { return time.Time{}, err } // Create and return the timestamp return time.Date(year, month, day, hour, min, sec, 0, loc), nil } func insertData(timestamp time.Time, payload Payload) error { // Prepare SQL statement stmt := ` INSERT INTO p1 ( timestamp, delivered_tariff1, delivered_tariff2, returned_tariff1, returned_tariff2, delivery_all, returning_all, failures, long_failures, gas, voltage_l1, voltage_l2, voltage_l3, current_l1, current_l2, current_l3, delivery_l1, delivery_l2, delivery_l3, returning_l1, returning_l2, returning_l3 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22) ` _, err := db.Exec( stmt, timestamp, payload.Dt1, payload.Dt2, payload.Rt1, payload.Rt2, payload.D, payload.R, payload.F, payload.Fl, payload.G, payload.V1, payload.V2, payload.V3, payload.C1, payload.C2, payload.C3, payload.D1, payload.D2, payload.D3, payload.R1, payload.R2, payload.R3, ) return err }