refactor: hub

This commit is contained in:
Henry Dollman
2024-08-14 11:28:43 -04:00
parent f8d2161489
commit 083da9598e
2 changed files with 78 additions and 105 deletions

View File

@@ -1,17 +0,0 @@
package server
import "golang.org/x/crypto/ssh"
type Server struct {
Host string
Port string
Status string
Client *ssh.Client
}
func NewServer(host, port string) *Server {
return &Server{
Host: host,
Port: port,
}
}

View File

@@ -2,13 +2,11 @@ package hub
import ( import (
"beszel/internal/alerts" "beszel/internal/alerts"
"beszel/internal/entities/server"
"beszel/internal/entities/system" "beszel/internal/entities/system"
"beszel/internal/records" "beszel/internal/records"
"beszel/site" "beszel/site"
"bytes" "bytes"
"crypto/ed25519" "crypto/ed25519"
"encoding/json"
"encoding/pem" "encoding/pem"
"errors" "errors"
"fmt" "fmt"
@@ -21,6 +19,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/goccy/go-json"
"github.com/labstack/echo/v5" "github.com/labstack/echo/v5"
"github.com/pocketbase/pocketbase" "github.com/pocketbase/pocketbase"
"github.com/pocketbase/pocketbase/apis" "github.com/pocketbase/pocketbase/apis"
@@ -32,16 +32,17 @@ import (
) )
type Hub struct { type Hub struct {
app *pocketbase.PocketBase app *pocketbase.PocketBase
serverConnectionsLock *sync.Mutex connectionLock *sync.Mutex
serverConnections map[string]*server.Server systemConnections map[string]*ssh.Client
sshClientConfig *ssh.ClientConfig
} }
func NewHub(app *pocketbase.PocketBase) *Hub { func NewHub(app *pocketbase.PocketBase) *Hub {
return &Hub{ return &Hub{
app: app, app: app,
serverConnectionsLock: &sync.Mutex{}, connectionLock: &sync.Mutex{},
serverConnections: make(map[string]*server.Server), systemConnections: make(map[string]*ssh.Client),
} }
} }
@@ -59,15 +60,17 @@ func (h *Hub) Run() {
Dir: "../../migrations", Dir: "../../migrations",
}) })
// set up record manager and alert manager // initial setup
h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error { h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error {
// set up record manager and alert manager
rm = records.NewRecordManager(h.app) rm = records.NewRecordManager(h.app)
am = alerts.NewAlertManager(h.app) am = alerts.NewAlertManager(h.app)
return nil // create ssh client config
}) err := h.createSSHClientConfig()
if err != nil {
// set auth settings log.Fatal(err)
h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error { }
// set auth settings
usersCollection, err := h.app.Dao().FindCollectionByNameOrId("users") usersCollection, err := h.app.Dao().FindCollectionByNameOrId("users")
if err != nil { if err != nil {
return err return err
@@ -122,11 +125,9 @@ func (h *Hub) Run() {
return nil return nil
}) })
// ssh key setup // custom api routes
h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error { h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error {
// create ssh key if it doesn't exist // returns public key
h.getSSHKey()
// api route to return public key
e.Router.GET("/api/beszel/getkey", func(c echo.Context) error { e.Router.GET("/api/beszel/getkey", func(c echo.Context) error {
requestData := apis.RequestInfo(c) requestData := apis.RequestInfo(c)
if requestData.AuthRecord == nil { if requestData.AuthRecord == nil {
@@ -138,11 +139,6 @@ func (h *Hub) Run() {
} }
return c.JSON(http.StatusOK, map[string]string{"key": strings.TrimSuffix(string(key), "\n")}) return c.JSON(http.StatusOK, map[string]string{"key": strings.TrimSuffix(string(key), "\n")})
}) })
return nil
})
// other api routes
h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error {
// check if first time setup on login page // check if first time setup on login page
e.Router.GET("/api/beszel/first-run", func(c echo.Context) error { e.Router.GET("/api/beszel/first-run", func(c echo.Context) error {
adminNum, err := h.app.Dao().TotalAdmins() adminNum, err := h.app.Dao().TotalAdmins()
@@ -171,7 +167,7 @@ func (h *Hub) Run() {
return nil return nil
}) })
// immediately create connection for new servers // immediately create connection for new systems
h.app.OnModelAfterCreate("systems").Add(func(e *core.ModelEvent) error { h.app.OnModelAfterCreate("systems").Add(func(e *core.ModelEvent) error {
go h.updateSystem(e.Model.(*models.Record)) go h.updateSystem(e.Model.(*models.Record))
return nil return nil
@@ -183,12 +179,12 @@ func (h *Hub) Run() {
oldRecord := newRecord.OriginalCopy() oldRecord := newRecord.OriginalCopy()
newStatus := newRecord.GetString("status") newStatus := newRecord.GetString("status")
// if server is disconnected and connection exists, remove it // if system is disconnected and connection exists, remove it
if newStatus == "down" || newStatus == "paused" { if newStatus == "down" || newStatus == "paused" {
h.deleteServerConnection(newRecord) h.deleteSystemConnection(newRecord)
} }
// if server is set to pending (unpause), try to connect immediately // if system is set to pending (unpause), try to connect immediately
if newStatus == "pending" { if newStatus == "pending" {
go h.updateSystem(newRecord) go h.updateSystem(newRecord)
} }
@@ -200,8 +196,8 @@ func (h *Hub) Run() {
// do things after a systems record is deleted // do things after a systems record is deleted
h.app.OnModelAfterDelete("systems").Add(func(e *core.ModelEvent) error { h.app.OnModelAfterDelete("systems").Add(func(e *core.ModelEvent) error {
// if server connection exists, close it // if system connection exists, close it
h.deleteServerConnection(e.Model.(*models.Record)) h.deleteSystemConnection(e.Model.(*models.Record))
return nil return nil
}) })
@@ -258,38 +254,36 @@ func (h *Hub) updateSystems() {
} }
func (h *Hub) updateSystem(record *models.Record) { func (h *Hub) updateSystem(record *models.Record) {
var s *server.Server var client *ssh.Client
// check if server connection data exists var err error
if _, ok := h.serverConnections[record.Id]; ok {
s = h.serverConnections[record.Id] // check if system connection data exists
if _, ok := h.systemConnections[record.Id]; ok {
client = h.systemConnections[record.Id]
} else { } else {
// create server connection struct // create system connection
s = server.NewServer( client, err = h.createSystemConnection(record)
record.GetString("host"),
record.GetString("port"))
client, err := h.getServerConnection(s)
if err != nil { if err != nil {
h.app.Logger().Error("Failed to connect:", "err", err.Error(), "server", s.Host, "port", s.Port) h.app.Logger().Error("Failed to connect:", "err", err.Error(), "system", record.GetString("host"), "port", record.GetString("port"))
h.updateServerStatus(record, "down") h.updateSystemStatus(record, "down")
return return
} }
s.Client = client h.connectionLock.Lock()
h.serverConnectionsLock.Lock() h.systemConnections[record.Id] = client
h.serverConnections[record.Id] = s h.connectionLock.Unlock()
h.serverConnectionsLock.Unlock()
} }
// get server stats from agent // get system stats from agent
systemData, err := requestJson(s) systemData, err := requestJson(client)
if err != nil { if err != nil {
if err.Error() == "retry" { if err.Error() == "retry" {
// if previous connection was closed, try again // if previous connection was closed, try again
h.app.Logger().Error("Existing SSH connection closed. Retrying...", "host", s.Host, "port", s.Port) h.app.Logger().Error("Existing SSH connection closed. Retrying...", "host", record.GetString("host"), "port", record.GetString("port"))
h.deleteServerConnection(record) h.deleteSystemConnection(record)
h.updateSystem(record) h.updateSystem(record)
return return
} }
h.app.Logger().Error("Failed to get server stats: ", "err", err.Error()) h.app.Logger().Error("Failed to get system stats: ", "err", err.Error())
h.updateServerStatus(record, "down") h.updateSystemStatus(record, "down")
return return
} }
// update system record // update system record
@@ -300,33 +294,28 @@ func (h *Hub) updateSystem(record *models.Record) {
} }
// add new system_stats record // add new system_stats record
system_stats, _ := h.app.Dao().FindCollectionByNameOrId("system_stats") system_stats, _ := h.app.Dao().FindCollectionByNameOrId("system_stats")
system_stats_record := models.NewRecord(system_stats) systemStatsRecord := models.NewRecord(system_stats)
system_stats_record.Set("system", record.Id) systemStatsRecord.Set("system", record.Id)
system_stats_record.Set("stats", systemData.Stats) systemStatsRecord.Set("stats", systemData.Stats)
system_stats_record.Set("type", "1m") systemStatsRecord.Set("type", "1m")
if err := h.app.Dao().SaveRecord(system_stats_record); err != nil { if err := h.app.Dao().SaveRecord(systemStatsRecord); err != nil {
h.app.Logger().Error("Failed to save record: ", "err", err.Error()) h.app.Logger().Error("Failed to save record: ", "err", err.Error())
} }
// add new container_stats record // add new container_stats record
if len(systemData.Containers) > 0 { if len(systemData.Containers) > 0 {
container_stats, _ := h.app.Dao().FindCollectionByNameOrId("container_stats") container_stats, _ := h.app.Dao().FindCollectionByNameOrId("container_stats")
container_stats_record := models.NewRecord(container_stats) containerStatsRecord := models.NewRecord(container_stats)
container_stats_record.Set("system", record.Id) containerStatsRecord.Set("system", record.Id)
container_stats_record.Set("stats", systemData.Containers) containerStatsRecord.Set("stats", systemData.Containers)
container_stats_record.Set("type", "1m") containerStatsRecord.Set("type", "1m")
if err := h.app.Dao().SaveRecord(container_stats_record); err != nil { if err := h.app.Dao().SaveRecord(containerStatsRecord); err != nil {
h.app.Logger().Error("Failed to save record: ", "err", err.Error()) h.app.Logger().Error("Failed to save record: ", "err", err.Error())
} }
} }
} }
// set server to status down and close connection // set system to status down and close connection
func (h *Hub) updateServerStatus(record *models.Record, status string) { func (h *Hub) updateSystemStatus(record *models.Record, status string) {
// if in map, close connection and remove from map
// this is now down automatically in an after update hook
// if status == "down" || status == "paused" {
// deleteServerConnection(record)
// }
if record.GetString("status") != status { if record.GetString("status") != status {
record.Set("status", status) record.Set("status", status)
if err := h.app.Dao().SaveRecord(record); err != nil { if err := h.app.Dao().SaveRecord(record); err != nil {
@@ -335,32 +324,39 @@ func (h *Hub) updateServerStatus(record *models.Record, status string) {
} }
} }
func (h *Hub) deleteServerConnection(record *models.Record) { func (h *Hub) deleteSystemConnection(record *models.Record) {
if _, ok := h.serverConnections[record.Id]; ok { if _, ok := h.systemConnections[record.Id]; ok {
if h.serverConnections[record.Id].Client != nil { if h.systemConnections[record.Id] != nil {
h.serverConnections[record.Id].Client.Close() h.systemConnections[record.Id].Close()
} }
h.serverConnectionsLock.Lock() h.connectionLock.Lock()
defer h.serverConnectionsLock.Unlock() defer h.connectionLock.Unlock()
delete(h.serverConnections, record.Id) delete(h.systemConnections, record.Id)
} }
} }
func (h *Hub) getServerConnection(server *server.Server) (*ssh.Client, error) { func (h *Hub) createSystemConnection(record *models.Record) (*ssh.Client, error) {
// h.app.Logger().Debug("new ssh connection", "server", server.Host) client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%s", record.GetString("host"), record.GetString("port")), h.sshClientConfig)
if err != nil {
return nil, err
}
return client, nil
}
func (h *Hub) createSSHClientConfig() error {
key, err := h.getSSHKey() key, err := h.getSSHKey()
if err != nil { if err != nil {
h.app.Logger().Error("Failed to get SSH key: ", "err", err.Error()) h.app.Logger().Error("Failed to get SSH key: ", "err", err.Error())
return nil, err return err
} }
// Create the Signer for this private key. // Create the Signer for this private key.
signer, err := ssh.ParsePrivateKey(key) signer, err := ssh.ParsePrivateKey(key)
if err != nil { if err != nil {
return nil, err return err
} }
config := &ssh.ClientConfig{ h.sshClientConfig = &ssh.ClientConfig{
User: "u", User: "u",
Auth: []ssh.AuthMethod{ Auth: []ssh.AuthMethod{
ssh.PublicKeys(signer), ssh.PublicKeys(signer),
@@ -368,17 +364,11 @@ func (h *Hub) getServerConnection(server *server.Server) (*ssh.Client, error) {
HostKeyCallback: ssh.InsecureIgnoreHostKey(), HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: 5 * time.Second, Timeout: 5 * time.Second,
} }
return nil
client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%s", server.Host, server.Port), config)
if err != nil {
return nil, err
}
return client, nil
} }
func requestJson(server *server.Server) (system.SystemData, error) { func requestJson(client *ssh.Client) (system.SystemData, error) {
session, err := server.Client.NewSession() session, err := client.NewSession()
if err != nil { if err != nil {
return system.SystemData{}, errors.New("retry") return system.SystemData{}, errors.New("retry")
} }