rename /src to /internal (sorry i'll fix the prs)

This commit is contained in:
henrygd
2025-09-09 13:29:07 -04:00
parent 86ea23fe39
commit 8a13b05c20
177 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,390 @@
package systems
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net"
"strings"
"time"
"github.com/henrygd/beszel/src/hub/ws"
"github.com/henrygd/beszel/src/entities/system"
"github.com/henrygd/beszel"
"github.com/blang/semver"
"github.com/fxamacker/cbor/v2"
"github.com/pocketbase/pocketbase/core"
"golang.org/x/crypto/ssh"
)
type System struct {
Id string `db:"id"`
Host string `db:"host"`
Port string `db:"port"`
Status string `db:"status"`
manager *SystemManager // Manager that this system belongs to
client *ssh.Client // SSH client for fetching data
data *system.CombinedData // system data from agent
ctx context.Context // Context for stopping the updater
cancel context.CancelFunc // Stops and removes system from updater
WsConn *ws.WsConn // Handler for agent WebSocket connection
agentVersion semver.Version // Agent version
updateTicker *time.Ticker // Ticker for updating the system
}
func (sm *SystemManager) NewSystem(systemId string) *System {
system := &System{
Id: systemId,
data: &system.CombinedData{},
}
system.ctx, system.cancel = system.getContext()
return system
}
// StartUpdater starts the system updater.
// It first fetches the data from the agent then updates the records.
// If the data is not found or the system is down, it sets the system down.
func (sys *System) StartUpdater() {
// Channel that can be used to set the system down. Currently only used to
// allow a short delay for reconnection after websocket connection is closed.
var downChan chan struct{}
// Add random jitter to first WebSocket connection to prevent
// clustering if all agents are started at the same time.
// SSH connections during hub startup are already staggered.
var jitter <-chan time.Time
if sys.WsConn != nil {
jitter = getJitter()
// use the websocket connection's down channel to set the system down
downChan = sys.WsConn.DownChan
} else {
// if the system does not have a websocket connection, wait before updating
// to allow the agent to connect via websocket (makes sure fingerprint is set).
time.Sleep(11 * time.Second)
}
// update immediately if system is not paused (only for ws connections)
// we'll wait a minute before connecting via SSH to prioritize ws connections
if sys.Status != paused && sys.ctx.Err() == nil {
if err := sys.update(); err != nil {
_ = sys.setDown(err)
}
}
sys.updateTicker = time.NewTicker(time.Duration(interval) * time.Millisecond)
// Go 1.23+ will automatically stop the ticker when the system is garbage collected, however we seem to need this or testing/synctest will block even if calling runtime.GC()
defer sys.updateTicker.Stop()
for {
select {
case <-sys.ctx.Done():
return
case <-sys.updateTicker.C:
if err := sys.update(); err != nil {
_ = sys.setDown(err)
}
case <-downChan:
sys.WsConn = nil
downChan = nil
_ = sys.setDown(nil)
case <-jitter:
sys.updateTicker.Reset(time.Duration(interval) * time.Millisecond)
if err := sys.update(); err != nil {
_ = sys.setDown(err)
}
}
}
}
// update updates the system data and records.
func (sys *System) update() error {
if sys.Status == paused {
sys.handlePaused()
return nil
}
data, err := sys.fetchDataFromAgent()
if err == nil {
_, err = sys.createRecords(data)
}
return err
}
func (sys *System) handlePaused() {
if sys.WsConn == nil {
// if the system is paused and there's no websocket connection, remove the system
_ = sys.manager.RemoveSystem(sys.Id)
} else {
// Send a ping to the agent to keep the connection alive if the system is paused
if err := sys.WsConn.Ping(); err != nil {
sys.manager.hub.Logger().Warn("Failed to ping agent", "system", sys.Id, "err", err)
_ = sys.manager.RemoveSystem(sys.Id)
}
}
}
// createRecords updates the system record and adds system_stats and container_stats records
func (sys *System) createRecords(data *system.CombinedData) (*core.Record, error) {
systemRecord, err := sys.getRecord()
if err != nil {
return nil, err
}
hub := sys.manager.hub
// add system_stats and container_stats records
systemStatsCollection, err := hub.FindCachedCollectionByNameOrId("system_stats")
if err != nil {
return nil, err
}
systemStatsRecord := core.NewRecord(systemStatsCollection)
systemStatsRecord.Set("system", systemRecord.Id)
systemStatsRecord.Set("stats", data.Stats)
systemStatsRecord.Set("type", "1m")
if err := hub.SaveNoValidate(systemStatsRecord); err != nil {
return nil, err
}
// add new container_stats record
if len(data.Containers) > 0 {
containerStatsCollection, err := hub.FindCachedCollectionByNameOrId("container_stats")
if err != nil {
return nil, err
}
containerStatsRecord := core.NewRecord(containerStatsCollection)
containerStatsRecord.Set("system", systemRecord.Id)
containerStatsRecord.Set("stats", data.Containers)
containerStatsRecord.Set("type", "1m")
if err := hub.SaveNoValidate(containerStatsRecord); err != nil {
return nil, err
}
}
// update system record (do this last because it triggers alerts and we need above records to be inserted first)
systemRecord.Set("status", up)
systemRecord.Set("info", data.Info)
if err := hub.SaveNoValidate(systemRecord); err != nil {
return nil, err
}
return systemRecord, nil
}
// getRecord retrieves the system record from the database.
// If the record is not found, it removes the system from the manager.
func (sys *System) getRecord() (*core.Record, error) {
record, err := sys.manager.hub.FindRecordById("systems", sys.Id)
if err != nil || record == nil {
_ = sys.manager.RemoveSystem(sys.Id)
return nil, err
}
return record, nil
}
// setDown marks a system as down in the database.
// It takes the original error that caused the system to go down and returns any error
// encountered during the process of updating the system status.
func (sys *System) setDown(originalError error) error {
if sys.Status == down || sys.Status == paused {
return nil
}
record, err := sys.getRecord()
if err != nil {
return err
}
if originalError != nil {
sys.manager.hub.Logger().Error("System down", "system", record.GetString("name"), "err", originalError)
}
record.Set("status", down)
return sys.manager.hub.SaveNoValidate(record)
}
func (sys *System) getContext() (context.Context, context.CancelFunc) {
if sys.ctx == nil {
sys.ctx, sys.cancel = context.WithCancel(context.Background())
}
return sys.ctx, sys.cancel
}
// fetchDataFromAgent attempts to fetch data from the agent,
// prioritizing WebSocket if available.
func (sys *System) fetchDataFromAgent() (*system.CombinedData, error) {
if sys.data == nil {
sys.data = &system.CombinedData{}
}
if sys.WsConn != nil && sys.WsConn.IsConnected() {
wsData, err := sys.fetchDataViaWebSocket()
if err == nil {
return wsData, nil
}
// close the WebSocket connection if error and try SSH
sys.closeWebSocketConnection()
}
sshData, err := sys.fetchDataViaSSH()
if err != nil {
return nil, err
}
return sshData, nil
}
func (sys *System) fetchDataViaWebSocket() (*system.CombinedData, error) {
if sys.WsConn == nil || !sys.WsConn.IsConnected() {
return nil, errors.New("no websocket connection")
}
err := sys.WsConn.RequestSystemData(sys.data)
if err != nil {
return nil, err
}
return sys.data, nil
}
// fetchDataViaSSH handles fetching data using SSH.
// This function encapsulates the original SSH logic.
// It updates sys.data directly upon successful fetch.
func (sys *System) fetchDataViaSSH() (*system.CombinedData, error) {
maxRetries := 1
for attempt := 0; attempt <= maxRetries; attempt++ {
if sys.client == nil || sys.Status == down {
if err := sys.createSSHClient(); err != nil {
return nil, err
}
}
session, err := sys.createSessionWithTimeout(4 * time.Second)
if err != nil {
if attempt >= maxRetries {
return nil, err
}
sys.manager.hub.Logger().Warn("Session closed. Retrying...", "host", sys.Host, "port", sys.Port, "err", err)
sys.closeSSHConnection()
// Reset format detection on connection failure - agent might have been upgraded
continue
}
defer session.Close()
stdout, err := session.StdoutPipe()
if err != nil {
return nil, err
}
if err := session.Shell(); err != nil {
return nil, err
}
*sys.data = system.CombinedData{}
if sys.agentVersion.GTE(beszel.MinVersionCbor) {
err = cbor.NewDecoder(stdout).Decode(sys.data)
} else {
err = json.NewDecoder(stdout).Decode(sys.data)
}
if err != nil {
sys.closeSSHConnection()
if attempt < maxRetries {
continue
}
return nil, err
}
// wait for the session to complete
if err := session.Wait(); err != nil {
return nil, err
}
return sys.data, nil
}
// this should never be reached due to the return in the loop
return nil, fmt.Errorf("failed to fetch data")
}
// createSSHClient creates a new SSH client for the system
func (s *System) createSSHClient() error {
if s.manager.sshConfig == nil {
if err := s.manager.createSSHClientConfig(); err != nil {
return err
}
}
network := "tcp"
host := s.Host
if strings.HasPrefix(host, "/") {
network = "unix"
} else {
host = net.JoinHostPort(host, s.Port)
}
var err error
s.client, err = ssh.Dial(network, host, s.manager.sshConfig)
if err != nil {
return err
}
s.agentVersion, _ = extractAgentVersion(string(s.client.Conn.ServerVersion()))
return nil
}
// createSessionWithTimeout creates a new SSH session with a timeout to avoid hanging
// in case of network issues
func (sys *System) createSessionWithTimeout(timeout time.Duration) (*ssh.Session, error) {
if sys.client == nil {
return nil, fmt.Errorf("client not initialized")
}
ctx, cancel := context.WithTimeout(sys.ctx, timeout)
defer cancel()
sessionChan := make(chan *ssh.Session, 1)
errChan := make(chan error, 1)
go func() {
if session, err := sys.client.NewSession(); err != nil {
errChan <- err
} else {
sessionChan <- session
}
}()
select {
case session := <-sessionChan:
return session, nil
case err := <-errChan:
return nil, err
case <-ctx.Done():
return nil, fmt.Errorf("timeout")
}
}
// closeSSHConnection closes the SSH connection but keeps the system in the manager
func (sys *System) closeSSHConnection() {
if sys.client != nil {
sys.client.Close()
sys.client = nil
}
}
// closeWebSocketConnection closes the WebSocket connection but keeps the system in the manager
// to allow updating via SSH. It will be removed if the WS connection is re-established.
// The system will be set as down a few seconds later if the connection is not re-established.
func (sys *System) closeWebSocketConnection() {
if sys.WsConn != nil {
sys.WsConn.Close(nil)
}
}
// extractAgentVersion extracts the beszel version from SSH server version string
func extractAgentVersion(versionString string) (semver.Version, error) {
_, after, _ := strings.Cut(versionString, "_")
return semver.Parse(after)
}
// getJitter returns a channel that will be triggered after a random delay
// between 40% and 90% of the interval.
// This is used to stagger the initial WebSocket connections to prevent clustering.
func getJitter() <-chan time.Time {
minPercent := 40
maxPercent := 90
jitterRange := maxPercent - minPercent
msDelay := (interval * minPercent / 100) + rand.Intn(interval*jitterRange/100)
return time.After(time.Duration(msDelay) * time.Millisecond)
}

