diff --git a/beszel/internal/alerts/alerts.go b/beszel/internal/alerts/alerts.go index dbebc11..922492c 100644 --- a/beszel/internal/alerts/alerts.go +++ b/beszel/internal/alerts/alerts.go @@ -39,46 +39,31 @@ func NewAlertManager(app *pocketbase.PocketBase) *AlertManager { } } -func (am *AlertManager) HandleSystemAlerts(newStatus string, newRecord *models.Record, oldRecord *models.Record) { +func (am *AlertManager) HandleSystemInfoAlerts(systemRecord *models.Record, systemInfo system.Info) { alertRecords, err := am.app.Dao().FindRecordsByExpr("alerts", - dbx.NewExp("system = {:system}", dbx.Params{"system": oldRecord.GetId()}), + dbx.NewExp("system={:system}", dbx.Params{"system": systemRecord.GetId()}), ) if err != nil || len(alertRecords) == 0 { // log.Println("no alerts found for system") return } // log.Println("found alerts", len(alertRecords)) - var systemInfo *system.Info for _, alertRecord := range alertRecords { name := alertRecord.GetString("name") switch name { - case "Status": - am.handleStatusAlerts(newStatus, oldRecord, alertRecord) case "CPU", "Memory", "Disk": - if newStatus != "up" { - continue - } - if systemInfo == nil { - systemInfo = getSystemInfo(newRecord) - } if name == "CPU" { - am.handleSlidingValueAlert(newRecord, alertRecord, name, systemInfo.Cpu) + am.handleSlidingValueAlert(systemRecord, alertRecord, name, systemInfo.Cpu) } else if name == "Memory" { - am.handleSlidingValueAlert(newRecord, alertRecord, name, systemInfo.MemPct) + am.handleSlidingValueAlert(systemRecord, alertRecord, name, systemInfo.MemPct) } else if name == "Disk" { - am.handleSlidingValueAlert(newRecord, alertRecord, name, systemInfo.DiskPct) + am.handleSlidingValueAlert(systemRecord, alertRecord, name, systemInfo.DiskPct) } } } } -func getSystemInfo(record *models.Record) *system.Info { - var SystemInfo system.Info - record.UnmarshalJSONField("info", &SystemInfo) - return &SystemInfo -} - -func (am *AlertManager) handleSlidingValueAlert(newRecord *models.Record, alertRecord *models.Record, name string, curValue float64) { +func (am *AlertManager) handleSlidingValueAlert(systemRecord *models.Record, alertRecord *models.Record, name string, curValue float64) { triggered := alertRecord.GetBool("triggered") threshold := alertRecord.GetFloat("value") // fmt.Println(name, curValue, "threshold", threshold, "triggered", triggered) @@ -87,12 +72,12 @@ func (am *AlertManager) handleSlidingValueAlert(newRecord *models.Record, alertR var systemName string if !triggered && curValue > threshold { alertRecord.Set("triggered", true) - systemName = newRecord.GetString("name") + systemName = systemRecord.GetString("name") subject = fmt.Sprintf("%s usage above threshold on %s", name, systemName) body = fmt.Sprintf("%s usage on %s is %.1f%%.", name, systemName, curValue) } else if triggered && curValue <= threshold { alertRecord.Set("triggered", false) - systemName = newRecord.GetString("name") + systemName = systemRecord.GetString("name") subject = fmt.Sprintf("%s usage below threshold on %s", name, systemName) body = fmt.Sprintf("%s usage on %s is below threshold at %.1f%%.", name, systemName, curValue) } else { @@ -119,42 +104,55 @@ func (am *AlertManager) handleSlidingValueAlert(newRecord *models.Record, alertR } } -func (am *AlertManager) handleStatusAlerts(newStatus string, oldRecord *models.Record, alertRecord *models.Record) error { +func (am *AlertManager) HandleStatusAlerts(newStatus string, oldSystemRecord *models.Record) error { var alertStatus string switch newStatus { case "up": - if oldRecord.GetString("status") == "down" { + if oldSystemRecord.GetString("status") == "down" { alertStatus = "up" } case "down": - if oldRecord.GetString("status") == "up" { + if oldSystemRecord.GetString("status") == "up" { alertStatus = "down" } } if alertStatus == "" { return nil } - // expand the user relation - if errs := am.app.Dao().ExpandRecord(alertRecord, []string{"user"}, nil); len(errs) > 0 { - return fmt.Errorf("failed to expand: %v", errs) - } - user := alertRecord.ExpandedOne("user") - if user == nil { + // check if use + alertRecords, err := am.app.Dao().FindRecordsByExpr("alerts", + dbx.HashExp{ + "system": oldSystemRecord.GetId(), + "name": "Status", + }, + ) + if err != nil || len(alertRecords) == 0 { + // log.Println("no alerts found for system") return nil } - emoji := "\U0001F534" - if alertStatus == "up" { - emoji = "\u2705" + for _, alertRecord := range alertRecords { + // expand the user relation + if errs := am.app.Dao().ExpandRecord(alertRecord, []string{"user"}, nil); len(errs) > 0 { + return fmt.Errorf("failed to expand: %v", errs) + } + user := alertRecord.ExpandedOne("user") + if user == nil { + return nil + } + emoji := "\U0001F534" + if alertStatus == "up" { + emoji = "\u2705" + } + // send alert + systemName := oldSystemRecord.GetString("name") + am.sendAlert(AlertData{ + UserID: user.GetId(), + Title: fmt.Sprintf("Connection to %s is %s %v", systemName, alertStatus, emoji), + Message: fmt.Sprintf("Connection to %s is %s", systemName, alertStatus), + Link: am.app.Settings().Meta.AppUrl + "/system/" + url.QueryEscape(systemName), + LinkText: "View " + systemName, + }) } - // send alert - systemName := oldRecord.GetString("name") - am.sendAlert(AlertData{ - UserID: user.GetId(), - Title: fmt.Sprintf("Connection to %s is %s %v", systemName, alertStatus, emoji), - Message: fmt.Sprintf("Connection to %s is %s", systemName, alertStatus), - Link: am.app.Settings().Meta.AppUrl + "/system/" + url.QueryEscape(systemName), - LinkText: "View " + systemName, - }) return nil } diff --git a/beszel/internal/hub/hub.go b/beszel/internal/hub/hub.go index 267701c..19ca016 100644 --- a/beszel/internal/hub/hub.go +++ b/beszel/internal/hub/hub.go @@ -8,6 +8,7 @@ import ( "beszel/internal/records" "beszel/internal/users" "beszel/site" + "context" "crypto/ed25519" "encoding/pem" @@ -22,7 +23,6 @@ import ( "time" "github.com/goccy/go-json" - "github.com/labstack/echo/v5" "github.com/pocketbase/pocketbase" "github.com/pocketbase/pocketbase/apis" @@ -39,6 +39,9 @@ type Hub struct { systemConnections map[string]*ssh.Client sshClientConfig *ssh.ClientConfig pubKey string + am *alerts.AlertManager + um *users.UserManager + rm *records.RecordManager } func NewHub(app *pocketbase.PocketBase) *Hub { @@ -46,13 +49,16 @@ func NewHub(app *pocketbase.PocketBase) *Hub { app: app, connectionLock: &sync.Mutex{}, systemConnections: make(map[string]*ssh.Client), + am: alerts.NewAlertManager(app), + um: users.NewUserManager(app), + rm: records.NewRecordManager(app), } } func (h *Hub) Run() { - rm := records.NewRecordManager(h.app) - am := alerts.NewAlertManager(h.app) - um := users.NewUserManager(h.app) + // rm := records.NewRecordManager(h.app) + // am := alerts.NewAlertManager(h.app) + // um := users.NewUserManager(h.app) // loosely check if it was executed using "go run" isGoRun := strings.HasPrefix(os.Args[0], os.TempDir()) @@ -120,9 +126,9 @@ func (h *Hub) Run() { // set up cron jobs scheduler := cron.New() // delete old records once every hour - scheduler.MustAdd("delete old records", "8 * * * *", rm.DeleteOldRecords) + scheduler.MustAdd("delete old records", "8 * * * *", h.rm.DeleteOldRecords) // create longer records every 10 minutes - scheduler.MustAdd("create longer records", "*/10 * * * *", rm.CreateLongerRecords) + scheduler.MustAdd("create longer records", "*/10 * * * *", h.rm.CreateLongerRecords) scheduler.Start() return nil }) @@ -146,7 +152,7 @@ func (h *Hub) Run() { return c.JSON(http.StatusOK, map[string]bool{"firstRun": adminNum == 0}) }) // send test notification - e.Router.GET("/api/beszel/send-test-notification", am.SendTestNotification) + e.Router.GET("/api/beszel/send-test-notification", h.am.SendTestNotification) return nil }) @@ -165,8 +171,8 @@ func (h *Hub) Run() { }) // handle default values for user / user_settings creation - h.app.OnModelBeforeCreate("users").Add(um.InitializeUserRole) - h.app.OnModelBeforeCreate("user_settings").Add(um.InitializeUserSettings) + h.app.OnModelBeforeCreate("users").Add(h.um.InitializeUserRole) + h.app.OnModelBeforeCreate("user_settings").Add(h.um.InitializeUserSettings) // do things after a systems record is updated h.app.OnModelAfterUpdate("systems").Add(func(e *core.ModelEvent) error { @@ -182,10 +188,11 @@ func (h *Hub) Run() { // if system is set to pending (unpause), try to connect immediately if newStatus == "pending" { go h.updateSystem(newRecord) + } else { + h.am.HandleStatusAlerts(newStatus, oldRecord) + } - // alerts - am.HandleSystemAlerts(newStatus, newRecord, oldRecord) return nil }) @@ -261,7 +268,7 @@ func (h *Hub) updateSystem(record *models.Record) { } // get system stats from agent var systemData system.CombinedData - if err := requestJsonFromAgent(client, &systemData); err != nil { + if err := h.requestJsonFromAgent(client, &systemData); err != nil { if err.Error() == "bad client" { // if previous connection was closed, try again h.app.Logger().Error("Existing SSH connection closed. Retrying...", "host", record.GetString("host"), "port", record.GetString("port")) @@ -299,6 +306,8 @@ func (h *Hub) updateSystem(record *models.Record) { h.app.Logger().Error("Failed to save record: ", "err", err.Error()) } } + // system info alerts (todo: temp alerts, extra fs alerts) + h.am.HandleSystemInfoAlerts(record, systemData.Info) } // set system to specified status and save record @@ -354,7 +363,8 @@ func (h *Hub) createSSHClientConfig() error { return nil } -func requestJsonFromAgent(client *ssh.Client, systemData *system.CombinedData) error { +// Fetches system stats from the agent and decodes the json data into the provided struct +func (h *Hub) requestJsonFromAgent(client *ssh.Client, systemData *system.CombinedData) error { session, err := newSessionWithTimeout(client, 5*time.Second) if err != nil { return fmt.Errorf("bad client")