mirror of
https://github.com/fankes/beszel.git
synced 2025-10-22 11:29:23 +08:00
improve memory efficiency of records.go
This commit is contained in:
@@ -4,13 +4,13 @@ package records
|
|||||||
import (
|
import (
|
||||||
"beszel/internal/entities/container"
|
"beszel/internal/entities/container"
|
||||||
"beszel/internal/entities/system"
|
"beszel/internal/entities/system"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/goccy/go-json"
|
|
||||||
"github.com/pocketbase/dbx"
|
"github.com/pocketbase/dbx"
|
||||||
"github.com/pocketbase/pocketbase/core"
|
"github.com/pocketbase/pocketbase/core"
|
||||||
)
|
)
|
||||||
@@ -26,14 +26,26 @@ type LongerRecordData struct {
|
|||||||
minShorterRecords int
|
minShorterRecords int
|
||||||
}
|
}
|
||||||
|
|
||||||
type RecordStats []struct {
|
type RecordIds []struct {
|
||||||
Stats []byte `db:"stats"`
|
Id string `db:"id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRecordManager(app core.App) *RecordManager {
|
func NewRecordManager(app core.App) *RecordManager {
|
||||||
return &RecordManager{app}
|
return &RecordManager{app}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type StatsRecord struct {
|
||||||
|
Stats []byte `db:"stats"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// global variables for reusing allocations
|
||||||
|
var statsRecord StatsRecord
|
||||||
|
var containerStats []container.Stats
|
||||||
|
var sumStats system.Stats
|
||||||
|
var tempStats system.Stats
|
||||||
|
var queryParams = make(dbx.Params, 1)
|
||||||
|
var containerSums = make(map[string]*container.Stats)
|
||||||
|
|
||||||
// Create longer records by averaging shorter records
|
// Create longer records by averaging shorter records
|
||||||
func (rm *RecordManager) CreateLongerRecords() {
|
func (rm *RecordManager) CreateLongerRecords() {
|
||||||
// start := time.Now()
|
// start := time.Now()
|
||||||
@@ -76,11 +88,10 @@ func (rm *RecordManager) CreateLongerRecords() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
var systems []struct {
|
var systems RecordIds
|
||||||
Id string `db:"id"`
|
db := txApp.DB()
|
||||||
}
|
|
||||||
|
|
||||||
txApp.DB().NewQuery("SELECT id FROM systems WHERE status='up'").All(&systems)
|
db.NewQuery("SELECT id FROM systems WHERE status='up'").All(&systems)
|
||||||
|
|
||||||
// loop through all active systems, time periods, and collections
|
// loop through all active systems, time periods, and collections
|
||||||
for _, system := range systems {
|
for _, system := range systems {
|
||||||
@@ -96,22 +107,23 @@ func (rm *RecordManager) CreateLongerRecords() {
|
|||||||
for _, collection := range collections {
|
for _, collection := range collections {
|
||||||
// check creation time of last longer record if not 10m, since 10m is created every run
|
// check creation time of last longer record if not 10m, since 10m is created every run
|
||||||
if recordData.longerType != "10m" {
|
if recordData.longerType != "10m" {
|
||||||
lastLongerRecord, err := txApp.FindFirstRecordByFilter(
|
count, err := txApp.CountRecords(
|
||||||
collection.Id,
|
collection.Id,
|
||||||
"system = {:system} && type = {:type} && created > {:created}",
|
dbx.NewExp(
|
||||||
dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod},
|
"system = {:system} AND type = {:type} AND created > {:created}",
|
||||||
|
dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod},
|
||||||
|
),
|
||||||
)
|
)
|
||||||
// continue if longer record exists
|
// continue if longer record exists
|
||||||
if err == nil || lastLongerRecord != nil {
|
if err != nil || count > 0 {
|
||||||
// log.Println("longer record found. continuing")
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// get shorter records from the past x minutes
|
// get shorter records from the past x minutes
|
||||||
var stats RecordStats
|
var recordIds RecordIds
|
||||||
|
|
||||||
err := txApp.DB().
|
err := txApp.DB().
|
||||||
Select("stats").
|
Select("id").
|
||||||
From(collection.Name).
|
From(collection.Name).
|
||||||
AndWhere(dbx.NewExp(
|
AndWhere(dbx.NewExp(
|
||||||
"system={:system} AND type={:type} AND created > {:created}",
|
"system={:system} AND type={:type} AND created > {:created}",
|
||||||
@@ -121,10 +133,10 @@ func (rm *RecordManager) CreateLongerRecords() {
|
|||||||
"created": shorterRecordPeriod,
|
"created": shorterRecordPeriod,
|
||||||
},
|
},
|
||||||
)).
|
)).
|
||||||
All(&stats)
|
All(&recordIds)
|
||||||
|
|
||||||
// continue if not enough shorter records
|
// continue if not enough shorter records
|
||||||
if err != nil || len(stats) < recordData.minShorterRecords {
|
if err != nil || len(recordIds) < recordData.minShorterRecords {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// average the shorter records and create longer record
|
// average the shorter records and create longer record
|
||||||
@@ -133,9 +145,10 @@ func (rm *RecordManager) CreateLongerRecords() {
|
|||||||
longerRecord.Set("type", recordData.longerType)
|
longerRecord.Set("type", recordData.longerType)
|
||||||
switch collection.Name {
|
switch collection.Name {
|
||||||
case "system_stats":
|
case "system_stats":
|
||||||
longerRecord.Set("stats", rm.AverageSystemStats(stats))
|
longerRecord.Set("stats", rm.AverageSystemStats(db, recordIds))
|
||||||
case "container_stats":
|
case "container_stats":
|
||||||
longerRecord.Set("stats", rm.AverageContainerStats(stats))
|
|
||||||
|
longerRecord.Set("stats", rm.AverageContainerStats(db, recordIds))
|
||||||
}
|
}
|
||||||
if err := txApp.SaveNoValidate(longerRecord); err != nil {
|
if err := txApp.SaveNoValidate(longerRecord); err != nil {
|
||||||
log.Println("failed to save longer record", "err", err)
|
log.Println("failed to save longer record", "err", err)
|
||||||
@@ -147,24 +160,34 @@ func (rm *RecordManager) CreateLongerRecords() {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
statsRecord.Stats = statsRecord.Stats[:0]
|
||||||
|
|
||||||
// log.Println("finished creating longer records", "time (ms)", time.Since(start).Milliseconds())
|
// log.Println("finished creating longer records", "time (ms)", time.Since(start).Milliseconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate the average stats of a list of system_stats records without reflect
|
// Calculate the average stats of a list of system_stats records without reflect
|
||||||
func (rm *RecordManager) AverageSystemStats(records RecordStats) *system.Stats {
|
func (rm *RecordManager) AverageSystemStats(db dbx.Builder, records RecordIds) *system.Stats {
|
||||||
sum := &system.Stats{}
|
// Clear/reset global structs for reuse
|
||||||
|
sumStats = system.Stats{}
|
||||||
|
tempStats = system.Stats{}
|
||||||
|
sum := &sumStats
|
||||||
|
stats := &tempStats
|
||||||
|
|
||||||
count := float64(len(records))
|
count := float64(len(records))
|
||||||
tempCount := float64(0)
|
tempCount := float64(0)
|
||||||
|
|
||||||
// Temporary struct for unmarshaling
|
|
||||||
stats := &system.Stats{}
|
|
||||||
|
|
||||||
// Accumulate totals
|
// Accumulate totals
|
||||||
for i := range records {
|
for _, record := range records {
|
||||||
*stats = system.Stats{} // Reset tempStats for unmarshaling
|
id := record.Id
|
||||||
if err := json.Unmarshal(records[i].Stats, stats); err != nil {
|
// clear global statsRecord for reuse
|
||||||
|
statsRecord.Stats = statsRecord.Stats[:0]
|
||||||
|
|
||||||
|
queryParams["id"] = id
|
||||||
|
db.NewQuery("SELECT stats FROM system_stats WHERE id = {:id}").Bind(queryParams).One(&statsRecord)
|
||||||
|
if err := json.Unmarshal(statsRecord.Stats, stats); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
sum.Cpu += stats.Cpu
|
sum.Cpu += stats.Cpu
|
||||||
sum.Mem += stats.Mem
|
sum.Mem += stats.Mem
|
||||||
sum.MemUsed += stats.MemUsed
|
sum.MemUsed += stats.MemUsed
|
||||||
@@ -293,14 +316,24 @@ func (rm *RecordManager) AverageSystemStats(records RecordStats) *system.Stats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Calculate the average stats of a list of container_stats records
|
// Calculate the average stats of a list of container_stats records
|
||||||
func (rm *RecordManager) AverageContainerStats(records RecordStats) []container.Stats {
|
func (rm *RecordManager) AverageContainerStats(db dbx.Builder, records RecordIds) []container.Stats {
|
||||||
sums := make(map[string]*container.Stats)
|
// Clear global map for reuse
|
||||||
|
for k := range containerSums {
|
||||||
|
delete(containerSums, k)
|
||||||
|
}
|
||||||
|
sums := containerSums
|
||||||
count := float64(len(records))
|
count := float64(len(records))
|
||||||
containerStats := make([]container.Stats, 0, 50)
|
|
||||||
for i := range records {
|
for i := range records {
|
||||||
// reset slice
|
id := records[i].Id
|
||||||
|
// clear global statsRecord and containerStats for reuse
|
||||||
|
statsRecord.Stats = statsRecord.Stats[:0]
|
||||||
containerStats = containerStats[:0]
|
containerStats = containerStats[:0]
|
||||||
if err := json.Unmarshal(records[i].Stats, &containerStats); err != nil {
|
|
||||||
|
queryParams["id"] = id
|
||||||
|
db.NewQuery("SELECT stats FROM container_stats WHERE id = {:id}").Bind(queryParams).One(&statsRecord)
|
||||||
|
|
||||||
|
if err := json.Unmarshal(statsRecord.Stats, &containerStats); err != nil {
|
||||||
return []container.Stats{}
|
return []container.Stats{}
|
||||||
}
|
}
|
||||||
for i := range containerStats {
|
for i := range containerStats {
|
||||||
@@ -331,7 +364,7 @@ func (rm *RecordManager) AverageContainerStats(records RecordStats) []container.
|
|||||||
// Deletes records older than what is displayed in the UI
|
// Deletes records older than what is displayed in the UI
|
||||||
func (rm *RecordManager) DeleteOldRecords() {
|
func (rm *RecordManager) DeleteOldRecords() {
|
||||||
// Define the collections to process
|
// Define the collections to process
|
||||||
collections := []string{"system_stats", "container_stats"}
|
collections := [2]string{"system_stats", "container_stats"}
|
||||||
|
|
||||||
// Define record types and their retention periods
|
// Define record types and their retention periods
|
||||||
type RecordDeletionData struct {
|
type RecordDeletionData struct {
|
||||||
@@ -346,17 +379,19 @@ func (rm *RecordManager) DeleteOldRecords() {
|
|||||||
{recordType: "480m", retention: 30 * 24 * time.Hour}, // 30 days
|
{recordType: "480m", retention: 30 * 24 * time.Hour}, // 30 days
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process each collection
|
now := time.Now().UTC()
|
||||||
|
|
||||||
for _, collection := range collections {
|
for _, collection := range collections {
|
||||||
// Build the WHERE clause dynamically
|
// Build the WHERE clause dynamically
|
||||||
var conditionParts []string
|
var conditionParts []string
|
||||||
var params dbx.Params = make(map[string]any)
|
var params dbx.Params = make(map[string]any)
|
||||||
|
|
||||||
for i, rd := range recordData {
|
for i := range recordData {
|
||||||
|
rd := recordData[i]
|
||||||
// Create parameterized condition for this record type
|
// Create parameterized condition for this record type
|
||||||
dateParam := fmt.Sprintf("date%d", i)
|
dateParam := fmt.Sprintf("date%d", i)
|
||||||
conditionParts = append(conditionParts, fmt.Sprintf("(type = '%s' AND created < {:%s})", rd.recordType, dateParam))
|
conditionParts = append(conditionParts, fmt.Sprintf("(type = '%s' AND created < {:%s})", rd.recordType, dateParam))
|
||||||
params[dateParam] = time.Now().UTC().Add(-rd.retention)
|
params[dateParam] = now.Add(-rd.retention)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Combine conditions with OR
|
// Combine conditions with OR
|
||||||
|
Reference in New Issue
Block a user