From 083da9598e783bd73fac802f2b7e6a542cf55314 Mon Sep 17 00:00:00 2001 From: Henry Dollman Date: Wed, 14 Aug 2024 11:28:43 -0400 Subject: [PATCH] refactor: hub --- beszel/internal/entities/server/server.go | 17 --- beszel/internal/hub/hub.go | 166 ++++++++++------------ 2 files changed, 78 insertions(+), 105 deletions(-) delete mode 100644 beszel/internal/entities/server/server.go diff --git a/beszel/internal/entities/server/server.go b/beszel/internal/entities/server/server.go deleted file mode 100644 index 282ce21..0000000 --- a/beszel/internal/entities/server/server.go +++ /dev/null @@ -1,17 +0,0 @@ -package server - -import "golang.org/x/crypto/ssh" - -type Server struct { - Host string - Port string - Status string - Client *ssh.Client -} - -func NewServer(host, port string) *Server { - return &Server{ - Host: host, - Port: port, - } -} diff --git a/beszel/internal/hub/hub.go b/beszel/internal/hub/hub.go index d47e51c..2a53c7e 100644 --- a/beszel/internal/hub/hub.go +++ b/beszel/internal/hub/hub.go @@ -2,13 +2,11 @@ package hub import ( "beszel/internal/alerts" - "beszel/internal/entities/server" "beszel/internal/entities/system" "beszel/internal/records" "beszel/site" "bytes" "crypto/ed25519" - "encoding/json" "encoding/pem" "errors" "fmt" @@ -21,6 +19,8 @@ import ( "sync" "time" + "github.com/goccy/go-json" + "github.com/labstack/echo/v5" "github.com/pocketbase/pocketbase" "github.com/pocketbase/pocketbase/apis" @@ -32,16 +32,17 @@ import ( ) type Hub struct { - app *pocketbase.PocketBase - serverConnectionsLock *sync.Mutex - serverConnections map[string]*server.Server + app *pocketbase.PocketBase + connectionLock *sync.Mutex + systemConnections map[string]*ssh.Client + sshClientConfig *ssh.ClientConfig } func NewHub(app *pocketbase.PocketBase) *Hub { return &Hub{ - app: app, - serverConnectionsLock: &sync.Mutex{}, - serverConnections: make(map[string]*server.Server), + app: app, + connectionLock: &sync.Mutex{}, + systemConnections: make(map[string]*ssh.Client), } } @@ -59,15 +60,17 @@ func (h *Hub) Run() { Dir: "../../migrations", }) - // set up record manager and alert manager + // initial setup h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error { + // set up record manager and alert manager rm = records.NewRecordManager(h.app) am = alerts.NewAlertManager(h.app) - return nil - }) - - // set auth settings - h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error { + // create ssh client config + err := h.createSSHClientConfig() + if err != nil { + log.Fatal(err) + } + // set auth settings usersCollection, err := h.app.Dao().FindCollectionByNameOrId("users") if err != nil { return err @@ -122,11 +125,9 @@ func (h *Hub) Run() { return nil }) - // ssh key setup + // custom api routes h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error { - // create ssh key if it doesn't exist - h.getSSHKey() - // api route to return public key + // returns public key e.Router.GET("/api/beszel/getkey", func(c echo.Context) error { requestData := apis.RequestInfo(c) if requestData.AuthRecord == nil { @@ -138,11 +139,6 @@ func (h *Hub) Run() { } return c.JSON(http.StatusOK, map[string]string{"key": strings.TrimSuffix(string(key), "\n")}) }) - return nil - }) - - // other api routes - h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error { // check if first time setup on login page e.Router.GET("/api/beszel/first-run", func(c echo.Context) error { adminNum, err := h.app.Dao().TotalAdmins() @@ -171,7 +167,7 @@ func (h *Hub) Run() { return nil }) - // immediately create connection for new servers + // immediately create connection for new systems h.app.OnModelAfterCreate("systems").Add(func(e *core.ModelEvent) error { go h.updateSystem(e.Model.(*models.Record)) return nil @@ -183,12 +179,12 @@ func (h *Hub) Run() { oldRecord := newRecord.OriginalCopy() newStatus := newRecord.GetString("status") - // if server is disconnected and connection exists, remove it + // if system is disconnected and connection exists, remove it if newStatus == "down" || newStatus == "paused" { - h.deleteServerConnection(newRecord) + h.deleteSystemConnection(newRecord) } - // if server is set to pending (unpause), try to connect immediately + // if system is set to pending (unpause), try to connect immediately if newStatus == "pending" { go h.updateSystem(newRecord) } @@ -200,8 +196,8 @@ func (h *Hub) Run() { // do things after a systems record is deleted h.app.OnModelAfterDelete("systems").Add(func(e *core.ModelEvent) error { - // if server connection exists, close it - h.deleteServerConnection(e.Model.(*models.Record)) + // if system connection exists, close it + h.deleteSystemConnection(e.Model.(*models.Record)) return nil }) @@ -258,38 +254,36 @@ func (h *Hub) updateSystems() { } func (h *Hub) updateSystem(record *models.Record) { - var s *server.Server - // check if server connection data exists - if _, ok := h.serverConnections[record.Id]; ok { - s = h.serverConnections[record.Id] + var client *ssh.Client + var err error + + // check if system connection data exists + if _, ok := h.systemConnections[record.Id]; ok { + client = h.systemConnections[record.Id] } else { - // create server connection struct - s = server.NewServer( - record.GetString("host"), - record.GetString("port")) - client, err := h.getServerConnection(s) + // create system connection + client, err = h.createSystemConnection(record) if err != nil { - h.app.Logger().Error("Failed to connect:", "err", err.Error(), "server", s.Host, "port", s.Port) - h.updateServerStatus(record, "down") + h.app.Logger().Error("Failed to connect:", "err", err.Error(), "system", record.GetString("host"), "port", record.GetString("port")) + h.updateSystemStatus(record, "down") return } - s.Client = client - h.serverConnectionsLock.Lock() - h.serverConnections[record.Id] = s - h.serverConnectionsLock.Unlock() + h.connectionLock.Lock() + h.systemConnections[record.Id] = client + h.connectionLock.Unlock() } - // get server stats from agent - systemData, err := requestJson(s) + // get system stats from agent + systemData, err := requestJson(client) if err != nil { if err.Error() == "retry" { // if previous connection was closed, try again - h.app.Logger().Error("Existing SSH connection closed. Retrying...", "host", s.Host, "port", s.Port) - h.deleteServerConnection(record) + h.app.Logger().Error("Existing SSH connection closed. Retrying...", "host", record.GetString("host"), "port", record.GetString("port")) + h.deleteSystemConnection(record) h.updateSystem(record) return } - h.app.Logger().Error("Failed to get server stats: ", "err", err.Error()) - h.updateServerStatus(record, "down") + h.app.Logger().Error("Failed to get system stats: ", "err", err.Error()) + h.updateSystemStatus(record, "down") return } // update system record @@ -300,33 +294,28 @@ func (h *Hub) updateSystem(record *models.Record) { } // add new system_stats record system_stats, _ := h.app.Dao().FindCollectionByNameOrId("system_stats") - system_stats_record := models.NewRecord(system_stats) - system_stats_record.Set("system", record.Id) - system_stats_record.Set("stats", systemData.Stats) - system_stats_record.Set("type", "1m") - if err := h.app.Dao().SaveRecord(system_stats_record); err != nil { + systemStatsRecord := models.NewRecord(system_stats) + systemStatsRecord.Set("system", record.Id) + systemStatsRecord.Set("stats", systemData.Stats) + systemStatsRecord.Set("type", "1m") + if err := h.app.Dao().SaveRecord(systemStatsRecord); err != nil { h.app.Logger().Error("Failed to save record: ", "err", err.Error()) } // add new container_stats record if len(systemData.Containers) > 0 { container_stats, _ := h.app.Dao().FindCollectionByNameOrId("container_stats") - container_stats_record := models.NewRecord(container_stats) - container_stats_record.Set("system", record.Id) - container_stats_record.Set("stats", systemData.Containers) - container_stats_record.Set("type", "1m") - if err := h.app.Dao().SaveRecord(container_stats_record); err != nil { + containerStatsRecord := models.NewRecord(container_stats) + containerStatsRecord.Set("system", record.Id) + containerStatsRecord.Set("stats", systemData.Containers) + containerStatsRecord.Set("type", "1m") + if err := h.app.Dao().SaveRecord(containerStatsRecord); err != nil { h.app.Logger().Error("Failed to save record: ", "err", err.Error()) } } } -// set server to status down and close connection -func (h *Hub) updateServerStatus(record *models.Record, status string) { - // if in map, close connection and remove from map - // this is now down automatically in an after update hook - // if status == "down" || status == "paused" { - // deleteServerConnection(record) - // } +// set system to status down and close connection +func (h *Hub) updateSystemStatus(record *models.Record, status string) { if record.GetString("status") != status { record.Set("status", status) if err := h.app.Dao().SaveRecord(record); err != nil { @@ -335,32 +324,39 @@ func (h *Hub) updateServerStatus(record *models.Record, status string) { } } -func (h *Hub) deleteServerConnection(record *models.Record) { - if _, ok := h.serverConnections[record.Id]; ok { - if h.serverConnections[record.Id].Client != nil { - h.serverConnections[record.Id].Client.Close() +func (h *Hub) deleteSystemConnection(record *models.Record) { + if _, ok := h.systemConnections[record.Id]; ok { + if h.systemConnections[record.Id] != nil { + h.systemConnections[record.Id].Close() } - h.serverConnectionsLock.Lock() - defer h.serverConnectionsLock.Unlock() - delete(h.serverConnections, record.Id) + h.connectionLock.Lock() + defer h.connectionLock.Unlock() + delete(h.systemConnections, record.Id) } } -func (h *Hub) getServerConnection(server *server.Server) (*ssh.Client, error) { - // h.app.Logger().Debug("new ssh connection", "server", server.Host) +func (h *Hub) createSystemConnection(record *models.Record) (*ssh.Client, error) { + client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%s", record.GetString("host"), record.GetString("port")), h.sshClientConfig) + if err != nil { + return nil, err + } + return client, nil +} + +func (h *Hub) createSSHClientConfig() error { key, err := h.getSSHKey() if err != nil { h.app.Logger().Error("Failed to get SSH key: ", "err", err.Error()) - return nil, err + return err } // Create the Signer for this private key. signer, err := ssh.ParsePrivateKey(key) if err != nil { - return nil, err + return err } - config := &ssh.ClientConfig{ + h.sshClientConfig = &ssh.ClientConfig{ User: "u", Auth: []ssh.AuthMethod{ ssh.PublicKeys(signer), @@ -368,17 +364,11 @@ func (h *Hub) getServerConnection(server *server.Server) (*ssh.Client, error) { HostKeyCallback: ssh.InsecureIgnoreHostKey(), Timeout: 5 * time.Second, } - - client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%s", server.Host, server.Port), config) - if err != nil { - return nil, err - } - - return client, nil + return nil } -func requestJson(server *server.Server) (system.SystemData, error) { - session, err := server.Client.NewSession() +func requestJson(client *ssh.Client) (system.SystemData, error) { + session, err := client.NewSession() if err != nil { return system.SystemData{}, errors.New("retry") }