View File

@@ -0,0 +1,348 @@
package systems
import (
"errors"
"fmt"
"time"
"github.com/henrygd/beszel/src/hub/ws"
"github.com/henrygd/beszel/src/entities/system"
"github.com/henrygd/beszel/src/common"
"github.com/henrygd/beszel"
"github.com/blang/semver"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/store"
"golang.org/x/crypto/ssh"
)
// System status constants
const (
up string = "up" // System is online and responding
down string = "down" // System is offline or not responding
paused string = "paused" // System monitoring is paused
pending string = "pending" // System is waiting on initial connection result
// interval is the default update interval in milliseconds (60 seconds)
interval int = 60_000
// interval int = 10_000 // Debug interval for faster updates
// sessionTimeout is the maximum time to wait for SSH connections
sessionTimeout = 4 * time.Second
)
// errSystemExists is returned when attempting to add a system that already exists
var errSystemExists = errors.New("system exists")
// SystemManager manages a collection of monitored systems and their connections.
// It handles system lifecycle, status updates, and maintains both SSH and WebSocket connections.
type SystemManager struct {
hub hubLike // Hub interface for database and alert operations
systems *store.Store[string, *System] // Thread-safe store of active systems
sshConfig *ssh.ClientConfig // SSH client configuration for system connections
}
// hubLike defines the interface requirements for the hub dependency.
// It extends core.App with system-specific functionality.
type hubLike interface {
core.App
GetSSHKey(dataDir string) (ssh.Signer, error)
HandleSystemAlerts(systemRecord *core.Record, data *system.CombinedData) error
HandleStatusAlerts(status string, systemRecord *core.Record) error
}
// NewSystemManager creates a new SystemManager instance with the provided hub.
// The hub must implement the hubLike interface to provide database and alert functionality.
func NewSystemManager(hub hubLike) *SystemManager {
return &SystemManager{
systems: store.New(map[string]*System{}),
hub: hub,
}
}
// Initialize sets up the system manager by binding event hooks and starting existing systems.
// It configures SSH client settings and begins monitoring all non-paused systems from the database.
// Systems are started with staggered delays to prevent overwhelming the hub during startup.
func (sm *SystemManager) Initialize() error {
sm.bindEventHooks()
// Initialize SSH client configuration
err := sm.createSSHClientConfig()
if err != nil {
return err
}
// Load existing systems from database (excluding paused ones)
var systems []*System
err = sm.hub.DB().NewQuery("SELECT id, host, port, status FROM systems WHERE status != 'paused'").All(&systems)
if err != nil || len(systems) == 0 {
return err
}
// Start systems in background with staggered timing
go func() {
// Calculate staggered delay between system starts (max 2 seconds per system)
delta := interval / max(1, len(systems))
delta = min(delta, 2_000)
sleepTime := time.Duration(delta) * time.Millisecond
for _, system := range systems {
time.Sleep(sleepTime)
_ = sm.AddSystem(system)
}
}()
return nil
}
// bindEventHooks registers event handlers for system and fingerprint record changes.
// These hooks ensure the system manager stays synchronized with database changes.
func (sm *SystemManager) bindEventHooks() {
sm.hub.OnRecordCreate("systems").BindFunc(sm.onRecordCreate)
sm.hub.OnRecordAfterCreateSuccess("systems").BindFunc(sm.onRecordAfterCreateSuccess)
sm.hub.OnRecordUpdate("systems").BindFunc(sm.onRecordUpdate)
sm.hub.OnRecordAfterUpdateSuccess("systems").BindFunc(sm.onRecordAfterUpdateSuccess)
sm.hub.OnRecordAfterDeleteSuccess("systems").BindFunc(sm.onRecordAfterDeleteSuccess)
sm.hub.OnRecordAfterUpdateSuccess("fingerprints").BindFunc(sm.onTokenRotated)
}
// onTokenRotated handles fingerprint token rotation events.
// When a system's authentication token is rotated, any existing WebSocket connection
// must be closed to force re-authentication with the new token.
func (sm *SystemManager) onTokenRotated(e *core.RecordEvent) error {
systemID := e.Record.GetString("system")
system, ok := sm.systems.GetOk(systemID)
if !ok {
return e.Next()
}
// No need to close connection if not connected via websocket
if system.WsConn == nil {
return e.Next()
}
system.setDown(nil)
sm.RemoveSystem(systemID)
return e.Next()
}
// onRecordCreate is called before a new system record is committed to the database.
// It initializes the record with default values: empty info and pending status.
func (sm *SystemManager) onRecordCreate(e *core.RecordEvent) error {
e.Record.Set("info", system.Info{})
e.Record.Set("status", pending)
return e.Next()
}
// onRecordAfterCreateSuccess is called after a new system record is successfully created.
// It adds the new system to the manager to begin monitoring.
func (sm *SystemManager) onRecordAfterCreateSuccess(e *core.RecordEvent) error {
if err := sm.AddRecord(e.Record, nil); err != nil {
e.App.Logger().Error("Error adding record", "err", err)
}
return e.Next()
}
// onRecordUpdate is called before a system record is updated in the database.
// It clears system info when the status is changed to paused.
func (sm *SystemManager) onRecordUpdate(e *core.RecordEvent) error {
if e.Record.GetString("status") == paused {
e.Record.Set("info", system.Info{})
}
return e.Next()
}
// onRecordAfterUpdateSuccess handles system record updates after they're committed to the database.
// It manages system lifecycle based on status changes and triggers appropriate alerts.
// Status transitions are handled as follows:
// - paused: Closes SSH connection and deactivates alerts
// - pending: Starts monitoring (reuses WebSocket if available)
// - up: Triggers system alerts
// - down: Triggers status change alerts
func (sm *SystemManager) onRecordAfterUpdateSuccess(e *core.RecordEvent) error {
newStatus := e.Record.GetString("status")
prevStatus := pending
system, ok := sm.systems.GetOk(e.Record.Id)
if ok {
prevStatus = system.Status
system.Status = newStatus
}
switch newStatus {
case paused:
if ok {
// Pause monitoring but keep system in manager for potential resume
system.closeSSHConnection()
}
_ = deactivateAlerts(e.App, e.Record.Id)
return e.Next()
case pending:
// Resume monitoring, preferring existing WebSocket connection
if ok && system.WsConn != nil {
go system.update()
return e.Next()
}
// Start new monitoring session
if err := sm.AddRecord(e.Record, nil); err != nil {
e.App.Logger().Error("Error adding record", "err", err)
}
_ = deactivateAlerts(e.App, e.Record.Id)
return e.Next()
}
// Handle systems not in manager
if !ok {
return sm.AddRecord(e.Record, nil)
}
// Trigger system alerts when system comes online
if newStatus == up {
if err := sm.hub.HandleSystemAlerts(e.Record, system.data); err != nil {
e.App.Logger().Error("Error handling system alerts", "err", err)
}
}
// Trigger status change alerts for up/down transitions
if (newStatus == down && prevStatus == up) || (newStatus == up && prevStatus == down) {
if err := sm.hub.HandleStatusAlerts(newStatus, e.Record); err != nil {
e.App.Logger().Error("Error handling status alerts", "err", err)
}
}
return e.Next()
}
// onRecordAfterDeleteSuccess is called after a system record is successfully deleted.
// It removes the system from the manager and cleans up all associated resources.
func (sm *SystemManager) onRecordAfterDeleteSuccess(e *core.RecordEvent) error {
sm.RemoveSystem(e.Record.Id)
return e.Next()
}
// AddSystem adds a system to the manager and starts monitoring it.
// It validates required fields, initializes the system context, and starts the update goroutine.
// Returns error if a system with the same ID already exists.
func (sm *SystemManager) AddSystem(sys *System) error {
if sm.systems.Has(sys.Id) {
return errSystemExists
}
if sys.Id == "" || sys.Host == "" {
return errors.New("system missing required fields")
}
// Initialize system for monitoring
sys.manager = sm
sys.ctx, sys.cancel = sys.getContext()
sys.data = &system.CombinedData{}
sm.systems.Set(sys.Id, sys)
// Start monitoring in background
go sys.StartUpdater()
return nil
}
// RemoveSystem removes a system from the manager and cleans up all associated resources.
// It cancels the system's context, closes all connections, and removes it from the store.
// Returns an error if the system is not found.
func (sm *SystemManager) RemoveSystem(systemID string) error {
system, ok := sm.systems.GetOk(systemID)
if !ok {
return errors.New("system not found")
}
// Stop the update goroutine
if system.cancel != nil {
system.cancel()
}
// Clean up all connections
system.closeSSHConnection()
system.closeWebSocketConnection()
sm.systems.Remove(systemID)
return nil
}
// AddRecord creates a System instance from a database record and adds it to the manager.
// If a system with the same ID already exists, it's removed first to ensure clean state.
// If no system instance is provided, a new one is created.
// This method is typically called when systems are created or their status changes to pending.
func (sm *SystemManager) AddRecord(record *core.Record, system *System) (err error) {
// Remove existing system to ensure clean state
if sm.systems.Has(record.Id) {
_ = sm.RemoveSystem(record.Id)
}
// Create new system if none provided
if system == nil {
system = sm.NewSystem(record.Id)
}
// Populate system from record
system.Status = record.GetString("status")
system.Host = record.GetString("host")
system.Port = record.GetString("port")
return sm.AddSystem(system)
}
// AddWebSocketSystem creates and adds a system with an established WebSocket connection.
// This method is called when an agent connects via WebSocket with valid authentication.
// The system is immediately added to monitoring with the provided connection and version info.
func (sm *SystemManager) AddWebSocketSystem(systemId string, agentVersion semver.Version, wsConn *ws.WsConn) error {
systemRecord, err := sm.hub.FindRecordById("systems", systemId)
if err != nil {
return err
}
system := sm.NewSystem(systemId)
system.WsConn = wsConn
system.agentVersion = agentVersion
if err := sm.AddRecord(systemRecord, system); err != nil {
return err
}
return nil
}
// createSSHClientConfig initializes the SSH client configuration for connecting to an agent's server
func (sm *SystemManager) createSSHClientConfig() error {
privateKey, err := sm.hub.GetSSHKey("")
if err != nil {
return err
}
sm.sshConfig = &ssh.ClientConfig{
User: "u",
Auth: []ssh.AuthMethod{
ssh.PublicKeys(privateKey),
},
Config: ssh.Config{
Ciphers: common.DefaultCiphers,
KeyExchanges: common.DefaultKeyExchanges,
MACs: common.DefaultMACs,
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
ClientVersion: fmt.Sprintf("SSH-2.0-%s_%s", beszel.AppName, beszel.Version),
Timeout: sessionTimeout,
}
return nil
}
// deactivateAlerts finds all triggered alerts for a system and sets them to inactive.
// This is called when a system is paused or goes offline to prevent continued alerts.
func deactivateAlerts(app core.App, systemID string) error {
// Note: Direct SQL updates don't trigger SSE, so we use the PocketBase API
// _, err := app.DB().NewQuery(fmt.Sprintf("UPDATE alerts SET triggered = false WHERE system = '%s'", systemID)).Execute()
alerts, err := app.FindRecordsByFilter("alerts", fmt.Sprintf("system = '%s' && triggered = 1", systemID), "", -1, 0)
if err != nil {
return err
}
for _, alert := range alerts {
alert.Set("triggered", false)
if err := app.SaveNoValidate(alert); err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,422 @@
//go:build testing
// +build testing
package systems_test
import (
"fmt"
"sync"
"testing"
"testing/synctest"
"time"
"github.com/henrygd/beszel/src/entities/container"
"github.com/henrygd/beszel/src/entities/system"
"github.com/henrygd/beszel/src/hub/systems"
"github.com/henrygd/beszel/src/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSystemManagerNew(t *testing.T) {
hub, err := tests.NewTestHub(t.TempDir())
if err != nil {
t.Fatal(err)
}
defer hub.Cleanup()
sm := hub.GetSystemManager()
user, err := tests.CreateUser(hub, "test@test.com", "testtesttest")
require.NoError(t, err)
synctest.Test(t, func(t *testing.T) {
sm.Initialize()
record, err := tests.CreateRecord(hub, "systems", map[string]any{
"name": "it-was-coney-island",
"host": "the-playground-of-the-world",
"port": "33914",
"users": []string{user.Id},
})
require.NoError(t, err)
assert.Equal(t, "pending", record.GetString("status"), "System status should be 'pending'")
assert.Equal(t, "pending", sm.GetSystemStatusFromStore(record.Id), "System status should be 'pending'")
// Verify the system host and port
host, port := sm.GetSystemHostPort(record.Id)
assert.Equal(t, record.GetString("host"), host, "System host should match")
assert.Equal(t, record.GetString("port"), port, "System port should match")
time.Sleep(13 * time.Second)
synctest.Wait()
assert.Equal(t, "pending", record.Fresh().GetString("status"), "System status should be 'pending'")
// Verify the system was added by checking if it exists
assert.True(t, sm.HasSystem(record.Id), "System should exist in the store")
time.Sleep(10 * time.Second)
synctest.Wait()
// system should be set to down after 15 seconds (no websocket connection)
assert.Equal(t, "down", sm.GetSystemStatusFromStore(record.Id), "System status should be 'down'")
// make sure the system is down in the db
record, err = hub.FindRecordById("systems", record.Id)
require.NoError(t, err)
assert.Equal(t, "down", record.GetString("status"), "System status should be 'down'")
assert.Equal(t, 1, sm.GetSystemCount(), "System count should be 1")
err = sm.RemoveSystem(record.Id)
assert.NoError(t, err)
assert.Equal(t, 0, sm.GetSystemCount(), "System count should be 0")
assert.False(t, sm.HasSystem(record.Id), "System should not exist in the store after removal")
// let's also make sure a system is removed from the store when the record is deleted
record, err = tests.CreateRecord(hub, "systems", map[string]any{
"name": "there-was-no-place-like-it",
"host": "in-the-whole-world",
"port": "33914",
"users": []string{user.Id},
})
require.NoError(t, err)
assert.True(t, sm.HasSystem(record.Id), "System should exist in the store after creation")
time.Sleep(8 * time.Second)
synctest.Wait()
assert.Equal(t, "pending", sm.GetSystemStatusFromStore(record.Id), "System status should be 'pending'")
sm.SetSystemStatusInDB(record.Id, "up")
time.Sleep(time.Second)
synctest.Wait()
assert.Equal(t, "up", sm.GetSystemStatusFromStore(record.Id), "System status should be 'up'")
// make sure the system switches to down after 11 seconds
sm.RemoveSystem(record.Id)
sm.AddRecord(record, nil)
assert.Equal(t, "pending", sm.GetSystemStatusFromStore(record.Id), "System status should be 'pending'")
time.Sleep(12 * time.Second)
synctest.Wait()
assert.Equal(t, "down", sm.GetSystemStatusFromStore(record.Id), "System status should be 'down'")
// sm.SetSystemStatusInDB(record.Id, "paused")
// time.Sleep(time.Second)
// synctest.Wait()
// assert.Equal(t, "paused", sm.GetSystemStatusFromStore(record.Id), "System status should be 'paused'")
// delete the record
err = hub.Delete(record)
require.NoError(t, err)
assert.False(t, sm.HasSystem(record.Id), "System should not exist in the store after deletion")
})
testOld(t, hub)
synctest.Test(t, func(t *testing.T) {
time.Sleep(time.Second)
synctest.Wait()
for _, systemId := range sm.GetAllSystemIDs() {
err = sm.RemoveSystem(systemId)
require.NoError(t, err)
assert.False(t, sm.HasSystem(systemId), "System should not exist in the store after deletion")
}
assert.Equal(t, 0, sm.GetSystemCount(), "System count should be 0")
// TODO: test with websocket client
})
}
func testOld(t *testing.T, hub *tests.TestHub) {
user, err := tests.CreateUser(hub, "test@testy.com", "testtesttest")
require.NoError(t, err)
sm := hub.GetSystemManager()
assert.NotNil(t, sm)
// error expected when creating a user with a duplicate email
_, err = tests.CreateUser(hub, "test@test.com", "testtesttest")
require.Error(t, err)
// Test collection existence. todo: move to hub package tests
t.Run("CollectionExistence", func(t *testing.T) {
// Verify that required collections exist
systems, err := hub.FindCachedCollectionByNameOrId("systems")
require.NoError(t, err)
assert.NotNil(t, systems)
systemStats, err := hub.FindCachedCollectionByNameOrId("system_stats")
require.NoError(t, err)
assert.NotNil(t, systemStats)
containerStats, err := hub.FindCachedCollectionByNameOrId("container_stats")
require.NoError(t, err)
assert.NotNil(t, containerStats)
})
t.Run("RemoveSystem", func(t *testing.T) {
// Get the count before adding the system
countBefore := sm.GetSystemCount()
// Create a test system record
record, err := tests.CreateRecord(hub, "systems", map[string]any{
"name": "i-even-got-lost-at-coney-island",
"host": "but-they-found-me",
"port": "33914",
"users": []string{user.Id},
})
require.NoError(t, err)
// Verify the system count increased
countAfterAdd := sm.GetSystemCount()
assert.Equal(t, countBefore+1, countAfterAdd, "System count should increase after adding a system via event hook")
// Verify the system exists
assert.True(t, sm.HasSystem(record.Id), "System should exist in the store")
// Remove the system
err = sm.RemoveSystem(record.Id)
assert.NoError(t, err)
// Check that the system count decreased
countAfterRemove := sm.GetSystemCount()
assert.Equal(t, countAfterAdd-1, countAfterRemove, "System count should decrease after removing a system")
// Verify the system no longer exists
assert.False(t, sm.HasSystem(record.Id), "System should not exist in the store after removal")
// Verify the system is not in the list of all system IDs
ids := sm.GetAllSystemIDs()
assert.NotContains(t, ids, record.Id, "System ID should not be in the list of all system IDs after removal")
// Verify the system status is empty
status := sm.GetSystemStatusFromStore(record.Id)
assert.Equal(t, "", status, "System status should be empty after removal")
// Try to remove it again - should return an error since it's already removed
err = sm.RemoveSystem(record.Id)
assert.Error(t, err)
})
t.Run("NewRecordPending", func(t *testing.T) {
// Create a test system
record, err := tests.CreateRecord(hub, "systems", map[string]any{
"name": "and-you-know",
"host": "i-feel-very-bad",
"port": "33914",
"users": []string{user.Id},
})
require.NoError(t, err)
// Add the record to the system manager
err = sm.AddRecord(record, nil)
require.NoError(t, err)
// Test filtering records by status - should be "pending" now
filter := "status = 'pending'"
pendingSystems, err := hub.FindRecordsByFilter("systems", filter, "-created", 0, 0, nil)
require.NoError(t, err)
assert.GreaterOrEqual(t, len(pendingSystems), 1)
})
t.Run("SystemStatusUpdate", func(t *testing.T) {
// Create a test system record
record, err := tests.CreateRecord(hub, "systems", map[string]any{
"name": "we-used-to-sleep-on-the-beach",
"host": "sleep-overnight-here",
"port": "33914",
"users": []string{user.Id},
})
require.NoError(t, err)
// Add the record to the system manager
err = sm.AddRecord(record, nil)
require.NoError(t, err)
// Test status changes
initialStatus := sm.GetSystemStatusFromStore(record.Id)
// Set a new status
sm.SetSystemStatusInDB(record.Id, "up")
// Verify status was updated
newStatus := sm.GetSystemStatusFromStore(record.Id)
assert.Equal(t, "up", newStatus, "System status should be updated to 'up'")
assert.NotEqual(t, initialStatus, newStatus, "Status should have changed")
// Verify the database was updated
updatedRecord, err := hub.FindRecordById("systems", record.Id)
require.NoError(t, err)
assert.Equal(t, "up", updatedRecord.Get("status"), "Database status should match")
})
t.Run("HandleSystemData", func(t *testing.T) {
// Create a test system record
record, err := tests.CreateRecord(hub, "systems", map[string]any{
"name": "things-changed-you-know",
"host": "they-dont-sleep-anymore-on-the-beach",
"port": "33914",
"users": []string{user.Id},
})
require.NoError(t, err)
// Create test system data
testData := &system.CombinedData{
Info: system.Info{
Hostname: "data-test.example.com",
KernelVersion: "5.15.0-generic",
Cores: 4,
Threads: 8,
CpuModel: "Test CPU",
Uptime: 3600,
Cpu: 25.5,
MemPct: 40.2,
DiskPct: 60.0,
Bandwidth: 100.0,
AgentVersion: "1.0.0",
},
Stats: system.Stats{
Cpu: 25.5,
Mem: 16384.0,
MemUsed: 6553.6,
MemPct: 40.0,
DiskTotal: 1024000.0,
DiskUsed: 614400.0,
DiskPct: 60.0,
NetworkSent: 1024.0,
NetworkRecv: 2048.0,
},
Containers: []*container.Stats{},
}
// Test handling system data. todo: move to hub/alerts package tests
err = hub.HandleSystemAlerts(record, testData)
assert.NoError(t, err)
})
t.Run("ErrorHandling", func(t *testing.T) {
// Try to add a non-existent record
nonExistentId := "non_existent_id"
err := sm.RemoveSystem(nonExistentId)
assert.Error(t, err)
// Try to add a system with invalid host
system := &systems.System{
Host: "",
}
err = sm.AddSystem(system)
assert.Error(t, err)
})
t.Run("ConcurrentOperations", func(t *testing.T) {
// Create a test system
record, err := tests.CreateRecord(hub, "systems", map[string]any{
"name": "jfkjahkfajs",
"host": "localhost",
"port": "33914",
"users": []string{user.Id},
})
require.NoError(t, err)
// Run concurrent operations
const goroutines = 5
var wg sync.WaitGroup
wg.Add(goroutines)
for i := range goroutines {
go func(i int) {
defer wg.Done()
// Alternate between different operations
switch i % 3 {
case 0:
status := fmt.Sprintf("status-%d", i)
sm.SetSystemStatusInDB(record.Id, status)
case 1:
_ = sm.GetSystemStatusFromStore(record.Id)
case 2:
_, _ = sm.GetSystemHostPort(record.Id)
}
}(i)
}
wg.Wait()
// Verify system still exists and is in a valid state
assert.True(t, sm.HasSystem(record.Id), "System should still exist after concurrent operations")
status := sm.GetSystemStatusFromStore(record.Id)
assert.NotEmpty(t, status, "System should have a status after concurrent operations")
})
t.Run("ContextCancellation", func(t *testing.T) {
// Create a test system record
record, err := tests.CreateRecord(hub, "systems", map[string]any{
"name": "lkhsdfsjf",
"host": "localhost",
"port": "33914",
"users": []string{user.Id},
})
require.NoError(t, err)
// Verify the system exists in the store
assert.True(t, sm.HasSystem(record.Id), "System should exist in the store")
// Store the original context and cancel function
originalCtx, originalCancel, err := sm.GetSystemContextFromStore(record.Id)
assert.NoError(t, err)
// Ensure the context is not nil
assert.NotNil(t, originalCtx, "System context should not be nil")
assert.NotNil(t, originalCancel, "System cancel function should not be nil")
// Cancel the context
originalCancel()
// Wait a short time for cancellation to propagate
time.Sleep(10 * time.Millisecond)
// Verify the context is done
select {
case <-originalCtx.Done():
// Context was properly cancelled
default:
t.Fatal("Context was not cancelled")
}
// Verify the system is still in the store (cancellation shouldn't remove it)
assert.True(t, sm.HasSystem(record.Id), "System should still exist after context cancellation")
// Explicitly remove the system
err = sm.RemoveSystem(record.Id)
assert.NoError(t, err, "RemoveSystem should succeed")
// Verify the system is removed
assert.False(t, sm.HasSystem(record.Id), "System should be removed after RemoveSystem")
// Try to remove it again - should return an error
err = sm.RemoveSystem(record.Id)
assert.Error(t, err, "RemoveSystem should fail for non-existent system")
// Add the system back
err = sm.AddRecord(record, nil)
require.NoError(t, err, "AddRecord should succeed")
// Verify the system is back in the store
assert.True(t, sm.HasSystem(record.Id), "System should exist after re-adding")
// Verify a new context was created
newCtx, newCancel, err := sm.GetSystemContextFromStore(record.Id)
assert.NoError(t, err)
assert.NotNil(t, newCtx, "New system context should not be nil")
assert.NotNil(t, newCancel, "New system cancel function should not be nil")
assert.NotEqual(t, originalCtx, newCtx, "New context should be different from original")
// Clean up
err = sm.RemoveSystem(record.Id)
assert.NoError(t, err)
})
}

View File

@@ -0,0 +1,110 @@
//go:build testing
// +build testing
package systems
import (
"context"
"fmt"
entities "github.com/henrygd/beszel/src/entities/system"
)
// TESTING ONLY: GetSystemCount returns the number of systems in the store
func (sm *SystemManager) GetSystemCount() int {
return sm.systems.Length()
}
// TESTING ONLY: HasSystem checks if a system with the given ID exists in the store
func (sm *SystemManager) HasSystem(systemID string) bool {
return sm.systems.Has(systemID)
}
// TESTING ONLY: GetSystemStatusFromStore returns the status of a system with the given ID
// Returns an empty string if the system doesn't exist
func (sm *SystemManager) GetSystemStatusFromStore(systemID string) string {
sys, ok := sm.systems.GetOk(systemID)
if !ok {
return ""
}
return sys.Status
}
// TESTING ONLY: GetSystemContextFromStore returns the context and cancel function for a system
func (sm *SystemManager) GetSystemContextFromStore(systemID string) (context.Context, context.CancelFunc, error) {
sys, ok := sm.systems.GetOk(systemID)
if !ok {
return nil, nil, fmt.Errorf("no system")
}
return sys.ctx, sys.cancel, nil
}
// TESTING ONLY: GetSystemFromStore returns a store from the system
func (sm *SystemManager) GetSystemFromStore(systemID string) (*System, error) {
sys, ok := sm.systems.GetOk(systemID)
if !ok {
return nil, fmt.Errorf("no system")
}
return sys, nil
}
// TESTING ONLY: GetAllSystemIDs returns a slice of all system IDs in the store
func (sm *SystemManager) GetAllSystemIDs() []string {
data := sm.systems.GetAll()
ids := make([]string, 0, len(data))
for id := range data {
ids = append(ids, id)
}
return ids
}
// TESTING ONLY: GetSystemData returns the combined data for a system with the given ID
// Returns nil if the system doesn't exist
// This method is intended for testing
func (sm *SystemManager) GetSystemData(systemID string) *entities.CombinedData {
sys, ok := sm.systems.GetOk(systemID)
if !ok {
return nil
}
return sys.data
}
// TESTING ONLY: GetSystemHostPort returns the host and port for a system with the given ID
// Returns empty strings if the system doesn't exist
func (sm *SystemManager) GetSystemHostPort(systemID string) (string, string) {
sys, ok := sm.systems.GetOk(systemID)
if !ok {
return "", ""
}
return sys.Host, sys.Port
}
// TESTING ONLY: SetSystemStatusInDB sets the status of a system directly and updates the database record
// This is intended for testing
// Returns false if the system doesn't exist
func (sm *SystemManager) SetSystemStatusInDB(systemID string, status string) bool {
if !sm.HasSystem(systemID) {
return false
}
// Update the database record
record, err := sm.hub.FindRecordById("systems", systemID)
if err != nil {
return false
}
record.Set("status", status)
err = sm.hub.Save(record)
if err != nil {
return false
}
return true
}
// TESTING ONLY: RemoveAllSystems removes all systems from the store
func (sm *SystemManager) RemoveAllSystems() {
for _, system := range sm.systems.GetAll() {
sm.RemoveSystem(system.Id)
}
}