mirror of
https://github.com/fankes/beszel.git
synced 2025-10-19 17:59:28 +08:00
refactor: restructure hub initialization and startup process
- Separated hub initialization logic into distinct methods - Move command specific things to cmd/hub - Add compatibility with new systems package
This commit is contained in:
@@ -1,10 +1,46 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"beszel"
|
||||||
"beszel/internal/hub"
|
"beszel/internal/hub"
|
||||||
_ "beszel/migrations"
|
_ "beszel/migrations"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/pocketbase/pocketbase"
|
||||||
|
"github.com/pocketbase/pocketbase/plugins/migratecmd"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
hub.NewHub().Run()
|
baseApp := getBaseApp()
|
||||||
|
h := hub.NewHub(baseApp)
|
||||||
|
h.BootstrapHub()
|
||||||
|
h.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
// getBaseApp creates a new PocketBase app with the default config
|
||||||
|
func getBaseApp() *pocketbase.PocketBase {
|
||||||
|
isDev := os.Getenv("ENV") == "dev"
|
||||||
|
|
||||||
|
baseApp := pocketbase.NewWithConfig(pocketbase.Config{
|
||||||
|
DefaultDataDir: beszel.AppName + "_data",
|
||||||
|
DefaultDev: isDev,
|
||||||
|
})
|
||||||
|
baseApp.RootCmd.Version = beszel.Version
|
||||||
|
baseApp.RootCmd.Use = beszel.AppName
|
||||||
|
baseApp.RootCmd.Short = ""
|
||||||
|
// add update command
|
||||||
|
baseApp.RootCmd.AddCommand(&cobra.Command{
|
||||||
|
Use: "update",
|
||||||
|
Short: "Update " + beszel.AppName + " to the latest version",
|
||||||
|
Run: hub.Update,
|
||||||
|
})
|
||||||
|
|
||||||
|
// enable auto creation of migration files when making collection changes in the Admin UI
|
||||||
|
migratecmd.MustRegister(baseApp, baseApp.RootCmd, migratecmd.Config{
|
||||||
|
Automigrate: isDev,
|
||||||
|
Dir: "../../migrations",
|
||||||
|
})
|
||||||
|
|
||||||
|
return baseApp
|
||||||
}
|
}
|
||||||
|
@@ -4,67 +4,47 @@ package hub
|
|||||||
import (
|
import (
|
||||||
"beszel"
|
"beszel"
|
||||||
"beszel/internal/alerts"
|
"beszel/internal/alerts"
|
||||||
"beszel/internal/entities/system"
|
"beszel/internal/hub/systems"
|
||||||
"beszel/internal/records"
|
"beszel/internal/records"
|
||||||
"beszel/internal/users"
|
"beszel/internal/users"
|
||||||
"beszel/site"
|
"beszel/site"
|
||||||
"context"
|
|
||||||
"crypto/ed25519"
|
"crypto/ed25519"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"log"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httputil"
|
"net/http/httputil"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/goccy/go-json"
|
|
||||||
"github.com/pocketbase/pocketbase"
|
"github.com/pocketbase/pocketbase"
|
||||||
"github.com/pocketbase/pocketbase/apis"
|
"github.com/pocketbase/pocketbase/apis"
|
||||||
"github.com/pocketbase/pocketbase/core"
|
"github.com/pocketbase/pocketbase/core"
|
||||||
"github.com/pocketbase/pocketbase/plugins/migratecmd"
|
|
||||||
"github.com/spf13/cobra"
|
|
||||||
"golang.org/x/crypto/ssh"
|
"golang.org/x/crypto/ssh"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Hub struct {
|
type Hub struct {
|
||||||
*pocketbase.PocketBase
|
core.App
|
||||||
sshClientConfig *ssh.ClientConfig
|
*alerts.AlertManager
|
||||||
pubKey string
|
|
||||||
am *alerts.AlertManager
|
|
||||||
um *users.UserManager
|
um *users.UserManager
|
||||||
rm *records.RecordManager
|
rm *records.RecordManager
|
||||||
systemStats *core.Collection
|
sm *systems.SystemManager
|
||||||
containerStats *core.Collection
|
pubKey string
|
||||||
appURL string
|
appURL string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHub creates a new Hub instance with default configuration
|
// NewHub creates a new Hub instance with default configuration
|
||||||
func NewHub() *Hub {
|
func NewHub(app core.App) *Hub {
|
||||||
var hub Hub
|
hub := &Hub{}
|
||||||
hub.PocketBase = pocketbase.NewWithConfig(pocketbase.Config{
|
hub.App = app
|
||||||
DefaultDataDir: beszel.AppName + "_data",
|
|
||||||
})
|
|
||||||
|
|
||||||
hub.RootCmd.Version = beszel.Version
|
hub.AlertManager = alerts.NewAlertManager(hub)
|
||||||
hub.RootCmd.Use = beszel.AppName
|
|
||||||
hub.RootCmd.Short = ""
|
|
||||||
// add update command
|
|
||||||
hub.RootCmd.AddCommand(&cobra.Command{
|
|
||||||
Use: "update",
|
|
||||||
Short: "Update " + beszel.AppName + " to the latest version",
|
|
||||||
Run: Update,
|
|
||||||
})
|
|
||||||
|
|
||||||
hub.am = alerts.NewAlertManager(hub)
|
|
||||||
hub.um = users.NewUserManager(hub)
|
hub.um = users.NewUserManager(hub)
|
||||||
hub.rm = records.NewRecordManager(hub)
|
hub.rm = records.NewRecordManager(hub)
|
||||||
|
hub.sm = systems.NewSystemManager(hub)
|
||||||
hub.appURL, _ = GetEnv("APP_URL")
|
hub.appURL, _ = GetEnv("APP_URL")
|
||||||
return &hub
|
return hub
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEnv retrieves an environment variable with a "BESZEL_HUB_" prefix, or falls back to the unprefixed key.
|
// GetEnv retrieves an environment variable with a "BESZEL_HUB_" prefix, or falls back to the unprefixed key.
|
||||||
@@ -76,23 +56,40 @@ func GetEnv(key string) (value string, exists bool) {
|
|||||||
return os.LookupEnv(key)
|
return os.LookupEnv(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) Run() {
|
func (h *Hub) BootstrapHub() (*Hub, error) {
|
||||||
isDev := os.Getenv("ENV") == "dev"
|
if !h.App.IsBootstrapped() {
|
||||||
|
err := h.App.Bootstrap()
|
||||||
// enable auto creation of migration files when making collection changes in the Admin UI
|
if err != nil {
|
||||||
migratecmd.MustRegister(h, h.RootCmd, migratecmd.Config{
|
return nil, err
|
||||||
// (the isDev check is to enable it only during development)
|
}
|
||||||
Automigrate: isDev,
|
}
|
||||||
Dir: "../../migrations",
|
|
||||||
})
|
|
||||||
|
|
||||||
// initial setup
|
// initial setup
|
||||||
h.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
if err := h.initialize(); err != nil {
|
||||||
// create ssh client config
|
return nil, err
|
||||||
err := h.createSSHClientConfig()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serve web ui
|
||||||
|
h.OnServe().BindFunc(h.startServer)
|
||||||
|
// set up scheduled jobs
|
||||||
|
h.OnServe().BindFunc(h.registerCronJobs)
|
||||||
|
// custom api routes
|
||||||
|
h.OnServe().BindFunc(h.registerApiRoutes)
|
||||||
|
// TODO: move to users package
|
||||||
|
// handle default values for user / user_settings creation
|
||||||
|
h.OnRecordCreate("users").BindFunc(h.um.InitializeUserRole)
|
||||||
|
h.OnRecordCreate("user_settings").BindFunc(h.um.InitializeUserSettings)
|
||||||
|
|
||||||
|
// sync systems with config
|
||||||
|
h.syncSystemsWithConfig()
|
||||||
|
// start system updates
|
||||||
|
h.sm.Initialize()
|
||||||
|
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize sets up initial configuration (collections, settings, etc.)
|
||||||
|
func (h *Hub) initialize() error {
|
||||||
// set general settings
|
// set general settings
|
||||||
settings := h.Settings()
|
settings := h.Settings()
|
||||||
// batch requests (for global alerts)
|
// batch requests (for global alerts)
|
||||||
@@ -124,14 +121,20 @@ func (h *Hub) Run() {
|
|||||||
if err := h.Save(usersCollection); err != nil {
|
if err := h.Save(usersCollection); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// sync systems with config
|
return nil
|
||||||
h.syncSystemsWithConfig()
|
}
|
||||||
return se.Next()
|
|
||||||
})
|
|
||||||
|
|
||||||
// serve web ui
|
// Start starts the hub application / server
|
||||||
h.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
func (h *Hub) Start() error {
|
||||||
switch isDev {
|
// Use type assertion to access the Start method
|
||||||
|
if pb, ok := h.App.(*pocketbase.PocketBase); ok {
|
||||||
|
return pb.Start()
|
||||||
|
}
|
||||||
|
return fmt.Errorf("unable to start: App is not *pocketbase.PocketBase")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Hub) startServer(se *core.ServeEvent) error {
|
||||||
|
switch h.IsDev() {
|
||||||
case true:
|
case true:
|
||||||
proxy := httputil.NewSingleHostReverseProxy(&url.URL{
|
proxy := httputil.NewSingleHostReverseProxy(&url.URL{
|
||||||
Scheme: "http",
|
Scheme: "http",
|
||||||
@@ -173,27 +176,20 @@ func (h *Hub) Run() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
return se.Next()
|
return se.Next()
|
||||||
})
|
}
|
||||||
|
|
||||||
// set up scheduled jobs / ticker for system updates
|
// registerCronJobs sets up all scheduled tasks
|
||||||
h.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
func (h *Hub) registerCronJobs(se *core.ServeEvent) error {
|
||||||
// 15 second ticker for system updates
|
|
||||||
go h.startSystemUpdateTicker()
|
|
||||||
// set up cron jobs
|
|
||||||
// delete old records once every hour
|
// delete old records once every hour
|
||||||
h.Cron().MustAdd("delete old records", "8 * * * *", h.rm.DeleteOldRecords)
|
h.Cron().MustAdd("delete old records", "8 * * * *", h.rm.DeleteOldRecords)
|
||||||
// create longer records every 10 minutes
|
// create longer records every 10 minutes
|
||||||
h.Cron().MustAdd("create longer records", "*/10 * * * *", func() {
|
h.Cron().MustAdd("create longer records", "*/10 * * * *", h.rm.CreateLongerRecords)
|
||||||
if systemStats, containerStats, err := h.getCollections(); err == nil {
|
|
||||||
h.rm.CreateLongerRecords([]*core.Collection{systemStats, containerStats})
|
|
||||||
}
|
|
||||||
})
|
|
||||||
return se.Next()
|
return se.Next()
|
||||||
})
|
}
|
||||||
|
|
||||||
// custom api routes
|
// custom api routes
|
||||||
h.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
func (h *Hub) registerApiRoutes(se *core.ServeEvent) error {
|
||||||
// returns public key
|
// returns public key and version
|
||||||
se.Router.GET("/api/beszel/getkey", func(e *core.RequestEvent) error {
|
se.Router.GET("/api/beszel/getkey", func(e *core.RequestEvent) error {
|
||||||
info, _ := e.RequestInfo()
|
info, _ := e.RequestInfo()
|
||||||
if info.Auth == nil {
|
if info.Auth == nil {
|
||||||
@@ -207,7 +203,7 @@ func (h *Hub) Run() {
|
|||||||
return e.JSON(http.StatusOK, map[string]bool{"firstRun": err == nil && total == 0})
|
return e.JSON(http.StatusOK, map[string]bool{"firstRun": err == nil && total == 0})
|
||||||
})
|
})
|
||||||
// send test notification
|
// send test notification
|
||||||
se.Router.GET("/api/beszel/send-test-notification", h.am.SendTestNotification)
|
se.Router.GET("/api/beszel/send-test-notification", h.SendTestNotification)
|
||||||
// API endpoint to get config.yml content
|
// API endpoint to get config.yml content
|
||||||
se.Router.GET("/api/beszel/config-yaml", h.getYamlConfig)
|
se.Router.GET("/api/beszel/config-yaml", h.getYamlConfig)
|
||||||
// create first user endpoint only needed if no users exist
|
// create first user endpoint only needed if no users exist
|
||||||
@@ -215,305 +211,10 @@ func (h *Hub) Run() {
|
|||||||
se.Router.POST("/api/beszel/create-user", h.um.CreateFirstUser)
|
se.Router.POST("/api/beszel/create-user", h.um.CreateFirstUser)
|
||||||
}
|
}
|
||||||
return se.Next()
|
return se.Next()
|
||||||
})
|
|
||||||
|
|
||||||
// system creation defaults
|
|
||||||
h.OnRecordCreate("systems").BindFunc(func(e *core.RecordEvent) error {
|
|
||||||
e.Record.Set("info", system.Info{})
|
|
||||||
e.Record.Set("status", "pending")
|
|
||||||
return e.Next()
|
|
||||||
})
|
|
||||||
|
|
||||||
// immediately create connection for new systems
|
|
||||||
h.OnRecordAfterCreateSuccess("systems").BindFunc(func(e *core.RecordEvent) error {
|
|
||||||
go h.updateSystem(e.Record)
|
|
||||||
return e.Next()
|
|
||||||
})
|
|
||||||
|
|
||||||
// handle default values for user / user_settings creation
|
|
||||||
h.OnRecordCreate("users").BindFunc(h.um.InitializeUserRole)
|
|
||||||
h.OnRecordCreate("user_settings").BindFunc(h.um.InitializeUserSettings)
|
|
||||||
|
|
||||||
// empty info for systems that are paused
|
|
||||||
h.OnRecordUpdate("systems").BindFunc(func(e *core.RecordEvent) error {
|
|
||||||
if e.Record.GetString("status") == "paused" {
|
|
||||||
e.Record.Set("info", system.Info{})
|
|
||||||
}
|
|
||||||
return e.Next()
|
|
||||||
})
|
|
||||||
|
|
||||||
// do things after a systems record is updated
|
|
||||||
h.OnRecordAfterUpdateSuccess("systems").BindFunc(func(e *core.RecordEvent) error {
|
|
||||||
newRecord := e.Record.Fresh()
|
|
||||||
oldRecord := newRecord.Original()
|
|
||||||
newStatus := newRecord.GetString("status")
|
|
||||||
|
|
||||||
// if system is not up and connection exists, remove it
|
|
||||||
if newStatus != "up" {
|
|
||||||
h.deleteSystemConnection(newRecord)
|
|
||||||
}
|
|
||||||
|
|
||||||
// if system is set to pending (unpause), try to connect immediately
|
|
||||||
if newStatus == "pending" {
|
|
||||||
go h.updateSystem(newRecord)
|
|
||||||
} else {
|
|
||||||
h.am.HandleStatusAlerts(newStatus, oldRecord)
|
|
||||||
}
|
|
||||||
return e.Next()
|
|
||||||
})
|
|
||||||
|
|
||||||
// if system is deleted, close connection
|
|
||||||
h.OnRecordAfterDeleteSuccess("systems").BindFunc(func(e *core.RecordEvent) error {
|
|
||||||
h.deleteSystemConnection(e.Record)
|
|
||||||
return e.Next()
|
|
||||||
})
|
|
||||||
|
|
||||||
if err := h.Start(); err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) startSystemUpdateTicker() {
|
// generates key pair if it doesn't exist and returns private key bytes
|
||||||
c := time.Tick(15 * time.Second)
|
func (h *Hub) GetSSHKey() ([]byte, error) {
|
||||||
for range c {
|
|
||||||
h.updateSystems()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Hub) updateSystems() {
|
|
||||||
records, err := h.FindRecordsByFilter(
|
|
||||||
"2hz5ncl8tizk5nx", // systems collection
|
|
||||||
"status != 'paused'", // filter
|
|
||||||
"updated", // sort
|
|
||||||
-1, // limit
|
|
||||||
0, // offset
|
|
||||||
)
|
|
||||||
// log.Println("records", len(records))
|
|
||||||
if err != nil || len(records) == 0 {
|
|
||||||
// h.Logger().Error("Failed to query systems")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fiftySecondsAgo := time.Now().UTC().Add(-50 * time.Second)
|
|
||||||
batchSize := len(records)/4 + 1
|
|
||||||
done := 0
|
|
||||||
for _, record := range records {
|
|
||||||
// break if batch size reached or if the system was updated less than 50 seconds ago
|
|
||||||
if done >= batchSize || record.GetDateTime("updated").Time().After(fiftySecondsAgo) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// don't increment for down systems to avoid them jamming the queue
|
|
||||||
// because they're always first when sorted by least recently updated
|
|
||||||
if record.GetString("status") != "down" {
|
|
||||||
done++
|
|
||||||
}
|
|
||||||
go h.updateSystem(record)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Hub) updateSystem(record *core.Record) {
|
|
||||||
var client *ssh.Client
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// check if system connection exists
|
|
||||||
if existingClient, ok := h.Store().GetOk(record.Id); ok {
|
|
||||||
client = existingClient.(*ssh.Client)
|
|
||||||
} else {
|
|
||||||
// create system connection
|
|
||||||
client, err = h.createSystemConnection(record)
|
|
||||||
if err != nil {
|
|
||||||
if record.GetString("status") != "down" {
|
|
||||||
h.Logger().Error("Failed to connect:", "err", err.Error(), "system", record.GetString("host"), "port", record.GetString("port"))
|
|
||||||
h.updateSystemStatus(record, "down")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Store().Set(record.Id, client)
|
|
||||||
}
|
|
||||||
// get system stats from agent
|
|
||||||
var systemData system.CombinedData
|
|
||||||
if err := h.requestJsonFromAgent(client, &systemData); err != nil {
|
|
||||||
if err.Error() == "bad client" {
|
|
||||||
// if previous connection was closed, try again
|
|
||||||
h.Logger().Error("Existing SSH connection closed. Retrying...", "host", record.GetString("host"), "port", record.GetString("port"))
|
|
||||||
h.deleteSystemConnection(record)
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
h.updateSystem(record)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger().Error("Failed to get system stats: ", "err", err.Error())
|
|
||||||
h.updateSystemStatus(record, "down")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// update system record
|
|
||||||
record.Set("status", "up")
|
|
||||||
record.Set("info", systemData.Info)
|
|
||||||
if err := h.SaveNoValidate(record); err != nil {
|
|
||||||
h.Logger().Error("Failed to update record: ", "err", err.Error())
|
|
||||||
}
|
|
||||||
// add system_stats and container_stats records
|
|
||||||
if systemStats, containerStats, err := h.getCollections(); err != nil {
|
|
||||||
h.Logger().Error("Failed to get collections: ", "err", err.Error())
|
|
||||||
} else {
|
|
||||||
// add new system_stats record
|
|
||||||
systemStatsRecord := core.NewRecord(systemStats)
|
|
||||||
systemStatsRecord.Set("system", record.Id)
|
|
||||||
systemStatsRecord.Set("stats", systemData.Stats)
|
|
||||||
systemStatsRecord.Set("type", "1m")
|
|
||||||
if err := h.SaveNoValidate(systemStatsRecord); err != nil {
|
|
||||||
h.Logger().Error("Failed to save record: ", "err", err.Error())
|
|
||||||
}
|
|
||||||
// add new container_stats record
|
|
||||||
if len(systemData.Containers) > 0 {
|
|
||||||
containerStatsRecord := core.NewRecord(containerStats)
|
|
||||||
containerStatsRecord.Set("system", record.Id)
|
|
||||||
containerStatsRecord.Set("stats", systemData.Containers)
|
|
||||||
containerStatsRecord.Set("type", "1m")
|
|
||||||
if err := h.SaveNoValidate(containerStatsRecord); err != nil {
|
|
||||||
h.Logger().Error("Failed to save record: ", "err", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// system info alerts
|
|
||||||
if err := h.am.HandleSystemAlerts(record, systemData.Info, systemData.Stats.Temperatures, systemData.Stats.ExtraFs); err != nil {
|
|
||||||
h.Logger().Error("System alerts error", "err", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// return system_stats and container_stats collections
|
|
||||||
func (h *Hub) getCollections() (*core.Collection, *core.Collection, error) {
|
|
||||||
if h.systemStats == nil {
|
|
||||||
systemStats, err := h.FindCollectionByNameOrId("system_stats")
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
h.systemStats = systemStats
|
|
||||||
}
|
|
||||||
if h.containerStats == nil {
|
|
||||||
containerStats, err := h.FindCollectionByNameOrId("container_stats")
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
h.containerStats = containerStats
|
|
||||||
}
|
|
||||||
return h.systemStats, h.containerStats, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// set system to specified status and save record
|
|
||||||
func (h *Hub) updateSystemStatus(record *core.Record, status string) {
|
|
||||||
if record.Fresh().GetString("status") != status {
|
|
||||||
record.Set("status", status)
|
|
||||||
if err := h.SaveNoValidate(record); err != nil {
|
|
||||||
h.Logger().Error("Failed to update record: ", "err", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete system connection from map and close connection
|
|
||||||
func (h *Hub) deleteSystemConnection(record *core.Record) {
|
|
||||||
if client, ok := h.Store().GetOk(record.Id); ok {
|
|
||||||
if sshClient := client.(*ssh.Client); sshClient != nil {
|
|
||||||
sshClient.Close()
|
|
||||||
}
|
|
||||||
h.Store().Remove(record.Id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Hub) createSystemConnection(record *core.Record) (*ssh.Client, error) {
|
|
||||||
network := "tcp"
|
|
||||||
host := record.GetString("host")
|
|
||||||
if strings.HasPrefix(host, "/") {
|
|
||||||
network = "unix"
|
|
||||||
} else {
|
|
||||||
host = net.JoinHostPort(host, record.GetString("port"))
|
|
||||||
}
|
|
||||||
client, err := ssh.Dial(network, host, h.sshClientConfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return client, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Hub) createSSHClientConfig() error {
|
|
||||||
key, err := h.getSSHKey()
|
|
||||||
if err != nil {
|
|
||||||
h.Logger().Error("Failed to get SSH key: ", "err", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the Signer for this private key.
|
|
||||||
signer, err := ssh.ParsePrivateKey(key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
h.sshClientConfig = &ssh.ClientConfig{
|
|
||||||
User: "u",
|
|
||||||
Auth: []ssh.AuthMethod{
|
|
||||||
ssh.PublicKeys(signer),
|
|
||||||
},
|
|
||||||
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
|
||||||
Timeout: 4 * time.Second,
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetches system stats from the agent and decodes the json data into the provided struct
|
|
||||||
func (h *Hub) requestJsonFromAgent(client *ssh.Client, systemData *system.CombinedData) error {
|
|
||||||
session, err := newSessionWithTimeout(client, 4*time.Second)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("bad client")
|
|
||||||
}
|
|
||||||
defer session.Close()
|
|
||||||
|
|
||||||
stdout, err := session.StdoutPipe()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := session.Shell(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.NewDecoder(stdout).Decode(systemData); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for the session to complete
|
|
||||||
if err := session.Wait(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Adds timeout to SSH session creation to avoid hanging in case of network issues
|
|
||||||
func newSessionWithTimeout(client *ssh.Client, timeout time.Duration) (*ssh.Session, error) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// use goroutine to create the session
|
|
||||||
sessionChan := make(chan *ssh.Session, 1)
|
|
||||||
errChan := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
if session, err := client.NewSession(); err != nil {
|
|
||||||
errChan <- err
|
|
||||||
} else {
|
|
||||||
sessionChan <- session
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case session := <-sessionChan:
|
|
||||||
return session, nil
|
|
||||||
case err := <-errChan:
|
|
||||||
return nil, err
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, fmt.Errorf("session creation timed out")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Hub) getSSHKey() ([]byte, error) {
|
|
||||||
dataDir := h.DataDir()
|
dataDir := h.DataDir()
|
||||||
// check if the key pair already exists
|
// check if the key pair already exists
|
||||||
existingKey, err := os.ReadFile(dataDir + "/id_ed25519")
|
existingKey, err := os.ReadFile(dataDir + "/id_ed25519")
|
||||||
|
Reference in New Issue
Block a user