From d81db6e3198b315f62aee9ddf3fd61f303ff9e01 Mon Sep 17 00:00:00 2001 From: henrygd Date: Mon, 3 Mar 2025 23:44:50 -0500 Subject: [PATCH] refactor: optimize record management and deletion logic --- beszel/internal/records/records.go | 253 +++++++++++++++-------------- 1 file changed, 130 insertions(+), 123 deletions(-) diff --git a/beszel/internal/records/records.go b/beszel/internal/records/records.go index 9a99adc..e740b21 100644 --- a/beszel/internal/records/records.go +++ b/beszel/internal/records/records.go @@ -4,14 +4,15 @@ package records import ( "beszel/internal/entities/container" "beszel/internal/entities/system" + "fmt" "log" "math" + "strings" "time" "github.com/goccy/go-json" "github.com/pocketbase/dbx" "github.com/pocketbase/pocketbase/core" - "github.com/pocketbase/pocketbase/tools/types" ) type RecordManager struct { @@ -25,11 +26,6 @@ type LongerRecordData struct { minShorterRecords int } -type RecordDeletionData struct { - recordType string - retention time.Duration -} - type RecordStats []struct { Stats []byte `db:"stats"` } @@ -39,7 +35,7 @@ func NewRecordManager(app core.App) *RecordManager { } // Create longer records by averaging shorter records -func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) { +func (rm *RecordManager) CreateLongerRecords() { // start := time.Now() longerRecordData := []LongerRecordData{ { @@ -70,14 +66,24 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) { } // wrap the operations in a transaction rm.app.RunInTransaction(func(txApp core.App) error { - activeSystems, err := txApp.FindAllRecords("systems", dbx.NewExp("status = 'up'")) + var err error + collections := [2]*core.Collection{} + collections[0], err = txApp.FindCachedCollectionByNameOrId("system_stats") if err != nil { - log.Println("failed to get active systems", "err", err.Error()) return err } + collections[1], err = txApp.FindCachedCollectionByNameOrId("container_stats") + if err != nil { + return err + } + var systems []struct { + Id string `db:"id"` + } + + txApp.DB().NewQuery("SELECT id FROM systems WHERE status='up'").All(&systems) // loop through all active systems, time periods, and collections - for _, system := range activeSystems { + for _, system := range systems { // log.Println("processing system", system.GetString("name")) for i := range longerRecordData { recordData := longerRecordData[i] @@ -92,7 +98,7 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) { if recordData.longerType != "10m" { lastLongerRecord, err := txApp.FindFirstRecordByFilter( collection.Id, - "type = {:type} && system = {:system} && created > {:created}", + "system = {:system} && type = {:type} && created > {:created}", dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod}, ) // continue if longer record exists @@ -108,7 +114,7 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) { Select("stats"). From(collection.Name). AndWhere(dbx.NewExp( - "type={:type} AND system={:system} AND created > {:created}", + "system={:system} AND type={:type} AND created > {:created}", dbx.Params{ "type": recordData.shorterType, "system": system.Id, @@ -119,7 +125,6 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) { // continue if not enough shorter records if err != nil || len(stats) < recordData.minShorterRecords { - // log.Println("not enough shorter records. continue.", len(allShorterRecords), recordData.expectedShorterRecords) continue } // average the shorter records and create longer record @@ -133,7 +138,7 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) { longerRecord.Set("stats", rm.AverageContainerStats(stats)) } if err := txApp.SaveNoValidate(longerRecord); err != nil { - log.Println("failed to save longer record", "err", err.Error()) + log.Println("failed to save longer record", "err", err) } } } @@ -146,16 +151,20 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) { } // Calculate the average stats of a list of system_stats records without reflect -func (rm *RecordManager) AverageSystemStats(records RecordStats) system.Stats { - sum := system.Stats{} +func (rm *RecordManager) AverageSystemStats(records RecordStats) *system.Stats { + sum := &system.Stats{} count := float64(len(records)) - // use different counter for temps in case some records don't have them tempCount := float64(0) - var stats system.Stats + // Temporary struct for unmarshaling + stats := &system.Stats{} + + // Accumulate totals for i := range records { - stats = system.Stats{} // Zero the struct before unmarshalling - json.Unmarshal(records[i].Stats, &stats) + *stats = system.Stats{} // Reset tempStats for unmarshaling + if err := json.Unmarshal(records[i].Stats, stats); err != nil { + continue + } sum.Cpu += stats.Cpu sum.Mem += stats.Mem sum.MemUsed += stats.MemUsed @@ -171,26 +180,25 @@ func (rm *RecordManager) AverageSystemStats(records RecordStats) system.Stats { sum.DiskWritePs += stats.DiskWritePs sum.NetworkSent += stats.NetworkSent sum.NetworkRecv += stats.NetworkRecv - // set peak values + // Set peak values sum.MaxCpu = max(sum.MaxCpu, stats.MaxCpu, stats.Cpu) sum.MaxNetworkSent = max(sum.MaxNetworkSent, stats.MaxNetworkSent, stats.NetworkSent) sum.MaxNetworkRecv = max(sum.MaxNetworkRecv, stats.MaxNetworkRecv, stats.NetworkRecv) sum.MaxDiskReadPs = max(sum.MaxDiskReadPs, stats.MaxDiskReadPs, stats.DiskReadPs) sum.MaxDiskWritePs = max(sum.MaxDiskWritePs, stats.MaxDiskWritePs, stats.DiskWritePs) - // add temps to sum + + // Accumulate temperatures if stats.Temperatures != nil { if sum.Temperatures == nil { sum.Temperatures = make(map[string]float64, len(stats.Temperatures)) } tempCount++ for key, value := range stats.Temperatures { - if _, ok := sum.Temperatures[key]; !ok { - sum.Temperatures[key] = 0 - } sum.Temperatures[key] += value } } - // add extra fs to sum + + // Accumulate extra filesystem stats if stats.ExtraFs != nil { if sum.ExtraFs == nil { sum.ExtraFs = make(map[string]*system.FsStats, len(stats.ExtraFs)) @@ -199,25 +207,26 @@ func (rm *RecordManager) AverageSystemStats(records RecordStats) system.Stats { if _, ok := sum.ExtraFs[key]; !ok { sum.ExtraFs[key] = &system.FsStats{} } - sum.ExtraFs[key].DiskTotal += value.DiskTotal - sum.ExtraFs[key].DiskUsed += value.DiskUsed - sum.ExtraFs[key].DiskWritePs += value.DiskWritePs - sum.ExtraFs[key].DiskReadPs += value.DiskReadPs - // peak values - sum.ExtraFs[key].MaxDiskReadPS = max(sum.ExtraFs[key].MaxDiskReadPS, value.MaxDiskReadPS, value.DiskReadPs) - sum.ExtraFs[key].MaxDiskWritePS = max(sum.ExtraFs[key].MaxDiskWritePS, value.MaxDiskWritePS, value.DiskWritePs) + fs := sum.ExtraFs[key] + fs.DiskTotal += value.DiskTotal + fs.DiskUsed += value.DiskUsed + fs.DiskWritePs += value.DiskWritePs + fs.DiskReadPs += value.DiskReadPs + fs.MaxDiskReadPS = max(fs.MaxDiskReadPS, value.MaxDiskReadPS, value.DiskReadPs) + fs.MaxDiskWritePS = max(fs.MaxDiskWritePS, value.MaxDiskWritePS, value.DiskWritePs) } } - // add GPU data + + // Accumulate GPU data if stats.GPUData != nil { if sum.GPUData == nil { sum.GPUData = make(map[string]system.GPUData, len(stats.GPUData)) } for id, value := range stats.GPUData { - if _, ok := sum.GPUData[id]; !ok { - sum.GPUData[id] = system.GPUData{Name: value.Name} + gpu, ok := sum.GPUData[id] + if !ok { + gpu = system.GPUData{Name: value.Name} } - gpu := sum.GPUData[id] gpu.Temperature += value.Temperature gpu.MemoryUsed += value.MemoryUsed gpu.MemoryTotal += value.MemoryTotal @@ -229,76 +238,67 @@ func (rm *RecordManager) AverageSystemStats(records RecordStats) system.Stats { } } - stats = system.Stats{ - Cpu: twoDecimals(sum.Cpu / count), - Mem: twoDecimals(sum.Mem / count), - MemUsed: twoDecimals(sum.MemUsed / count), - MemPct: twoDecimals(sum.MemPct / count), - MemBuffCache: twoDecimals(sum.MemBuffCache / count), - MemZfsArc: twoDecimals(sum.MemZfsArc / count), - Swap: twoDecimals(sum.Swap / count), - SwapUsed: twoDecimals(sum.SwapUsed / count), - DiskTotal: twoDecimals(sum.DiskTotal / count), - DiskUsed: twoDecimals(sum.DiskUsed / count), - DiskPct: twoDecimals(sum.DiskPct / count), - DiskReadPs: twoDecimals(sum.DiskReadPs / count), - DiskWritePs: twoDecimals(sum.DiskWritePs / count), - NetworkSent: twoDecimals(sum.NetworkSent / count), - NetworkRecv: twoDecimals(sum.NetworkRecv / count), - MaxCpu: sum.MaxCpu, - MaxDiskReadPs: sum.MaxDiskReadPs, - MaxDiskWritePs: sum.MaxDiskWritePs, - MaxNetworkSent: sum.MaxNetworkSent, - MaxNetworkRecv: sum.MaxNetworkRecv, - } + // Compute averages in place + if count > 0 { + sum.Cpu = twoDecimals(sum.Cpu / count) + sum.Mem = twoDecimals(sum.Mem / count) + sum.MemUsed = twoDecimals(sum.MemUsed / count) + sum.MemPct = twoDecimals(sum.MemPct / count) + sum.MemBuffCache = twoDecimals(sum.MemBuffCache / count) + sum.MemZfsArc = twoDecimals(sum.MemZfsArc / count) + sum.Swap = twoDecimals(sum.Swap / count) + sum.SwapUsed = twoDecimals(sum.SwapUsed / count) + sum.DiskTotal = twoDecimals(sum.DiskTotal / count) + sum.DiskUsed = twoDecimals(sum.DiskUsed / count) + sum.DiskPct = twoDecimals(sum.DiskPct / count) + sum.DiskReadPs = twoDecimals(sum.DiskReadPs / count) + sum.DiskWritePs = twoDecimals(sum.DiskWritePs / count) + sum.NetworkSent = twoDecimals(sum.NetworkSent / count) + sum.NetworkRecv = twoDecimals(sum.NetworkRecv / count) - if sum.Temperatures != nil { - stats.Temperatures = make(map[string]float64, len(sum.Temperatures)) - for key, value := range sum.Temperatures { - stats.Temperatures[key] = twoDecimals(value / tempCount) + // Average temperatures + if sum.Temperatures != nil && tempCount > 0 { + for key := range sum.Temperatures { + sum.Temperatures[key] = twoDecimals(sum.Temperatures[key] / tempCount) + } } - } - if sum.ExtraFs != nil { - stats.ExtraFs = make(map[string]*system.FsStats, len(sum.ExtraFs)) - for key, value := range sum.ExtraFs { - stats.ExtraFs[key] = &system.FsStats{ - DiskTotal: twoDecimals(value.DiskTotal / count), - DiskUsed: twoDecimals(value.DiskUsed / count), - DiskWritePs: twoDecimals(value.DiskWritePs / count), - DiskReadPs: twoDecimals(value.DiskReadPs / count), - MaxDiskReadPS: value.MaxDiskReadPS, - MaxDiskWritePS: value.MaxDiskWritePS, + // Average extra filesystem stats + if sum.ExtraFs != nil { + for key := range sum.ExtraFs { + fs := sum.ExtraFs[key] + fs.DiskTotal = twoDecimals(fs.DiskTotal / count) + fs.DiskUsed = twoDecimals(fs.DiskUsed / count) + fs.DiskWritePs = twoDecimals(fs.DiskWritePs / count) + fs.DiskReadPs = twoDecimals(fs.DiskReadPs / count) + } + } + + // Average GPU data + if sum.GPUData != nil { + for id := range sum.GPUData { + gpu := sum.GPUData[id] + gpu.Temperature = twoDecimals(gpu.Temperature / count) + gpu.MemoryUsed = twoDecimals(gpu.MemoryUsed / count) + gpu.MemoryTotal = twoDecimals(gpu.MemoryTotal / count) + gpu.Usage = twoDecimals(gpu.Usage / count) + gpu.Power = twoDecimals(gpu.Power / count) + gpu.Count = twoDecimals(gpu.Count / count) + sum.GPUData[id] = gpu } } } - if sum.GPUData != nil { - stats.GPUData = make(map[string]system.GPUData, len(sum.GPUData)) - for id, value := range sum.GPUData { - stats.GPUData[id] = system.GPUData{ - Name: value.Name, - Temperature: twoDecimals(value.Temperature / count), - MemoryUsed: twoDecimals(value.MemoryUsed / count), - MemoryTotal: twoDecimals(value.MemoryTotal / count), - Usage: twoDecimals(value.Usage / count), - Power: twoDecimals(value.Power / count), - Count: twoDecimals(value.Count / count), - } - } - } - - return stats + return sum } // Calculate the average stats of a list of container_stats records func (rm *RecordManager) AverageContainerStats(records RecordStats) []container.Stats { sums := make(map[string]*container.Stats) count := float64(len(records)) - - var containerStats []container.Stats + containerStats := make([]container.Stats, 0, 50) for i := range records { - // Reset the slice length to 0, but keep the capacity + // reset slice containerStats = containerStats[:0] if err := json.Unmarshal(records[i].Stats, &containerStats); err != nil { return []container.Stats{} @@ -330,38 +330,45 @@ func (rm *RecordManager) AverageContainerStats(records RecordStats) []container. // Deletes records older than what is displayed in the UI func (rm *RecordManager) DeleteOldRecords() { + // Define the collections to process collections := []string{"system_stats", "container_stats"} - recordData := []RecordDeletionData{ - { - recordType: "1m", - retention: time.Hour, - }, - { - recordType: "10m", - retention: 12 * time.Hour, - }, - { - recordType: "20m", - retention: 24 * time.Hour, - }, - { - recordType: "120m", - retention: 7 * 24 * time.Hour, - }, - { - recordType: "480m", - retention: 30 * 24 * time.Hour, - }, + + // Define record types and their retention periods + type RecordDeletionData struct { + recordType string + retention time.Duration } - db := rm.app.NonconcurrentDB() - for _, recordData := range recordData { - for _, collectionSlug := range collections { - formattedDate := time.Now().UTC().Add(-recordData.retention).Format(types.DefaultDateLayout) - expr := dbx.NewExp("[[created]] < {:date} AND [[type]] = {:type}", dbx.Params{"date": formattedDate, "type": recordData.recordType}) - _, err := db.Delete(collectionSlug, expr).Execute() - if err != nil { - rm.app.Logger().Error("Failed to delete records", "err", err.Error()) - } + recordData := []RecordDeletionData{ + {recordType: "1m", retention: time.Hour}, // 1 hour + {recordType: "10m", retention: 12 * time.Hour}, // 12 hours + {recordType: "20m", retention: 24 * time.Hour}, // 1 day + {recordType: "120m", retention: 7 * 24 * time.Hour}, // 7 days + {recordType: "480m", retention: 30 * 24 * time.Hour}, // 30 days + } + + // Process each collection + for _, collection := range collections { + // Build the WHERE clause dynamically + var conditionParts []string + var params dbx.Params = make(map[string]any) + + for i, rd := range recordData { + // Create parameterized condition for this record type + dateParam := fmt.Sprintf("date%d", i) + conditionParts = append(conditionParts, fmt.Sprintf("(type = '%s' AND created < {:%s})", rd.recordType, dateParam)) + params[dateParam] = time.Now().UTC().Add(-rd.retention) + } + + // Combine conditions with OR + conditionStr := strings.Join(conditionParts, " OR ") + + // Construct the full raw query + rawQuery := fmt.Sprintf("DELETE FROM %s WHERE %s", collection, conditionStr) + + // Execute the query with parameters + if _, err := rm.app.DB().NewQuery(rawQuery).Bind(params).Execute(); err != nil { + // return fmt.Errorf("failed to delete from %s: %v", collection, err) + rm.app.Logger().Error("failed to delete", "collection", collection, "error", err) } } }