Added calculations for storage requirements. Adapt D, R, F, Fl, D1-3, R1-3 to only store on-change.
This commit is contained in:
parent
0d0ae0d213
commit
212a328d52
1 changed files with 88 additions and 21 deletions
109
main.go
109
main.go
|
@ -20,7 +20,8 @@ import (
|
||||||
//
|
//
|
||||||
// All metrics are stored in PostgreSQL as INT and not NUMERIC or FLOAT to optimise their storage size.
|
// All metrics are stored in PostgreSQL as INT and not NUMERIC or FLOAT to optimise their storage size.
|
||||||
//
|
//
|
||||||
// Dt1, Dt2, Rt1, Rt2 and G are cumulative meter readings.
|
// Dt1, Dt2, Rt1, Rt2, G, F and Fl are cumulative meter readings.
|
||||||
|
// Therefore, they function as counters that can only go up.
|
||||||
// By making them pointers to int, they can be set to 'nil' when the reading has not changed.
|
// By making them pointers to int, they can be set to 'nil' when the reading has not changed.
|
||||||
// This way, they are logged as NULL in the database when no change has happened, saving storage capacity.
|
// This way, they are logged as NULL in the database when no change has happened, saving storage capacity.
|
||||||
// As an added benefit, this also make data retrieval, visualisation and analysis more light-weight.
|
// As an added benefit, this also make data retrieval, visualisation and analysis more light-weight.
|
||||||
|
@ -28,14 +29,11 @@ import (
|
||||||
// Gas consumption is updated once per minute.
|
// Gas consumption is updated once per minute.
|
||||||
// Not saving intermittent values theoretically saves 4 bytes per value.
|
// Not saving intermittent values theoretically saves 4 bytes per value.
|
||||||
//
|
//
|
||||||
// Storage for non-NULL values every second:
|
// An average month has 43 829.0639 minutes.
|
||||||
// 86,400 entries/day×4 bytes/entry=345,600 bytes/day86,400entries/day×4bytes/entry=345,600bytes/day
|
// 4*43829,0639 = 175,316.26 bytes = 0.18 MB
|
||||||
// Storage for non-NULL values every minute:
|
//
|
||||||
// 1,440 entries/day×4 bytes/entry=5,760 bytes/day1,440entries/day×4bytes/entry=5,760bytes/day
|
// In practice, storage will be even lower when gas is not being consumed continuously,
|
||||||
// Storage savings:
|
// as a new metric won't be available for every minute.
|
||||||
// 345,600 bytes/day−5,760 bytes/day=339,840 bytes/day345,600bytes/day−5,760bytes/day=339,840bytes/day
|
|
||||||
// (about 331.5 KB / day or 9.72 MB / month)
|
|
||||||
// In practice, savings will be even higher when gas is not being consumed continuously.
|
|
||||||
//
|
//
|
||||||
// When Tariff 1 is active, Tariff 2 isn't.
|
// When Tariff 1 is active, Tariff 2 isn't.
|
||||||
// When energy is being imported, it is not being returned.
|
// When energy is being imported, it is not being returned.
|
||||||
|
@ -50,16 +48,36 @@ import (
|
||||||
// In addition, many meters only update these metrics once every 10 seconds,
|
// In addition, many meters only update these metrics once every 10 seconds,
|
||||||
// meaning even more storage capacity is saved:
|
// meaning even more storage capacity is saved:
|
||||||
// 10.52 / 10 * 9 = 9,468 MB.
|
// 10.52 / 10 * 9 = 9,468 MB.
|
||||||
|
//
|
||||||
|
// For D, R, D1-3 and R1-3, either D or R will be active.
|
||||||
|
// Therefore, their storage requirements can be sliced in half, saving 4 * 10.52 = 42.08 MB
|
||||||
|
//
|
||||||
|
// Not saving failures when there is no change, will save around 10.52MB per month per metric, so an additional 21.04MB
|
||||||
|
// Theoretical storage requirements for a DSMR5 meter with metrics emitted every second:
|
||||||
|
//
|
||||||
|
// Column Column size (bytes) Per month (MB) X Total (MB)
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Timestamp 8 bytes 021.04
|
||||||
|
// Dt1, Dt2, Rt1, Rt2 4 bytes 001.05
|
||||||
|
// D, R 4 bytes 010.52
|
||||||
|
// R 4 bytes 010.52
|
||||||
|
// F 4 bytes 000.00
|
||||||
|
// Fl 4 bytes 000.00
|
||||||
|
// G 4 bytes 000.18
|
||||||
|
// V1-3, C1-3 4 bytes 010.52 6 063.12
|
||||||
|
// D1-3, R1-3 4 bytes 010.52 3 031.56
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Total 137.99
|
||||||
type Payload struct {
|
type Payload struct {
|
||||||
T string `json:"t"` // Timestamp
|
T string `json:"t"` // Timestamp
|
||||||
Dt1 *int `json:"dt1"` // Delivered / imported meter reading tariff 1 (kWh)
|
Dt1 *int `json:"dt1"` // Delivered / imported meter reading tariff 1 (kWh)
|
||||||
Dt2 *int `json:"dt2"` // Delivered / imported tariff 2 (kWh)
|
Dt2 *int `json:"dt2"` // Delivered / imported tariff 2 (kWh)
|
||||||
Rt1 *int `json:"rt1"` // Returned / exported tariff 1 (kWh)
|
Rt1 *int `json:"rt1"` // Returned / exported tariff 1 (kWh)
|
||||||
Rt2 *int `json:"rt2"` // Returned / exported tariff 2 (kWh)
|
Rt2 *int `json:"rt2"` // Returned / exported tariff 2 (kWh)
|
||||||
D int `json:"d"` // Delivering / importing (W)
|
D *int `json:"d"` // Delivering / importing (W)
|
||||||
R int `json:"r"` // Returning / exporting (W)
|
R *int `json:"r"` // Returning / exporting (W)
|
||||||
F int `json:"f"` // Failure (counter)
|
F *int `json:"f"` // Failure (counter)
|
||||||
Fl int `json:"fl"` // Failure long duration (counter)
|
Fl *int `json:"fl"` // Failure long duration (counter)
|
||||||
G *int `json:"g"` // Gas meter reading (l)
|
G *int `json:"g"` // Gas meter reading (l)
|
||||||
V1 int `json:"v1"` // Voltage L1 (V)
|
V1 int `json:"v1"` // Voltage L1 (V)
|
||||||
V2 int `json:"v2"` // Voltage L2 (V)
|
V2 int `json:"v2"` // Voltage L2 (V)
|
||||||
|
@ -67,12 +85,12 @@ type Payload struct {
|
||||||
C1 int `json:"c1"` // Current L1 (A)
|
C1 int `json:"c1"` // Current L1 (A)
|
||||||
C2 int `json:"c2"` // Current L2 (A)
|
C2 int `json:"c2"` // Current L2 (A)
|
||||||
C3 int `json:"c3"` // Current L3 (A)
|
C3 int `json:"c3"` // Current L3 (A)
|
||||||
D1 int `json:"d1"` // Delivering / importing L1 (W)
|
D1 *int `json:"d1"` // Delivering / importing L1 (W)
|
||||||
D2 int `json:"d2"` // Delivering / importing L2 (W)
|
D2 *int `json:"d2"` // Delivering / importing L2 (W)
|
||||||
D3 int `json:"d3"` // Delivering / importing L3 (W)
|
D3 *int `json:"d3"` // Delivering / importing L3 (W)
|
||||||
R1 int `json:"r1"` // Returning / exporting L1 (W)
|
R1 *int `json:"r1"` // Returning / exporting L1 (W)
|
||||||
R2 int `json:"r2"` // Returning / exporting L2 (W)
|
R2 *int `json:"r2"` // Returning / exporting L2 (W)
|
||||||
R3 int `json:"r3"` // Returning / exporting L3 (W)
|
R3 *int `json:"r3"` // Returning / exporting L3 (W)
|
||||||
}
|
}
|
||||||
|
|
||||||
var db *sql.DB
|
var db *sql.DB
|
||||||
|
@ -152,6 +170,7 @@ func createLoggerWithLevel(level slog.Level) *slog.Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateFieldIfChanged ensures that meter readings that haven't been updated aren't written to the database, in order to save storage space.
|
// updateFieldIfChanged ensures that meter readings that haven't been updated aren't written to the database, in order to save storage space.
|
||||||
|
// If they haven't changed, they are set to nil, meaning they will be inserted as NULL into the database.
|
||||||
func updateFieldIfChanged(currentValue *int, previousValue *int) (*int, bool) {
|
func updateFieldIfChanged(currentValue *int, previousValue *int) (*int, bool) {
|
||||||
if currentValue != nil && *currentValue == *previousValue {
|
if currentValue != nil && *currentValue == *previousValue {
|
||||||
return nil, false // No change
|
return nil, false // No change
|
||||||
|
@ -172,7 +191,9 @@ func safeDerefInt(ptr *int) string {
|
||||||
return "nil" // Return a string indicating the value is nil
|
return "nil" // Return a string indicating the value is nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var prevDt1, prevDt2, prevRt1, prevRt2, prevG int
|
var prevDt1, prevDt2, prevRt1, prevRt2, prevG, prevF, prevFl int
|
||||||
|
var prevD, prevR int
|
||||||
|
var prevD1, prevD2, prevD3, prevR1, prevR2, prevR3 int
|
||||||
|
|
||||||
func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
|
func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||||
// Parse JSON payload
|
// Parse JSON payload
|
||||||
|
@ -194,6 +215,7 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||||
|
|
||||||
// Update each field, directly updating `changed` if any change is detected
|
// Update each field, directly updating `changed` if any change is detected
|
||||||
var tempChanged bool // Used to capture the change status for each field
|
var tempChanged bool // Used to capture the change status for each field
|
||||||
|
// Electricity meter readings
|
||||||
payload.Dt1, tempChanged = updateFieldIfChanged(payload.Dt1, &prevDt1)
|
payload.Dt1, tempChanged = updateFieldIfChanged(payload.Dt1, &prevDt1)
|
||||||
changed = changed || tempChanged
|
changed = changed || tempChanged
|
||||||
|
|
||||||
|
@ -206,9 +228,39 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||||
payload.Rt2, tempChanged = updateFieldIfChanged(payload.Rt2, &prevRt2)
|
payload.Rt2, tempChanged = updateFieldIfChanged(payload.Rt2, &prevRt2)
|
||||||
changed = changed || tempChanged
|
changed = changed || tempChanged
|
||||||
|
|
||||||
|
// Faults
|
||||||
|
payload.F, tempChanged = updateFieldIfChanged(payload.F, &prevF)
|
||||||
|
changed = changed || tempChanged
|
||||||
|
|
||||||
|
payload.Fl, tempChanged = updateFieldIfChanged(payload.Fl, &prevFl)
|
||||||
|
changed = changed || tempChanged
|
||||||
|
|
||||||
|
// Gas
|
||||||
payload.G, tempChanged = updateFieldIfChanged(payload.G, &prevG)
|
payload.G, tempChanged = updateFieldIfChanged(payload.G, &prevG)
|
||||||
changed = changed || tempChanged
|
changed = changed || tempChanged
|
||||||
|
|
||||||
|
// D, R
|
||||||
|
payload.D, tempChanged = updateFieldIfChanged(payload.D, &prevD)
|
||||||
|
changed = changed || tempChanged
|
||||||
|
payload.R, tempChanged = updateFieldIfChanged(payload.R, &prevR)
|
||||||
|
changed = changed || tempChanged
|
||||||
|
|
||||||
|
// D1-3
|
||||||
|
payload.D1, tempChanged = updateFieldIfChanged(payload.D1, &prevD1)
|
||||||
|
changed = changed || tempChanged
|
||||||
|
payload.D2, tempChanged = updateFieldIfChanged(payload.D2, &prevD2)
|
||||||
|
changed = changed || tempChanged
|
||||||
|
payload.D3, tempChanged = updateFieldIfChanged(payload.D3, &prevD3)
|
||||||
|
changed = changed || tempChanged
|
||||||
|
|
||||||
|
// R1-3
|
||||||
|
payload.R1, tempChanged = updateFieldIfChanged(payload.R1, &prevR1)
|
||||||
|
changed = changed || tempChanged
|
||||||
|
payload.R2, tempChanged = updateFieldIfChanged(payload.R2, &prevR2)
|
||||||
|
changed = changed || tempChanged
|
||||||
|
payload.R3, tempChanged = updateFieldIfChanged(payload.R3, &prevR3)
|
||||||
|
changed = changed || tempChanged
|
||||||
|
|
||||||
// If any value has changed, log all the relevant values
|
// If any value has changed, log all the relevant values
|
||||||
if changed {
|
if changed {
|
||||||
logger.Debug("Values changed",
|
logger.Debug("Values changed",
|
||||||
|
@ -216,7 +268,22 @@ func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||||
"dt2", safeDerefInt(payload.Dt2),
|
"dt2", safeDerefInt(payload.Dt2),
|
||||||
"rt1", safeDerefInt(payload.Rt1),
|
"rt1", safeDerefInt(payload.Rt1),
|
||||||
"rt2", safeDerefInt(payload.Rt2),
|
"rt2", safeDerefInt(payload.Rt2),
|
||||||
"g", safeDerefInt(payload.G))
|
|
||||||
|
"d", safeDerefInt(payload.D),
|
||||||
|
"r", safeDerefInt(payload.R),
|
||||||
|
|
||||||
|
"f", safeDerefInt(payload.F),
|
||||||
|
"fl", safeDerefInt(payload.Fl),
|
||||||
|
"g", safeDerefInt(payload.G),
|
||||||
|
|
||||||
|
"d1", safeDerefInt(payload.D1),
|
||||||
|
"d2", safeDerefInt(payload.D2),
|
||||||
|
"d3", safeDerefInt(payload.D3),
|
||||||
|
|
||||||
|
"r1", safeDerefInt(payload.R1),
|
||||||
|
"r2", safeDerefInt(payload.R2),
|
||||||
|
"r3", safeDerefInt(payload.R3),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
// Insert data into PostgreSQL
|
// Insert data into PostgreSQL
|
||||||
err = insertData(timestamp, payload)
|
err = insertData(timestamp, payload)
|
||||||
|
|
Loading…
Reference in a new issue