diff --git a/beszel/internal/alerts/alerts.go b/beszel/internal/alerts/alerts.go index 8f5eef9..6f34fd9 100644 --- a/beszel/internal/alerts/alerts.go +++ b/beszel/internal/alerts/alerts.go @@ -4,23 +4,28 @@ package alerts import ( "beszel/internal/entities/system" "fmt" + "log" "net/mail" "net/url" + "time" "github.com/containrrr/shoutrrr" + "github.com/goccy/go-json" "github.com/labstack/echo/v5" "github.com/pocketbase/dbx" "github.com/pocketbase/pocketbase" "github.com/pocketbase/pocketbase/apis" "github.com/pocketbase/pocketbase/models" "github.com/pocketbase/pocketbase/tools/mailer" + "github.com/pocketbase/pocketbase/tools/types" + "github.com/spf13/cast" ) type AlertManager struct { app *pocketbase.PocketBase } -type AlertData struct { +type AlertMessageData struct { UserID string Title string Message string @@ -33,6 +38,30 @@ type UserNotificationSettings struct { Webhooks []string `json:"webhooks"` } +type SystemAlertStats struct { + Cpu float64 `json:"cpu"` + Mem float64 `json:"mp"` + Disk float64 `json:"dp"` + NetSent float64 `json:"ns"` + NetRecv float64 `json:"nr"` + Temperatures map[string]float32 `json:"t"` +} + +type SystemAlertData struct { + systemRecord *models.Record + alertRecord *models.Record + name string + unit string + val float64 + threshold float64 + triggered bool + time time.Time + count uint8 + min uint8 + tempSums map[string]float32 + descriptor string // override descriptor in notification body (for temp sensor, disk partition, etc) +} + func NewAlertManager(app *pocketbase.PocketBase) *AlertManager { return &AlertManager{ app: app, @@ -40,6 +69,11 @@ func NewAlertManager(app *pocketbase.PocketBase) *AlertManager { } func (am *AlertManager) HandleSystemAlerts(systemRecord *models.Record, systemInfo system.Info, temperatures map[string]float64) { + // start := time.Now() + // defer func() { + // log.Println("alert stats took", time.Since(start)) + // }() + alertRecords, err := am.app.Dao().FindRecordsByExpr("alerts", dbx.NewExp("system={:system}", dbx.Params{"system": systemRecord.GetId()}), ) @@ -47,65 +81,224 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *models.Record, systemIn // log.Println("no alerts found for system") return } - // log.Println("found alerts", len(alertRecords)) + + var validAlerts []SystemAlertData + now := systemRecord.Updated.Time().UTC() + oldestTime := now + for _, alertRecord := range alertRecords { name := alertRecord.GetString("name") + var val float64 + unit := "%" + switch name { case "CPU": - am.handleSlidingValueAlert(systemRecord, alertRecord, name, "%", systemInfo.Cpu) + val = systemInfo.Cpu case "Memory": - am.handleSlidingValueAlert(systemRecord, alertRecord, name, "%", systemInfo.MemPct) + val = systemInfo.MemPct case "Disk": - am.handleSlidingValueAlert(systemRecord, alertRecord, name+" usage", "%", systemInfo.DiskPct) + val = systemInfo.DiskPct case "Bandwidth": - am.handleSlidingValueAlert(systemRecord, alertRecord, name, " MB/s", systemInfo.Bandwidth) + val = systemInfo.Bandwidth + unit = "MB/s" case "Temperature": if temperatures == nil { continue } - highTemp := 0.0 for _, temp := range temperatures { - if temp > highTemp { - highTemp = temp + if temp > val { + val = temp } } - am.handleSlidingValueAlert(systemRecord, alertRecord, name, "°C", highTemp) + unit = "°C" + } + + triggered := alertRecord.GetBool("triggered") + threshold := alertRecord.GetFloat("value") + + // CONTINUE + // IF alert is not triggered and curValue is less than threshold + // OR alert is triggered and curValue is greater than threshold + if (!triggered && val <= threshold) || (triggered && val > threshold) { + // log.Printf("Skipping alert %s: val %f | threshold %f | triggered %v\n", name, val, threshold, triggered) + continue + } + + min := max(1, cast.ToUint8(alertRecord.Get("min"))) + // add time to alert time to make sure it's slighty after record creation + time := now.Add(-time.Duration(min)*time.Minute + time.Second*5) + if time.Before(oldestTime) { + oldestTime = time + } + + validAlerts = append(validAlerts, SystemAlertData{ + systemRecord: systemRecord, + alertRecord: alertRecord, + name: name, + unit: unit, + val: val, + threshold: threshold, + triggered: triggered, + time: time, + min: min, + }) + } + + systemStats := []struct { + Stats []byte `db:"stats"` + Created types.DateTime `db:"created"` + }{} + + err = am.app.Dao().DB(). + Select("stats", "created"). + From("system_stats"). + Where(dbx.NewExp( + "system={:system} AND type='1m' AND created > {:created}", + dbx.Params{ + "system": systemRecord.Id, + // subtract some time to give us a bit of buffer + "created": oldestTime.Add(-time.Second * 90), + }, + )). + OrderBy("created"). + All(&systemStats) + + if err != nil { + return + } + + // get oldest record creation time from first record in the slice + oldestRecordTime := systemStats[0].Created.Time() + // log.Println("oldestRecordTime", oldestRecordTime.String()) + + // delete from validAlerts if time is older than oldestRecord + for i := 0; i < len(validAlerts); i++ { + if validAlerts[i].time.Before(oldestRecordTime) { + // log.Println("deleting alert - time is older than oldestRecord", validAlerts[i].name, oldestRecordTime, validAlerts[i].time) + validAlerts = append(validAlerts[:i], validAlerts[i+1:]...) + } + } + + if len(validAlerts) == 0 { + // log.Println("no valid alerts found") + return + } + + var stats SystemAlertStats + + // we can skip the latest systemStats record since it's the current value + for i := 0; i < len(systemStats); i++ { + stat := systemStats[i] + // log.Println("created", stat.Created.Time(), "now", time.Now().UTC()) + statTime := stat.Created.Time().Add(time.Second) + json.Unmarshal(stat.Stats, &stats) + // log.Println("stats", stats) + for j := range validAlerts { + alert := &validAlerts[j] + // reset alert val on first iteration + if i == 0 { + alert.val = 0 + } + // continue if stat is older than alert time range + if statTime.Before(alert.time) { + continue + } + // add to alert value + switch alert.name { + case "CPU": + alert.val += stats.Cpu + case "Memory": + alert.val += stats.Mem + case "Bandwidth": + alert.val += stats.NetSent + stats.NetRecv + case "Disk": + // todo: check all disks instead of just root + alert.val += stats.Disk + case "Temperature": + if alert.tempSums == nil { + alert.tempSums = make(map[string]float32, len(stats.Temperatures)) + } + for key, value := range stats.Temperatures { + if _, ok := alert.tempSums[key]; !ok { + alert.tempSums[key] = float32(0) + } + alert.tempSums[key] += value + } + default: + continue + } + alert.count++ + } + } + // sum up vals for each alert + for _, alert := range validAlerts { + switch alert.name { + case "Temperature": + maxTemp := float32(0) + for key, value := range alert.tempSums { + sumTemp := float32(value) / float32(alert.count) + if sumTemp > maxTemp { + maxTemp = sumTemp + alert.descriptor = fmt.Sprintf("Hottest sensor %s", key) + } + } + alert.val = float64(maxTemp) + default: + alert.val = alert.val / float64(alert.count) + } + minCount := float32(alert.min) / 1.2 + // log.Println("alert", alert.name, "val", alert.val, "threshold", alert.threshold, "triggered", alert.triggered) + // log.Printf("%s: val %f | count %d | min-count %f | threshold %f\n", alert.name, alert.val, alert.count, minCount, alert.threshold) + // pass through alert if count is greater than or equal to minCount + if float32(alert.count) >= minCount { + if !alert.triggered && alert.val > alert.threshold { + alert.triggered = true + am.sendSystemAlert(alert) + } else if alert.triggered && alert.val <= alert.threshold { + alert.triggered = false + am.sendSystemAlert(alert) + } } } } -func (am *AlertManager) handleSlidingValueAlert(systemRecord *models.Record, alertRecord *models.Record, name, unit string, curValue float64) { - triggered := alertRecord.GetBool("triggered") - threshold := alertRecord.GetFloat("value") - // fmt.Println(name, curValue, "threshold", threshold, "triggered", triggered) - var subject string - var body string - var systemName string - if !triggered && curValue > threshold { - alertRecord.Set("triggered", true) - systemName = systemRecord.GetString("name") - subject = fmt.Sprintf("%s above threshold on %s", name, systemName) - body = fmt.Sprintf("%s on %s is %v%s.", name, systemName, curValue, unit) - } else if triggered && curValue <= threshold { - alertRecord.Set("triggered", false) - systemName = systemRecord.GetString("name") - subject = fmt.Sprintf("%s below threshold on %s", name, systemName) - body = fmt.Sprintf("%s on %s is below threshold at %v%s.", name, systemName, curValue, unit) - } else { - // fmt.Println(name, "not triggered") - return +func (am *AlertManager) sendSystemAlert(alert SystemAlertData) { + log.Printf("Sending alert %s: val %f | count %d | threshold %f\n", alert.name, alert.val, alert.count, alert.threshold) + + systemName := alert.systemRecord.GetString("name") + + // change Disk to Disk usage + if alert.name == "Disk" { + alert.name = "Disk usage" } - if err := am.app.Dao().SaveRecord(alertRecord); err != nil { + + var subject string + if alert.triggered { + subject = fmt.Sprintf("%s above threshold on %s", alert.name, systemName) + } else { + subject = fmt.Sprintf("%s below threshold on %s", alert.name, systemName) + } + minutesLabel := "minute" + if alert.min > 1 { + minutesLabel += "s" + } + if alert.descriptor == "" { + alert.descriptor = alert.name + } + body := fmt.Sprintf("%s averaged %.2f%s for the previous %v %s.", alert.descriptor, alert.val, alert.unit, alert.min, minutesLabel) + + alert.alertRecord.Set("triggered", alert.triggered) + if err := am.app.Dao().SaveRecord(alert.alertRecord); err != nil { // app.Logger().Error("failed to save alert record", "err", err.Error()) return } // expand the user relation and send the alert - if errs := am.app.Dao().ExpandRecord(alertRecord, []string{"user"}, nil); len(errs) > 0 { + if errs := am.app.Dao().ExpandRecord(alert.alertRecord, []string{"user"}, nil); len(errs) > 0 { // app.Logger().Error("failed to expand user relation", "errs", errs) return } - if user := alertRecord.ExpandedOne("user"); user != nil { - am.sendAlert(AlertData{ + if user := alert.alertRecord.ExpandedOne("user"); user != nil { + am.sendAlert(AlertMessageData{ UserID: user.GetId(), Title: subject, Message: body, @@ -156,7 +349,7 @@ func (am *AlertManager) HandleStatusAlerts(newStatus string, oldSystemRecord *mo } // send alert systemName := oldSystemRecord.GetString("name") - am.sendAlert(AlertData{ + am.sendAlert(AlertMessageData{ UserID: user.GetId(), Title: fmt.Sprintf("Connection to %s is %s %v", systemName, alertStatus, emoji), Message: fmt.Sprintf("Connection to %s is %s", systemName, alertStatus), @@ -167,7 +360,7 @@ func (am *AlertManager) HandleStatusAlerts(newStatus string, oldSystemRecord *mo return nil } -func (am *AlertManager) sendAlert(data AlertData) { +func (am *AlertManager) sendAlert(data AlertMessageData) { // get user settings record, err := am.app.Dao().FindFirstRecordByFilter( "user_settings", "user={:user}",