refactor: optimize record management and deletion logic

This commit is contained in:
henrygd
2025-03-03 23:44:50 -05:00
parent 17a163de26
commit d81db6e319

View File

@@ -4,14 +4,15 @@ package records
import ( import (
"beszel/internal/entities/container" "beszel/internal/entities/container"
"beszel/internal/entities/system" "beszel/internal/entities/system"
"fmt"
"log" "log"
"math" "math"
"strings"
"time" "time"
"github.com/goccy/go-json" "github.com/goccy/go-json"
"github.com/pocketbase/dbx" "github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/types"
) )
type RecordManager struct { type RecordManager struct {
@@ -25,11 +26,6 @@ type LongerRecordData struct {
minShorterRecords int minShorterRecords int
} }
type RecordDeletionData struct {
recordType string
retention time.Duration
}
type RecordStats []struct { type RecordStats []struct {
Stats []byte `db:"stats"` Stats []byte `db:"stats"`
} }
@@ -39,7 +35,7 @@ func NewRecordManager(app core.App) *RecordManager {
} }
// Create longer records by averaging shorter records // Create longer records by averaging shorter records
func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) { func (rm *RecordManager) CreateLongerRecords() {
// start := time.Now() // start := time.Now()
longerRecordData := []LongerRecordData{ longerRecordData := []LongerRecordData{
{ {
@@ -70,14 +66,24 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) {
} }
// wrap the operations in a transaction // wrap the operations in a transaction
rm.app.RunInTransaction(func(txApp core.App) error { rm.app.RunInTransaction(func(txApp core.App) error {
activeSystems, err := txApp.FindAllRecords("systems", dbx.NewExp("status = 'up'")) var err error
collections := [2]*core.Collection{}
collections[0], err = txApp.FindCachedCollectionByNameOrId("system_stats")
if err != nil { if err != nil {
log.Println("failed to get active systems", "err", err.Error())
return err return err
} }
collections[1], err = txApp.FindCachedCollectionByNameOrId("container_stats")
if err != nil {
return err
}
var systems []struct {
Id string `db:"id"`
}
txApp.DB().NewQuery("SELECT id FROM systems WHERE status='up'").All(&systems)
// loop through all active systems, time periods, and collections // loop through all active systems, time periods, and collections
for _, system := range activeSystems { for _, system := range systems {
// log.Println("processing system", system.GetString("name")) // log.Println("processing system", system.GetString("name"))
for i := range longerRecordData { for i := range longerRecordData {
recordData := longerRecordData[i] recordData := longerRecordData[i]
@@ -92,7 +98,7 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) {
if recordData.longerType != "10m" { if recordData.longerType != "10m" {
lastLongerRecord, err := txApp.FindFirstRecordByFilter( lastLongerRecord, err := txApp.FindFirstRecordByFilter(
collection.Id, collection.Id,
"type = {:type} && system = {:system} && created > {:created}", "system = {:system} && type = {:type} && created > {:created}",
dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod}, dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod},
) )
// continue if longer record exists // continue if longer record exists
@@ -108,7 +114,7 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) {
Select("stats"). Select("stats").
From(collection.Name). From(collection.Name).
AndWhere(dbx.NewExp( AndWhere(dbx.NewExp(
"type={:type} AND system={:system} AND created > {:created}", "system={:system} AND type={:type} AND created > {:created}",
dbx.Params{ dbx.Params{
"type": recordData.shorterType, "type": recordData.shorterType,
"system": system.Id, "system": system.Id,
@@ -119,7 +125,6 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) {
// continue if not enough shorter records // continue if not enough shorter records
if err != nil || len(stats) < recordData.minShorterRecords { if err != nil || len(stats) < recordData.minShorterRecords {
// log.Println("not enough shorter records. continue.", len(allShorterRecords), recordData.expectedShorterRecords)
continue continue
} }
// average the shorter records and create longer record // average the shorter records and create longer record
@@ -133,7 +138,7 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) {
longerRecord.Set("stats", rm.AverageContainerStats(stats)) longerRecord.Set("stats", rm.AverageContainerStats(stats))
} }
if err := txApp.SaveNoValidate(longerRecord); err != nil { if err := txApp.SaveNoValidate(longerRecord); err != nil {
log.Println("failed to save longer record", "err", err.Error()) log.Println("failed to save longer record", "err", err)
} }
} }
} }
@@ -146,16 +151,20 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) {
} }
// Calculate the average stats of a list of system_stats records without reflect // Calculate the average stats of a list of system_stats records without reflect
func (rm *RecordManager) AverageSystemStats(records RecordStats) system.Stats { func (rm *RecordManager) AverageSystemStats(records RecordStats) *system.Stats {
sum := system.Stats{} sum := &system.Stats{}
count := float64(len(records)) count := float64(len(records))
// use different counter for temps in case some records don't have them
tempCount := float64(0) tempCount := float64(0)
var stats system.Stats // Temporary struct for unmarshaling
stats := &system.Stats{}
// Accumulate totals
for i := range records { for i := range records {
stats = system.Stats{} // Zero the struct before unmarshalling *stats = system.Stats{} // Reset tempStats for unmarshaling
json.Unmarshal(records[i].Stats, &stats) if err := json.Unmarshal(records[i].Stats, stats); err != nil {
continue
}
sum.Cpu += stats.Cpu sum.Cpu += stats.Cpu
sum.Mem += stats.Mem sum.Mem += stats.Mem
sum.MemUsed += stats.MemUsed sum.MemUsed += stats.MemUsed
@@ -171,26 +180,25 @@ func (rm *RecordManager) AverageSystemStats(records RecordStats) system.Stats {
sum.DiskWritePs += stats.DiskWritePs sum.DiskWritePs += stats.DiskWritePs
sum.NetworkSent += stats.NetworkSent sum.NetworkSent += stats.NetworkSent
sum.NetworkRecv += stats.NetworkRecv sum.NetworkRecv += stats.NetworkRecv
// set peak values // Set peak values
sum.MaxCpu = max(sum.MaxCpu, stats.MaxCpu, stats.Cpu) sum.MaxCpu = max(sum.MaxCpu, stats.MaxCpu, stats.Cpu)
sum.MaxNetworkSent = max(sum.MaxNetworkSent, stats.MaxNetworkSent, stats.NetworkSent) sum.MaxNetworkSent = max(sum.MaxNetworkSent, stats.MaxNetworkSent, stats.NetworkSent)
sum.MaxNetworkRecv = max(sum.MaxNetworkRecv, stats.MaxNetworkRecv, stats.NetworkRecv) sum.MaxNetworkRecv = max(sum.MaxNetworkRecv, stats.MaxNetworkRecv, stats.NetworkRecv)
sum.MaxDiskReadPs = max(sum.MaxDiskReadPs, stats.MaxDiskReadPs, stats.DiskReadPs) sum.MaxDiskReadPs = max(sum.MaxDiskReadPs, stats.MaxDiskReadPs, stats.DiskReadPs)
sum.MaxDiskWritePs = max(sum.MaxDiskWritePs, stats.MaxDiskWritePs, stats.DiskWritePs) sum.MaxDiskWritePs = max(sum.MaxDiskWritePs, stats.MaxDiskWritePs, stats.DiskWritePs)
// add temps to sum
// Accumulate temperatures
if stats.Temperatures != nil { if stats.Temperatures != nil {
if sum.Temperatures == nil { if sum.Temperatures == nil {
sum.Temperatures = make(map[string]float64, len(stats.Temperatures)) sum.Temperatures = make(map[string]float64, len(stats.Temperatures))
} }
tempCount++ tempCount++
for key, value := range stats.Temperatures { for key, value := range stats.Temperatures {
if _, ok := sum.Temperatures[key]; !ok {
sum.Temperatures[key] = 0
}
sum.Temperatures[key] += value sum.Temperatures[key] += value
} }
} }
// add extra fs to sum
// Accumulate extra filesystem stats
if stats.ExtraFs != nil { if stats.ExtraFs != nil {
if sum.ExtraFs == nil { if sum.ExtraFs == nil {
sum.ExtraFs = make(map[string]*system.FsStats, len(stats.ExtraFs)) sum.ExtraFs = make(map[string]*system.FsStats, len(stats.ExtraFs))
@@ -199,25 +207,26 @@ func (rm *RecordManager) AverageSystemStats(records RecordStats) system.Stats {
if _, ok := sum.ExtraFs[key]; !ok { if _, ok := sum.ExtraFs[key]; !ok {
sum.ExtraFs[key] = &system.FsStats{} sum.ExtraFs[key] = &system.FsStats{}
} }
sum.ExtraFs[key].DiskTotal += value.DiskTotal fs := sum.ExtraFs[key]
sum.ExtraFs[key].DiskUsed += value.DiskUsed fs.DiskTotal += value.DiskTotal
sum.ExtraFs[key].DiskWritePs += value.DiskWritePs fs.DiskUsed += value.DiskUsed
sum.ExtraFs[key].DiskReadPs += value.DiskReadPs fs.DiskWritePs += value.DiskWritePs
// peak values fs.DiskReadPs += value.DiskReadPs
sum.ExtraFs[key].MaxDiskReadPS = max(sum.ExtraFs[key].MaxDiskReadPS, value.MaxDiskReadPS, value.DiskReadPs) fs.MaxDiskReadPS = max(fs.MaxDiskReadPS, value.MaxDiskReadPS, value.DiskReadPs)
sum.ExtraFs[key].MaxDiskWritePS = max(sum.ExtraFs[key].MaxDiskWritePS, value.MaxDiskWritePS, value.DiskWritePs) fs.MaxDiskWritePS = max(fs.MaxDiskWritePS, value.MaxDiskWritePS, value.DiskWritePs)
} }
} }
// add GPU data
// Accumulate GPU data
if stats.GPUData != nil { if stats.GPUData != nil {
if sum.GPUData == nil { if sum.GPUData == nil {
sum.GPUData = make(map[string]system.GPUData, len(stats.GPUData)) sum.GPUData = make(map[string]system.GPUData, len(stats.GPUData))
} }
for id, value := range stats.GPUData { for id, value := range stats.GPUData {
if _, ok := sum.GPUData[id]; !ok { gpu, ok := sum.GPUData[id]
sum.GPUData[id] = system.GPUData{Name: value.Name} if !ok {
gpu = system.GPUData{Name: value.Name}
} }
gpu := sum.GPUData[id]
gpu.Temperature += value.Temperature gpu.Temperature += value.Temperature
gpu.MemoryUsed += value.MemoryUsed gpu.MemoryUsed += value.MemoryUsed
gpu.MemoryTotal += value.MemoryTotal gpu.MemoryTotal += value.MemoryTotal
@@ -229,76 +238,67 @@ func (rm *RecordManager) AverageSystemStats(records RecordStats) system.Stats {
} }
} }
stats = system.Stats{ // Compute averages in place
Cpu: twoDecimals(sum.Cpu / count), if count > 0 {
Mem: twoDecimals(sum.Mem / count), sum.Cpu = twoDecimals(sum.Cpu / count)
MemUsed: twoDecimals(sum.MemUsed / count), sum.Mem = twoDecimals(sum.Mem / count)
MemPct: twoDecimals(sum.MemPct / count), sum.MemUsed = twoDecimals(sum.MemUsed / count)
MemBuffCache: twoDecimals(sum.MemBuffCache / count), sum.MemPct = twoDecimals(sum.MemPct / count)
MemZfsArc: twoDecimals(sum.MemZfsArc / count), sum.MemBuffCache = twoDecimals(sum.MemBuffCache / count)
Swap: twoDecimals(sum.Swap / count), sum.MemZfsArc = twoDecimals(sum.MemZfsArc / count)
SwapUsed: twoDecimals(sum.SwapUsed / count), sum.Swap = twoDecimals(sum.Swap / count)
DiskTotal: twoDecimals(sum.DiskTotal / count), sum.SwapUsed = twoDecimals(sum.SwapUsed / count)
DiskUsed: twoDecimals(sum.DiskUsed / count), sum.DiskTotal = twoDecimals(sum.DiskTotal / count)
DiskPct: twoDecimals(sum.DiskPct / count), sum.DiskUsed = twoDecimals(sum.DiskUsed / count)
DiskReadPs: twoDecimals(sum.DiskReadPs / count), sum.DiskPct = twoDecimals(sum.DiskPct / count)
DiskWritePs: twoDecimals(sum.DiskWritePs / count), sum.DiskReadPs = twoDecimals(sum.DiskReadPs / count)
NetworkSent: twoDecimals(sum.NetworkSent / count), sum.DiskWritePs = twoDecimals(sum.DiskWritePs / count)
NetworkRecv: twoDecimals(sum.NetworkRecv / count), sum.NetworkSent = twoDecimals(sum.NetworkSent / count)
MaxCpu: sum.MaxCpu, sum.NetworkRecv = twoDecimals(sum.NetworkRecv / count)
MaxDiskReadPs: sum.MaxDiskReadPs,
MaxDiskWritePs: sum.MaxDiskWritePs,
MaxNetworkSent: sum.MaxNetworkSent,
MaxNetworkRecv: sum.MaxNetworkRecv,
}
if sum.Temperatures != nil { // Average temperatures
stats.Temperatures = make(map[string]float64, len(sum.Temperatures)) if sum.Temperatures != nil && tempCount > 0 {
for key, value := range sum.Temperatures { for key := range sum.Temperatures {
stats.Temperatures[key] = twoDecimals(value / tempCount) sum.Temperatures[key] = twoDecimals(sum.Temperatures[key] / tempCount)
}
} }
}
if sum.ExtraFs != nil { // Average extra filesystem stats
stats.ExtraFs = make(map[string]*system.FsStats, len(sum.ExtraFs)) if sum.ExtraFs != nil {
for key, value := range sum.ExtraFs { for key := range sum.ExtraFs {
stats.ExtraFs[key] = &system.FsStats{ fs := sum.ExtraFs[key]
DiskTotal: twoDecimals(value.DiskTotal / count), fs.DiskTotal = twoDecimals(fs.DiskTotal / count)
DiskUsed: twoDecimals(value.DiskUsed / count), fs.DiskUsed = twoDecimals(fs.DiskUsed / count)
DiskWritePs: twoDecimals(value.DiskWritePs / count), fs.DiskWritePs = twoDecimals(fs.DiskWritePs / count)
DiskReadPs: twoDecimals(value.DiskReadPs / count), fs.DiskReadPs = twoDecimals(fs.DiskReadPs / count)
MaxDiskReadPS: value.MaxDiskReadPS, }
MaxDiskWritePS: value.MaxDiskWritePS, }
// Average GPU data
if sum.GPUData != nil {
for id := range sum.GPUData {
gpu := sum.GPUData[id]
gpu.Temperature = twoDecimals(gpu.Temperature / count)
gpu.MemoryUsed = twoDecimals(gpu.MemoryUsed / count)
gpu.MemoryTotal = twoDecimals(gpu.MemoryTotal / count)
gpu.Usage = twoDecimals(gpu.Usage / count)
gpu.Power = twoDecimals(gpu.Power / count)
gpu.Count = twoDecimals(gpu.Count / count)
sum.GPUData[id] = gpu
} }
} }
} }
if sum.GPUData != nil { return sum
stats.GPUData = make(map[string]system.GPUData, len(sum.GPUData))
for id, value := range sum.GPUData {
stats.GPUData[id] = system.GPUData{
Name: value.Name,
Temperature: twoDecimals(value.Temperature / count),
MemoryUsed: twoDecimals(value.MemoryUsed / count),
MemoryTotal: twoDecimals(value.MemoryTotal / count),
Usage: twoDecimals(value.Usage / count),
Power: twoDecimals(value.Power / count),
Count: twoDecimals(value.Count / count),
}
}
}
return stats
} }
// Calculate the average stats of a list of container_stats records // Calculate the average stats of a list of container_stats records
func (rm *RecordManager) AverageContainerStats(records RecordStats) []container.Stats { func (rm *RecordManager) AverageContainerStats(records RecordStats) []container.Stats {
sums := make(map[string]*container.Stats) sums := make(map[string]*container.Stats)
count := float64(len(records)) count := float64(len(records))
containerStats := make([]container.Stats, 0, 50)
var containerStats []container.Stats
for i := range records { for i := range records {
// Reset the slice length to 0, but keep the capacity // reset slice
containerStats = containerStats[:0] containerStats = containerStats[:0]
if err := json.Unmarshal(records[i].Stats, &containerStats); err != nil { if err := json.Unmarshal(records[i].Stats, &containerStats); err != nil {
return []container.Stats{} return []container.Stats{}
@@ -330,38 +330,45 @@ func (rm *RecordManager) AverageContainerStats(records RecordStats) []container.
// Deletes records older than what is displayed in the UI // Deletes records older than what is displayed in the UI
func (rm *RecordManager) DeleteOldRecords() { func (rm *RecordManager) DeleteOldRecords() {
// Define the collections to process
collections := []string{"system_stats", "container_stats"} collections := []string{"system_stats", "container_stats"}
recordData := []RecordDeletionData{
{ // Define record types and their retention periods
recordType: "1m", type RecordDeletionData struct {
retention: time.Hour, recordType string
}, retention time.Duration
{
recordType: "10m",
retention: 12 * time.Hour,
},
{
recordType: "20m",
retention: 24 * time.Hour,
},
{
recordType: "120m",
retention: 7 * 24 * time.Hour,
},
{
recordType: "480m",
retention: 30 * 24 * time.Hour,
},
} }
db := rm.app.NonconcurrentDB() recordData := []RecordDeletionData{
for _, recordData := range recordData { {recordType: "1m", retention: time.Hour}, // 1 hour
for _, collectionSlug := range collections { {recordType: "10m", retention: 12 * time.Hour}, // 12 hours
formattedDate := time.Now().UTC().Add(-recordData.retention).Format(types.DefaultDateLayout) {recordType: "20m", retention: 24 * time.Hour}, // 1 day
expr := dbx.NewExp("[[created]] < {:date} AND [[type]] = {:type}", dbx.Params{"date": formattedDate, "type": recordData.recordType}) {recordType: "120m", retention: 7 * 24 * time.Hour}, // 7 days
_, err := db.Delete(collectionSlug, expr).Execute() {recordType: "480m", retention: 30 * 24 * time.Hour}, // 30 days
if err != nil { }
rm.app.Logger().Error("Failed to delete records", "err", err.Error())
} // Process each collection
for _, collection := range collections {
// Build the WHERE clause dynamically
var conditionParts []string
var params dbx.Params = make(map[string]any)
for i, rd := range recordData {
// Create parameterized condition for this record type
dateParam := fmt.Sprintf("date%d", i)
conditionParts = append(conditionParts, fmt.Sprintf("(type = '%s' AND created < {:%s})", rd.recordType, dateParam))
params[dateParam] = time.Now().UTC().Add(-rd.retention)
}
// Combine conditions with OR
conditionStr := strings.Join(conditionParts, " OR ")
// Construct the full raw query
rawQuery := fmt.Sprintf("DELETE FROM %s WHERE %s", collection, conditionStr)
// Execute the query with parameters
if _, err := rm.app.DB().NewQuery(rawQuery).Bind(params).Execute(); err != nil {
// return fmt.Errorf("failed to delete from %s: %v", collection, err)
rm.app.Logger().Error("failed to delete", "collection", collection, "error", err)
} }
} }
} }