diff --git a/beszel/internal/hub/hub.go b/beszel/internal/hub/hub.go index 8a4a4af..70d5298 100644 --- a/beszel/internal/hub/hub.go +++ b/beszel/internal/hub/hub.go @@ -107,20 +107,16 @@ func (h *Hub) Run() { return nil }) - // set up cron jobs / ticker for system updates + // set up scheduled jobs / ticker for system updates h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error { // 15 second ticker for system updates go h.startSystemUpdateTicker() - // cron job to delete old records + // set up cron jobs scheduler := cron.New() - scheduler.MustAdd("delete old records", "8 * * * *", func() { - collections := []string{"system_stats", "container_stats"} - rm.DeleteOldRecords(collections, "1m", time.Hour) - rm.DeleteOldRecords(collections, "10m", 12*time.Hour) - rm.DeleteOldRecords(collections, "20m", 24*time.Hour) - rm.DeleteOldRecords(collections, "120m", 7*24*time.Hour) - rm.DeleteOldRecords(collections, "480m", 30*24*time.Hour) - }) + // delete old records once every hour + scheduler.MustAdd("delete old records", "8 * * * *", rm.DeleteOldRecords) + // create longer records every 10 minutes + scheduler.MustAdd("create longer records", "*/10 * * * *", rm.CreateLongerRecords) scheduler.Start() return nil }) @@ -201,16 +197,6 @@ func (h *Hub) Run() { return nil }) - h.app.OnModelAfterCreate("system_stats").Add(func(e *core.ModelEvent) error { - rm.CreateLongerRecords("system_stats", e.Model.(*models.Record)) - return nil - }) - - h.app.OnModelAfterCreate("container_stats").Add(func(e *core.ModelEvent) error { - rm.CreateLongerRecords("container_stats", e.Model.(*models.Record)) - return nil - }) - if err := h.app.Start(); err != nil { log.Fatal(err) } diff --git a/beszel/internal/records/records.go b/beszel/internal/records/records.go index 58c595d..673c820 100644 --- a/beszel/internal/records/records.go +++ b/beszel/internal/records/records.go @@ -4,9 +4,8 @@ package records import ( "beszel/internal/entities/container" "beszel/internal/entities/system" - "fmt" + "log" "math" - "reflect" "time" "github.com/pocketbase/dbx" @@ -19,113 +18,202 @@ type RecordManager struct { app *pocketbase.PocketBase } +type LongerRecordData struct { + shorterType string + longerType string + longerTimeDuration time.Duration + expectedShorterRecords int +} + +type RecordDeletionData struct { + recordType string + retention time.Duration +} + func NewRecordManager(app *pocketbase.PocketBase) *RecordManager { - return &RecordManager{app: app} + return &RecordManager{app} } -func (rm *RecordManager) CreateLongerRecords(collectionName string, shorterRecord *models.Record) { - shorterRecordType := shorterRecord.GetString("type") - systemId := shorterRecord.GetString("system") - // fmt.Println("create longer records", "recordType", shorterRecordType, "systemId", systemId) - var longerRecordType string - var timeAgo time.Duration - var expectedShorterRecords int - switch shorterRecordType { - case "1m": - longerRecordType = "10m" - timeAgo = -10 * time.Minute - expectedShorterRecords = 10 - case "10m": - longerRecordType = "20m" - timeAgo = -20 * time.Minute - expectedShorterRecords = 2 - case "20m": - longerRecordType = "120m" - timeAgo = -120 * time.Minute - expectedShorterRecords = 6 - default: - longerRecordType = "480m" - timeAgo = -480 * time.Minute - expectedShorterRecords = 4 +// Create longer records by averaging shorter records +func (rm *RecordManager) CreateLongerRecords() { + // start := time.Now() + recordData := []LongerRecordData{ + { + shorterType: "1m", + expectedShorterRecords: 10, + longerType: "10m", + longerTimeDuration: -10 * time.Minute, + }, + { + shorterType: "10m", + expectedShorterRecords: 2, + longerType: "20m", + longerTimeDuration: -20 * time.Minute, + }, + { + shorterType: "20m", + expectedShorterRecords: 6, + longerType: "120m", + longerTimeDuration: -120 * time.Minute, + }, + { + shorterType: "120m", + expectedShorterRecords: 4, + longerType: "480m", + longerTimeDuration: -480 * time.Minute, + }, } - - longerRecordPeriod := time.Now().UTC().Add(timeAgo + 10*time.Second).Format("2006-01-02 15:04:05") - // check creation time of last 10m record - lastLongerRecord, err := rm.app.Dao().FindFirstRecordByFilter( - collectionName, - "type = {:type} && system = {:system} && created > {:created}", - dbx.Params{"type": longerRecordType, "system": systemId, "created": longerRecordPeriod}, - ) - // return if longer record exists - if err == nil || lastLongerRecord != nil { - // log.Println("longer record found. returning") - return - } - // get shorter records from the past x minutes - // shorterRecordPeriod := time.Now().UTC().Add(timeAgo + time.Second).Format("2006-01-02 15:04:05") - allShorterRecords, err := rm.app.Dao().FindRecordsByFilter( - collectionName, - "type = {:type} && system = {:system} && created > {:created}", - "-created", - -1, - 0, - dbx.Params{"type": shorterRecordType, "system": systemId, "created": longerRecordPeriod}, - ) - // return if not enough shorter records - if err != nil || len(allShorterRecords) < expectedShorterRecords { - // log.Println("not enough shorter records. returning") - return - } - // average the shorter records and create longer record - var stats interface{} - switch collectionName { - case "system_stats": - stats = rm.AverageSystemStats(allShorterRecords) - case "container_stats": - stats = rm.AverageContainerStats(allShorterRecords) - } - collection, _ := rm.app.Dao().FindCollectionByNameOrId(collectionName) - longerRecord := models.NewRecord(collection) - longerRecord.Set("system", systemId) - longerRecord.Set("stats", stats) - longerRecord.Set("type", longerRecordType) - if err := rm.app.Dao().SaveRecord(longerRecord); err != nil { - fmt.Println("failed to save longer record", "err", err.Error()) - } - -} - -// calculate the average stats of a list of system_stats records -func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Stats { - count := float64(len(records)) - sum := reflect.New(reflect.TypeOf(system.Stats{})).Elem() - - for _, record := range records { - var stats system.Stats - record.UnmarshalJSONField("stats", &stats) - statValue := reflect.ValueOf(stats) - for i := 0; i < statValue.NumField(); i++ { - field := sum.Field(i) - field.SetFloat(field.Float() + statValue.Field(i).Float()) + // wrap the operations in a transaction + rm.app.Dao().RunInTransaction(func(txDao *daos.Dao) error { + activeSystems, err := txDao.FindRecordsByExpr("systems", dbx.NewExp("status = 'up'")) + if err != nil { + log.Println("failed to get active systems", "err", err.Error()) + return err } - } - average := reflect.New(reflect.TypeOf(system.Stats{})).Elem() - for i := 0; i < sum.NumField(); i++ { - average.Field(i).SetFloat(twoDecimals(sum.Field(i).Float() / count)) - } + collections := map[string]*models.Collection{} + for _, collectionName := range []string{"system_stats", "container_stats"} { + collection, _ := txDao.FindCollectionByNameOrId(collectionName) + collections[collectionName] = collection + } - return average.Interface().(system.Stats) + // loop through all active systems, time periods, and collections + for _, system := range activeSystems { + // log.Println("processing system", system.GetString("name")) + for _, recordData := range recordData { + // log.Println("processing longer record type", recordData.longerType) + // add one minute padding for longer records because they are created slightly later than the job start time + longerRecordPeriod := time.Now().UTC().Add(recordData.longerTimeDuration + time.Minute) + // shorter records are created independently of longer records, so we shouldn't need to add padding + shorterRecordPeriod := time.Now().UTC().Add(recordData.longerTimeDuration) + // loop through both collections + for _, collection := range collections { + // check creation time of last longer record if not 10m, since 10m is created every run + if recordData.longerType != "10m" { + lastLongerRecord, err := txDao.FindFirstRecordByFilter( + collection.Id, + "type = {:type} && system = {:system} && created > {:created}", + dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod}, + ) + // continue if longer record exists + if err == nil || lastLongerRecord != nil { + // log.Println("longer record found. continuing") + continue + } + } + // get shorter records from the past x minutes + allShorterRecords, err := txDao.FindRecordsByExpr( + collection.Id, + dbx.NewExp( + "type = {:type} AND system = {:system} AND created > {:created}", + dbx.Params{"type": recordData.shorterType, "system": system.Id, "created": shorterRecordPeriod}, + ), + ) + + // continue if not enough shorter records + if err != nil || len(allShorterRecords) < recordData.expectedShorterRecords { + // log.Println("not enough shorter records. continue.", len(allShorterRecords), recordData.expectedShorterRecords) + continue + } + // average the shorter records and create longer record + var stats interface{} + switch collection.Name { + case "system_stats": + stats = rm.AverageSystemStats(allShorterRecords) + case "container_stats": + stats = rm.AverageContainerStats(allShorterRecords) + } + longerRecord := models.NewRecord(collection) + longerRecord.Set("system", system.Id) + longerRecord.Set("stats", stats) + longerRecord.Set("type", recordData.longerType) + if err := txDao.SaveRecord(longerRecord); err != nil { + log.Println("failed to save longer record", "err", err.Error()) + } + } + } + } + + return nil + }) + + // log.Println("finished creating longer records", "time (ms)", time.Since(start).Milliseconds()) } -// calculate the average stats of a list of container_stats records +// Calculate the average stats of a list of system_stats records with reflect +// func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Stats { +// count := float64(len(records)) +// sum := reflect.New(reflect.TypeOf(system.Stats{})).Elem() + +// var stats system.Stats +// for _, record := range records { +// record.UnmarshalJSONField("stats", &stats) +// statValue := reflect.ValueOf(stats) +// for i := 0; i < statValue.NumField(); i++ { +// field := sum.Field(i) +// field.SetFloat(field.Float() + statValue.Field(i).Float()) +// } +// } + +// average := reflect.New(reflect.TypeOf(system.Stats{})).Elem() +// for i := 0; i < sum.NumField(); i++ { +// average.Field(i).SetFloat(twoDecimals(sum.Field(i).Float() / count)) +// } + +// return average.Interface().(system.Stats) +// } + +// Calculate the average stats of a list of system_stats records without reflect +func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Stats { + var sum system.Stats + count := float64(len(records)) + + var stats system.Stats + for _, record := range records { + record.UnmarshalJSONField("stats", &stats) + sum.Cpu += stats.Cpu + sum.Mem += stats.Mem + sum.MemUsed += stats.MemUsed + sum.MemPct += stats.MemPct + sum.MemBuffCache += stats.MemBuffCache + sum.Swap += stats.Swap + sum.SwapUsed += stats.SwapUsed + sum.Disk += stats.Disk + sum.DiskUsed += stats.DiskUsed + sum.DiskPct += stats.DiskPct + sum.DiskRead += stats.DiskRead + sum.DiskWrite += stats.DiskWrite + sum.NetworkSent += stats.NetworkSent + sum.NetworkRecv += stats.NetworkRecv + } + + return system.Stats{ + Cpu: twoDecimals(sum.Cpu / count), + Mem: twoDecimals(sum.Mem / count), + MemUsed: twoDecimals(sum.MemUsed / count), + MemPct: twoDecimals(sum.MemPct / count), + MemBuffCache: twoDecimals(sum.MemBuffCache / count), + Swap: twoDecimals(sum.Swap / count), + SwapUsed: twoDecimals(sum.SwapUsed / count), + Disk: twoDecimals(sum.Disk / count), + DiskUsed: twoDecimals(sum.DiskUsed / count), + DiskPct: twoDecimals(sum.DiskPct / count), + DiskRead: twoDecimals(sum.DiskRead / count), + DiskWrite: twoDecimals(sum.DiskWrite / count), + NetworkSent: twoDecimals(sum.NetworkSent / count), + NetworkRecv: twoDecimals(sum.NetworkRecv / count), + } +} + +// Calculate the average stats of a list of container_stats records func (rm *RecordManager) AverageContainerStats(records []*models.Record) (stats []container.Stats) { sums := make(map[string]*container.Stats) count := float64(len(records)) + + var containerStats []container.Stats for _, record := range records { - var stats []container.Stats - record.UnmarshalJSONField("stats", &stats) - for _, stat := range stats { + record.UnmarshalJSONField("stats", &containerStats) + for _, stat := range containerStats { if _, ok := sums[stat.Name]; !ok { sums[stat.Name] = &container.Stats{Name: stat.Name, Cpu: 0, Mem: 0} } @@ -135,6 +223,7 @@ func (rm *RecordManager) AverageContainerStats(records []*models.Record) (stats sums[stat.Name].NetworkRecv += stat.NetworkRecv } } + for _, value := range sums { stats = append(stats, container.Stats{ Name: value.Name, @@ -147,33 +236,57 @@ func (rm *RecordManager) AverageContainerStats(records []*models.Record) (stats return stats } -/* Round float to two decimals */ -func twoDecimals(value float64) float64 { - return math.Round(value*100) / 100 -} - -/* Delete records of specified collections and type that are older than timeLimit */ -func (rm *RecordManager) DeleteOldRecords(collections []string, recordType string, timeLimit time.Duration) { - timeLimitStamp := time.Now().UTC().Add(-timeLimit).Format("2006-01-02 15:04:05") - - // db query - expType := dbx.NewExp("type = {:type}", dbx.Params{"type": recordType}) - expCreated := dbx.NewExp("created < {:created}", dbx.Params{"created": timeLimitStamp}) - - var records []*models.Record - for _, collection := range collections { - if collectionRecords, err := rm.app.Dao().FindRecordsByExpr(collection, expType, expCreated); err == nil { - records = append(records, collectionRecords...) - } +func (rm *RecordManager) DeleteOldRecords() { + // start := time.Now() + collections := []string{"system_stats", "container_stats"} + recordData := []RecordDeletionData{ + { + recordType: "1m", + retention: time.Hour, + }, + { + recordType: "10m", + retention: 12 * time.Hour, + }, + { + recordType: "20m", + retention: 24 * time.Hour, + }, + { + recordType: "120m", + retention: 7 * 24 * time.Hour, + }, + { + recordType: "480m", + retention: 30 * 24 * time.Hour, + }, } - rm.app.Dao().RunInTransaction(func(txDao *daos.Dao) error { - for _, record := range records { - err := txDao.DeleteRecord(record) - if err != nil { - return err + for _, recordData := range recordData { + exp := dbx.NewExp( + "type = {:type} AND created < {:created}", + dbx.Params{"type": recordData.recordType, "created": time.Now().UTC().Add(-recordData.retention)}, + ) + for _, collectionSlug := range collections { + collectionRecords, err := txDao.FindRecordsByExpr(collectionSlug, exp) + if err != nil { + return err + } + for _, record := range collectionRecords { + err := txDao.DeleteRecord(record) + if err != nil { + rm.app.Logger().Error("Failed to delete records", "err", err.Error()) + return err + } + } } } return nil }) + // log.Println("finished deleting old records", "time (ms)", time.Since(start).Milliseconds()) +} + +/* Round float to two decimals */ +func twoDecimals(value float64) float64 { + return math.Round(value*100) / 100 }