move longer records creation to a scheduled job

This commit is contained in:
Henry Dollman
2024-08-18 18:23:17 -04:00
parent b5607025f7
commit 0566433aa1
2 changed files with 238 additions and 139 deletions

View File

@@ -107,20 +107,16 @@ func (h *Hub) Run() {
return nil return nil
}) })
// set up cron jobs / ticker for system updates // set up scheduled jobs / ticker for system updates
h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error { h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error {
// 15 second ticker for system updates // 15 second ticker for system updates
go h.startSystemUpdateTicker() go h.startSystemUpdateTicker()
// cron job to delete old records // set up cron jobs
scheduler := cron.New() scheduler := cron.New()
scheduler.MustAdd("delete old records", "8 * * * *", func() { // delete old records once every hour
collections := []string{"system_stats", "container_stats"} scheduler.MustAdd("delete old records", "8 * * * *", rm.DeleteOldRecords)
rm.DeleteOldRecords(collections, "1m", time.Hour) // create longer records every 10 minutes
rm.DeleteOldRecords(collections, "10m", 12*time.Hour) scheduler.MustAdd("create longer records", "*/10 * * * *", rm.CreateLongerRecords)
rm.DeleteOldRecords(collections, "20m", 24*time.Hour)
rm.DeleteOldRecords(collections, "120m", 7*24*time.Hour)
rm.DeleteOldRecords(collections, "480m", 30*24*time.Hour)
})
scheduler.Start() scheduler.Start()
return nil return nil
}) })
@@ -201,16 +197,6 @@ func (h *Hub) Run() {
return nil return nil
}) })
h.app.OnModelAfterCreate("system_stats").Add(func(e *core.ModelEvent) error {
rm.CreateLongerRecords("system_stats", e.Model.(*models.Record))
return nil
})
h.app.OnModelAfterCreate("container_stats").Add(func(e *core.ModelEvent) error {
rm.CreateLongerRecords("container_stats", e.Model.(*models.Record))
return nil
})
if err := h.app.Start(); err != nil { if err := h.app.Start(); err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@@ -4,9 +4,8 @@ package records
import ( import (
"beszel/internal/entities/container" "beszel/internal/entities/container"
"beszel/internal/entities/system" "beszel/internal/entities/system"
"fmt" "log"
"math" "math"
"reflect"
"time" "time"
"github.com/pocketbase/dbx" "github.com/pocketbase/dbx"
@@ -19,113 +18,202 @@ type RecordManager struct {
app *pocketbase.PocketBase app *pocketbase.PocketBase
} }
type LongerRecordData struct {
shorterType string
longerType string
longerTimeDuration time.Duration
expectedShorterRecords int
}
type RecordDeletionData struct {
recordType string
retention time.Duration
}
func NewRecordManager(app *pocketbase.PocketBase) *RecordManager { func NewRecordManager(app *pocketbase.PocketBase) *RecordManager {
return &RecordManager{app: app} return &RecordManager{app}
} }
func (rm *RecordManager) CreateLongerRecords(collectionName string, shorterRecord *models.Record) { // Create longer records by averaging shorter records
shorterRecordType := shorterRecord.GetString("type") func (rm *RecordManager) CreateLongerRecords() {
systemId := shorterRecord.GetString("system") // start := time.Now()
// fmt.Println("create longer records", "recordType", shorterRecordType, "systemId", systemId) recordData := []LongerRecordData{
var longerRecordType string {
var timeAgo time.Duration shorterType: "1m",
var expectedShorterRecords int expectedShorterRecords: 10,
switch shorterRecordType { longerType: "10m",
case "1m": longerTimeDuration: -10 * time.Minute,
longerRecordType = "10m" },
timeAgo = -10 * time.Minute {
expectedShorterRecords = 10 shorterType: "10m",
case "10m": expectedShorterRecords: 2,
longerRecordType = "20m" longerType: "20m",
timeAgo = -20 * time.Minute longerTimeDuration: -20 * time.Minute,
expectedShorterRecords = 2 },
case "20m": {
longerRecordType = "120m" shorterType: "20m",
timeAgo = -120 * time.Minute expectedShorterRecords: 6,
expectedShorterRecords = 6 longerType: "120m",
default: longerTimeDuration: -120 * time.Minute,
longerRecordType = "480m" },
timeAgo = -480 * time.Minute {
expectedShorterRecords = 4 shorterType: "120m",
expectedShorterRecords: 4,
longerType: "480m",
longerTimeDuration: -480 * time.Minute,
},
}
// wrap the operations in a transaction
rm.app.Dao().RunInTransaction(func(txDao *daos.Dao) error {
activeSystems, err := txDao.FindRecordsByExpr("systems", dbx.NewExp("status = 'up'"))
if err != nil {
log.Println("failed to get active systems", "err", err.Error())
return err
} }
longerRecordPeriod := time.Now().UTC().Add(timeAgo + 10*time.Second).Format("2006-01-02 15:04:05") collections := map[string]*models.Collection{}
// check creation time of last 10m record for _, collectionName := range []string{"system_stats", "container_stats"} {
lastLongerRecord, err := rm.app.Dao().FindFirstRecordByFilter( collection, _ := txDao.FindCollectionByNameOrId(collectionName)
collectionName, collections[collectionName] = collection
}
// loop through all active systems, time periods, and collections
for _, system := range activeSystems {
// log.Println("processing system", system.GetString("name"))
for _, recordData := range recordData {
// log.Println("processing longer record type", recordData.longerType)
// add one minute padding for longer records because they are created slightly later than the job start time
longerRecordPeriod := time.Now().UTC().Add(recordData.longerTimeDuration + time.Minute)
// shorter records are created independently of longer records, so we shouldn't need to add padding
shorterRecordPeriod := time.Now().UTC().Add(recordData.longerTimeDuration)
// loop through both collections
for _, collection := range collections {
// check creation time of last longer record if not 10m, since 10m is created every run
if recordData.longerType != "10m" {
lastLongerRecord, err := txDao.FindFirstRecordByFilter(
collection.Id,
"type = {:type} && system = {:system} && created > {:created}", "type = {:type} && system = {:system} && created > {:created}",
dbx.Params{"type": longerRecordType, "system": systemId, "created": longerRecordPeriod}, dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod},
) )
// return if longer record exists // continue if longer record exists
if err == nil || lastLongerRecord != nil { if err == nil || lastLongerRecord != nil {
// log.Println("longer record found. returning") // log.Println("longer record found. continuing")
return continue
}
} }
// get shorter records from the past x minutes // get shorter records from the past x minutes
// shorterRecordPeriod := time.Now().UTC().Add(timeAgo + time.Second).Format("2006-01-02 15:04:05") allShorterRecords, err := txDao.FindRecordsByExpr(
allShorterRecords, err := rm.app.Dao().FindRecordsByFilter( collection.Id,
collectionName, dbx.NewExp(
"type = {:type} && system = {:system} && created > {:created}", "type = {:type} AND system = {:system} AND created > {:created}",
"-created", dbx.Params{"type": recordData.shorterType, "system": system.Id, "created": shorterRecordPeriod},
-1, ),
0,
dbx.Params{"type": shorterRecordType, "system": systemId, "created": longerRecordPeriod},
) )
// return if not enough shorter records
if err != nil || len(allShorterRecords) < expectedShorterRecords { // continue if not enough shorter records
// log.Println("not enough shorter records. returning") if err != nil || len(allShorterRecords) < recordData.expectedShorterRecords {
return // log.Println("not enough shorter records. continue.", len(allShorterRecords), recordData.expectedShorterRecords)
continue
} }
// average the shorter records and create longer record // average the shorter records and create longer record
var stats interface{} var stats interface{}
switch collectionName { switch collection.Name {
case "system_stats": case "system_stats":
stats = rm.AverageSystemStats(allShorterRecords) stats = rm.AverageSystemStats(allShorterRecords)
case "container_stats": case "container_stats":
stats = rm.AverageContainerStats(allShorterRecords) stats = rm.AverageContainerStats(allShorterRecords)
} }
collection, _ := rm.app.Dao().FindCollectionByNameOrId(collectionName)
longerRecord := models.NewRecord(collection) longerRecord := models.NewRecord(collection)
longerRecord.Set("system", systemId) longerRecord.Set("system", system.Id)
longerRecord.Set("stats", stats) longerRecord.Set("stats", stats)
longerRecord.Set("type", longerRecordType) longerRecord.Set("type", recordData.longerType)
if err := rm.app.Dao().SaveRecord(longerRecord); err != nil { if err := txDao.SaveRecord(longerRecord); err != nil {
fmt.Println("failed to save longer record", "err", err.Error()) log.Println("failed to save longer record", "err", err.Error())
}
}
}
} }
return nil
})
// log.Println("finished creating longer records", "time (ms)", time.Since(start).Milliseconds())
} }
// calculate the average stats of a list of system_stats records // Calculate the average stats of a list of system_stats records with reflect
// func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Stats {
// count := float64(len(records))
// sum := reflect.New(reflect.TypeOf(system.Stats{})).Elem()
// var stats system.Stats
// for _, record := range records {
// record.UnmarshalJSONField("stats", &stats)
// statValue := reflect.ValueOf(stats)
// for i := 0; i < statValue.NumField(); i++ {
// field := sum.Field(i)
// field.SetFloat(field.Float() + statValue.Field(i).Float())
// }
// }
// average := reflect.New(reflect.TypeOf(system.Stats{})).Elem()
// for i := 0; i < sum.NumField(); i++ {
// average.Field(i).SetFloat(twoDecimals(sum.Field(i).Float() / count))
// }
// return average.Interface().(system.Stats)
// }
// Calculate the average stats of a list of system_stats records without reflect
func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Stats { func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Stats {
var sum system.Stats
count := float64(len(records)) count := float64(len(records))
sum := reflect.New(reflect.TypeOf(system.Stats{})).Elem()
for _, record := range records {
var stats system.Stats var stats system.Stats
for _, record := range records {
record.UnmarshalJSONField("stats", &stats) record.UnmarshalJSONField("stats", &stats)
statValue := reflect.ValueOf(stats) sum.Cpu += stats.Cpu
for i := 0; i < statValue.NumField(); i++ { sum.Mem += stats.Mem
field := sum.Field(i) sum.MemUsed += stats.MemUsed
field.SetFloat(field.Float() + statValue.Field(i).Float()) sum.MemPct += stats.MemPct
sum.MemBuffCache += stats.MemBuffCache
sum.Swap += stats.Swap
sum.SwapUsed += stats.SwapUsed
sum.Disk += stats.Disk
sum.DiskUsed += stats.DiskUsed
sum.DiskPct += stats.DiskPct
sum.DiskRead += stats.DiskRead
sum.DiskWrite += stats.DiskWrite
sum.NetworkSent += stats.NetworkSent
sum.NetworkRecv += stats.NetworkRecv
}
return system.Stats{
Cpu: twoDecimals(sum.Cpu / count),
Mem: twoDecimals(sum.Mem / count),
MemUsed: twoDecimals(sum.MemUsed / count),
MemPct: twoDecimals(sum.MemPct / count),
MemBuffCache: twoDecimals(sum.MemBuffCache / count),
Swap: twoDecimals(sum.Swap / count),
SwapUsed: twoDecimals(sum.SwapUsed / count),
Disk: twoDecimals(sum.Disk / count),
DiskUsed: twoDecimals(sum.DiskUsed / count),
DiskPct: twoDecimals(sum.DiskPct / count),
DiskRead: twoDecimals(sum.DiskRead / count),
DiskWrite: twoDecimals(sum.DiskWrite / count),
NetworkSent: twoDecimals(sum.NetworkSent / count),
NetworkRecv: twoDecimals(sum.NetworkRecv / count),
} }
} }
average := reflect.New(reflect.TypeOf(system.Stats{})).Elem() // Calculate the average stats of a list of container_stats records
for i := 0; i < sum.NumField(); i++ {
average.Field(i).SetFloat(twoDecimals(sum.Field(i).Float() / count))
}
return average.Interface().(system.Stats)
}
// calculate the average stats of a list of container_stats records
func (rm *RecordManager) AverageContainerStats(records []*models.Record) (stats []container.Stats) { func (rm *RecordManager) AverageContainerStats(records []*models.Record) (stats []container.Stats) {
sums := make(map[string]*container.Stats) sums := make(map[string]*container.Stats)
count := float64(len(records)) count := float64(len(records))
var containerStats []container.Stats
for _, record := range records { for _, record := range records {
var stats []container.Stats record.UnmarshalJSONField("stats", &containerStats)
record.UnmarshalJSONField("stats", &stats) for _, stat := range containerStats {
for _, stat := range stats {
if _, ok := sums[stat.Name]; !ok { if _, ok := sums[stat.Name]; !ok {
sums[stat.Name] = &container.Stats{Name: stat.Name, Cpu: 0, Mem: 0} sums[stat.Name] = &container.Stats{Name: stat.Name, Cpu: 0, Mem: 0}
} }
@@ -135,6 +223,7 @@ func (rm *RecordManager) AverageContainerStats(records []*models.Record) (stats
sums[stat.Name].NetworkRecv += stat.NetworkRecv sums[stat.Name].NetworkRecv += stat.NetworkRecv
} }
} }
for _, value := range sums { for _, value := range sums {
stats = append(stats, container.Stats{ stats = append(stats, container.Stats{
Name: value.Name, Name: value.Name,
@@ -147,33 +236,57 @@ func (rm *RecordManager) AverageContainerStats(records []*models.Record) (stats
return stats return stats
} }
/* Round float to two decimals */ func (rm *RecordManager) DeleteOldRecords() {
func twoDecimals(value float64) float64 { // start := time.Now()
return math.Round(value*100) / 100 collections := []string{"system_stats", "container_stats"}
recordData := []RecordDeletionData{
{
recordType: "1m",
retention: time.Hour,
},
{
recordType: "10m",
retention: 12 * time.Hour,
},
{
recordType: "20m",
retention: 24 * time.Hour,
},
{
recordType: "120m",
retention: 7 * 24 * time.Hour,
},
{
recordType: "480m",
retention: 30 * 24 * time.Hour,
},
} }
/* Delete records of specified collections and type that are older than timeLimit */
func (rm *RecordManager) DeleteOldRecords(collections []string, recordType string, timeLimit time.Duration) {
timeLimitStamp := time.Now().UTC().Add(-timeLimit).Format("2006-01-02 15:04:05")
// db query
expType := dbx.NewExp("type = {:type}", dbx.Params{"type": recordType})
expCreated := dbx.NewExp("created < {:created}", dbx.Params{"created": timeLimitStamp})
var records []*models.Record
for _, collection := range collections {
if collectionRecords, err := rm.app.Dao().FindRecordsByExpr(collection, expType, expCreated); err == nil {
records = append(records, collectionRecords...)
}
}
rm.app.Dao().RunInTransaction(func(txDao *daos.Dao) error { rm.app.Dao().RunInTransaction(func(txDao *daos.Dao) error {
for _, record := range records { for _, recordData := range recordData {
err := txDao.DeleteRecord(record) exp := dbx.NewExp(
"type = {:type} AND created < {:created}",
dbx.Params{"type": recordData.recordType, "created": time.Now().UTC().Add(-recordData.retention)},
)
for _, collectionSlug := range collections {
collectionRecords, err := txDao.FindRecordsByExpr(collectionSlug, exp)
if err != nil { if err != nil {
return err return err
} }
for _, record := range collectionRecords {
err := txDao.DeleteRecord(record)
if err != nil {
rm.app.Logger().Error("Failed to delete records", "err", err.Error())
return err
}
}
}
} }
return nil return nil
}) })
// log.Println("finished deleting old records", "time (ms)", time.Since(start).Milliseconds())
}
/* Round float to two decimals */
func twoDecimals(value float64) float64 {
return math.Round(value*100) / 100
} }