diff --git a/beszel/internal/hub/hub.go b/beszel/internal/hub/hub.go index cabc0df..6537e96 100644 --- a/beszel/internal/hub/hub.go +++ b/beszel/internal/hub/hub.go @@ -42,6 +42,8 @@ type Hub struct { am *alerts.AlertManager um *users.UserManager rm *records.RecordManager + systemStats *models.Collection + containerStats *models.Collection } func NewHub(app *pocketbase.PocketBase) *Hub { @@ -125,7 +127,11 @@ func (h *Hub) Run() { // delete old records once every hour scheduler.MustAdd("delete old records", "8 * * * *", h.rm.DeleteOldRecords) // create longer records every 10 minutes - scheduler.MustAdd("create longer records", "*/10 * * * *", h.rm.CreateLongerRecords) + scheduler.MustAdd("create longer records", "*/10 * * * *", func() { + if systemStats, containerStats, err := h.getCollections(); err == nil { + h.rm.CreateLongerRecords([]*models.Collection{systemStats, containerStats}) + } + }) scheduler.Start() return nil }) @@ -286,37 +292,61 @@ func (h *Hub) updateSystem(record *models.Record) { return } // update system record + dao := h.app.Dao() record.Set("status", "up") record.Set("info", systemData.Info) - if err := h.app.Dao().SaveRecord(record); err != nil { + if err := dao.SaveRecord(record); err != nil { h.app.Logger().Error("Failed to update record: ", "err", err.Error()) } - // add new system_stats record - system_stats, _ := h.app.Dao().FindCollectionByNameOrId("system_stats") - systemStatsRecord := models.NewRecord(system_stats) - systemStatsRecord.Set("system", record.Id) - systemStatsRecord.Set("stats", systemData.Stats) - systemStatsRecord.Set("type", "1m") - if err := h.app.Dao().SaveRecord(systemStatsRecord); err != nil { - h.app.Logger().Error("Failed to save record: ", "err", err.Error()) - } - // add new container_stats record - if len(systemData.Containers) > 0 { - container_stats, _ := h.app.Dao().FindCollectionByNameOrId("container_stats") - containerStatsRecord := models.NewRecord(container_stats) - containerStatsRecord.Set("system", record.Id) - containerStatsRecord.Set("stats", systemData.Containers) - containerStatsRecord.Set("type", "1m") - if err := h.app.Dao().SaveRecord(containerStatsRecord); err != nil { + // add system_stats and container_stats records + if systemStats, containerStats, err := h.getCollections(); err != nil { + h.app.Logger().Error("Failed to get collections: ", "err", err.Error()) + } else { + // add new system_stats record + systemStatsRecord := models.NewRecord(systemStats) + systemStatsRecord.Set("system", record.Id) + systemStatsRecord.Set("stats", systemData.Stats) + systemStatsRecord.Set("type", "1m") + if err := dao.SaveRecord(systemStatsRecord); err != nil { h.app.Logger().Error("Failed to save record: ", "err", err.Error()) } + // add new container_stats record + if len(systemData.Containers) > 0 { + containerStatsRecord := models.NewRecord(containerStats) + containerStatsRecord.Set("system", record.Id) + containerStatsRecord.Set("stats", systemData.Containers) + containerStatsRecord.Set("type", "1m") + if err := dao.SaveRecord(containerStatsRecord); err != nil { + h.app.Logger().Error("Failed to save record: ", "err", err.Error()) + } + } } + // system info alerts (todo: extra fs alerts) if err := h.am.HandleSystemAlerts(record, systemData.Info, systemData.Stats.Temperatures, systemData.Stats.ExtraFs); err != nil { h.app.Logger().Error("System alerts error", "err", err.Error()) } } +// return system_stats and container_stats collections +func (h *Hub) getCollections() (*models.Collection, *models.Collection, error) { + if h.systemStats == nil { + systemStats, err := h.app.Dao().FindCollectionByNameOrId("system_stats") + if err != nil { + return nil, nil, err + } + h.systemStats = systemStats + } + if h.containerStats == nil { + containerStats, err := h.app.Dao().FindCollectionByNameOrId("container_stats") + if err != nil { + return nil, nil, err + } + h.containerStats = containerStats + } + return h.systemStats, h.containerStats, nil +} + // set system to specified status and save record func (h *Hub) updateSystemStatus(record *models.Record, status string) { if record.GetString("status") != status { diff --git a/beszel/internal/records/records.go b/beszel/internal/records/records.go index f097339..83b496f 100644 --- a/beszel/internal/records/records.go +++ b/beszel/internal/records/records.go @@ -32,7 +32,7 @@ type RecordDeletionData struct { retention time.Duration } -type RecordStats []*struct { +type RecordStats []struct { Stats []byte `db:"stats"` } @@ -41,9 +41,9 @@ func NewRecordManager(app *pocketbase.PocketBase) *RecordManager { } // Create longer records by averaging shorter records -func (rm *RecordManager) CreateLongerRecords() { +func (rm *RecordManager) CreateLongerRecords(collections []*models.Collection) { // start := time.Now() - recordData := []LongerRecordData{ + longerRecordData := []LongerRecordData{ { shorterType: "1m", // change to 9 from 10 to allow edge case timing or short pauses @@ -78,17 +78,11 @@ func (rm *RecordManager) CreateLongerRecords() { return err } - // need *models.Collection to create a new record with models.NewRecord - collections := map[string]*models.Collection{} - for _, collectionName := range []string{"system_stats", "container_stats"} { - collection, _ := txDao.FindCollectionByNameOrId(collectionName) - collections[collectionName] = collection - } - // loop through all active systems, time periods, and collections for _, system := range activeSystems { // log.Println("processing system", system.GetString("name")) - for _, recordData := range recordData { + for i := range longerRecordData { + recordData := longerRecordData[i] // log.Println("processing longer record type", recordData.longerType) // add one minute padding for longer records because they are created slightly later than the job start time longerRecordPeriod := time.Now().UTC().Add(recordData.longerTimeDuration + time.Minute) @@ -165,8 +159,8 @@ func (rm *RecordManager) AverageSystemStats(records RecordStats) system.Stats { tempCount := float64(0) var stats system.Stats - for _, record := range records { - json.Unmarshal(record.Stats, &stats) + for i := range records { + json.Unmarshal(records[i].Stats, &stats) sum.Cpu += stats.Cpu sum.Mem += stats.Mem sum.MemUsed += stats.MemUsed @@ -268,13 +262,14 @@ func (rm *RecordManager) AverageContainerStats(records RecordStats) []container. count := float64(len(records)) var containerStats []container.Stats - for _, record := range records { + for i := range records { // Reset the slice length to 0, but keep the capacity containerStats = containerStats[:0] - if err := json.Unmarshal(record.Stats, &containerStats); err != nil { + if err := json.Unmarshal(records[i].Stats, &containerStats); err != nil { return []container.Stats{} } - for _, stat := range containerStats { + for i := range containerStats { + stat := containerStats[i] if _, ok := sums[stat.Name]; !ok { sums[stat.Name] = &container.Stats{Name: stat.Name} }