From 5ddb200a756573414f4993714eb35eb3df73eaec Mon Sep 17 00:00:00 2001 From: henrygd Date: Tue, 8 Jul 2025 18:03:49 -0400 Subject: [PATCH] improve memory efficiency of records.go --- beszel/internal/records/records.go | 105 +++++++++++++++++++---------- 1 file changed, 70 insertions(+), 35 deletions(-) diff --git a/beszel/internal/records/records.go b/beszel/internal/records/records.go index e740b21..d263ba7 100644 --- a/beszel/internal/records/records.go +++ b/beszel/internal/records/records.go @@ -4,13 +4,13 @@ package records import ( "beszel/internal/entities/container" "beszel/internal/entities/system" + "encoding/json" "fmt" "log" "math" "strings" "time" - "github.com/goccy/go-json" "github.com/pocketbase/dbx" "github.com/pocketbase/pocketbase/core" ) @@ -26,14 +26,26 @@ type LongerRecordData struct { minShorterRecords int } -type RecordStats []struct { - Stats []byte `db:"stats"` +type RecordIds []struct { + Id string `db:"id"` } func NewRecordManager(app core.App) *RecordManager { return &RecordManager{app} } +type StatsRecord struct { + Stats []byte `db:"stats"` +} + +// global variables for reusing allocations +var statsRecord StatsRecord +var containerStats []container.Stats +var sumStats system.Stats +var tempStats system.Stats +var queryParams = make(dbx.Params, 1) +var containerSums = make(map[string]*container.Stats) + // Create longer records by averaging shorter records func (rm *RecordManager) CreateLongerRecords() { // start := time.Now() @@ -76,11 +88,10 @@ func (rm *RecordManager) CreateLongerRecords() { if err != nil { return err } - var systems []struct { - Id string `db:"id"` - } + var systems RecordIds + db := txApp.DB() - txApp.DB().NewQuery("SELECT id FROM systems WHERE status='up'").All(&systems) + db.NewQuery("SELECT id FROM systems WHERE status='up'").All(&systems) // loop through all active systems, time periods, and collections for _, system := range systems { @@ -96,22 +107,23 @@ func (rm *RecordManager) CreateLongerRecords() { for _, collection := range collections { // check creation time of last longer record if not 10m, since 10m is created every run if recordData.longerType != "10m" { - lastLongerRecord, err := txApp.FindFirstRecordByFilter( + count, err := txApp.CountRecords( collection.Id, - "system = {:system} && type = {:type} && created > {:created}", - dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod}, + dbx.NewExp( + "system = {:system} AND type = {:type} AND created > {:created}", + dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod}, + ), ) // continue if longer record exists - if err == nil || lastLongerRecord != nil { - // log.Println("longer record found. continuing") + if err != nil || count > 0 { continue } } // get shorter records from the past x minutes - var stats RecordStats + var recordIds RecordIds err := txApp.DB(). - Select("stats"). + Select("id"). From(collection.Name). AndWhere(dbx.NewExp( "system={:system} AND type={:type} AND created > {:created}", @@ -121,10 +133,10 @@ func (rm *RecordManager) CreateLongerRecords() { "created": shorterRecordPeriod, }, )). - All(&stats) + All(&recordIds) // continue if not enough shorter records - if err != nil || len(stats) < recordData.minShorterRecords { + if err != nil || len(recordIds) < recordData.minShorterRecords { continue } // average the shorter records and create longer record @@ -133,9 +145,10 @@ func (rm *RecordManager) CreateLongerRecords() { longerRecord.Set("type", recordData.longerType) switch collection.Name { case "system_stats": - longerRecord.Set("stats", rm.AverageSystemStats(stats)) + longerRecord.Set("stats", rm.AverageSystemStats(db, recordIds)) case "container_stats": - longerRecord.Set("stats", rm.AverageContainerStats(stats)) + + longerRecord.Set("stats", rm.AverageContainerStats(db, recordIds)) } if err := txApp.SaveNoValidate(longerRecord); err != nil { log.Println("failed to save longer record", "err", err) @@ -147,24 +160,34 @@ func (rm *RecordManager) CreateLongerRecords() { return nil }) + statsRecord.Stats = statsRecord.Stats[:0] + // log.Println("finished creating longer records", "time (ms)", time.Since(start).Milliseconds()) } // Calculate the average stats of a list of system_stats records without reflect -func (rm *RecordManager) AverageSystemStats(records RecordStats) *system.Stats { - sum := &system.Stats{} +func (rm *RecordManager) AverageSystemStats(db dbx.Builder, records RecordIds) *system.Stats { + // Clear/reset global structs for reuse + sumStats = system.Stats{} + tempStats = system.Stats{} + sum := &sumStats + stats := &tempStats + count := float64(len(records)) tempCount := float64(0) - // Temporary struct for unmarshaling - stats := &system.Stats{} - // Accumulate totals - for i := range records { - *stats = system.Stats{} // Reset tempStats for unmarshaling - if err := json.Unmarshal(records[i].Stats, stats); err != nil { + for _, record := range records { + id := record.Id + // clear global statsRecord for reuse + statsRecord.Stats = statsRecord.Stats[:0] + + queryParams["id"] = id + db.NewQuery("SELECT stats FROM system_stats WHERE id = {:id}").Bind(queryParams).One(&statsRecord) + if err := json.Unmarshal(statsRecord.Stats, stats); err != nil { continue } + sum.Cpu += stats.Cpu sum.Mem += stats.Mem sum.MemUsed += stats.MemUsed @@ -293,14 +316,24 @@ func (rm *RecordManager) AverageSystemStats(records RecordStats) *system.Stats { } // Calculate the average stats of a list of container_stats records -func (rm *RecordManager) AverageContainerStats(records RecordStats) []container.Stats { - sums := make(map[string]*container.Stats) +func (rm *RecordManager) AverageContainerStats(db dbx.Builder, records RecordIds) []container.Stats { + // Clear global map for reuse + for k := range containerSums { + delete(containerSums, k) + } + sums := containerSums count := float64(len(records)) - containerStats := make([]container.Stats, 0, 50) + for i := range records { - // reset slice + id := records[i].Id + // clear global statsRecord and containerStats for reuse + statsRecord.Stats = statsRecord.Stats[:0] containerStats = containerStats[:0] - if err := json.Unmarshal(records[i].Stats, &containerStats); err != nil { + + queryParams["id"] = id + db.NewQuery("SELECT stats FROM container_stats WHERE id = {:id}").Bind(queryParams).One(&statsRecord) + + if err := json.Unmarshal(statsRecord.Stats, &containerStats); err != nil { return []container.Stats{} } for i := range containerStats { @@ -331,7 +364,7 @@ func (rm *RecordManager) AverageContainerStats(records RecordStats) []container. // Deletes records older than what is displayed in the UI func (rm *RecordManager) DeleteOldRecords() { // Define the collections to process - collections := []string{"system_stats", "container_stats"} + collections := [2]string{"system_stats", "container_stats"} // Define record types and their retention periods type RecordDeletionData struct { @@ -346,17 +379,19 @@ func (rm *RecordManager) DeleteOldRecords() { {recordType: "480m", retention: 30 * 24 * time.Hour}, // 30 days } - // Process each collection + now := time.Now().UTC() + for _, collection := range collections { // Build the WHERE clause dynamically var conditionParts []string var params dbx.Params = make(map[string]any) - for i, rd := range recordData { + for i := range recordData { + rd := recordData[i] // Create parameterized condition for this record type dateParam := fmt.Sprintf("date%d", i) conditionParts = append(conditionParts, fmt.Sprintf("(type = '%s' AND created < {:%s})", rd.recordType, dateParam)) - params[dateParam] = time.Now().UTC().Add(-rd.retention) + params[dateParam] = now.Add(-rd.retention) } // Combine conditions with OR