feat: agent data cache to support connections to multiple hubs (#341)

This commit is contained in:
henrygd
2025-03-04 16:25:45 -05:00
parent 681286eb4f
commit c4d8deb986
5 changed files with 150 additions and 23 deletions

View File

@@ -14,6 +14,7 @@ clean:
lint:
golangci-lint run
test: export GOEXPERIMENT=synctest
test:
go test -tags=testing ./...

View File

@@ -9,6 +9,7 @@ import (
"os"
"strings"
"sync"
"time"
"github.com/shirou/gopsutil/v4/common"
)
@@ -27,11 +28,13 @@ type Agent struct {
sensorsWhitelist map[string]struct{} // List of sensors to monitor
systemInfo system.Info // Host system info
gpuManager *GPUManager // Manages GPU data
cache *SessionCache // Cache for system stats based on primary session ID
}
func NewAgent() *Agent {
agent := &Agent{
fsStats: make(map[string]*system.FsStats),
cache: NewSessionCache(69 * time.Second),
}
agent.memCalc, _ = GetEnv("MEM_CALC")
@@ -63,7 +66,7 @@ func NewAgent() *Agent {
// Set sensors whitelist
if sensors, exists := GetEnv("SENSORS"); exists {
agent.sensorsWhitelist = make(map[string]struct{})
for _, sensor := range strings.Split(sensors, ",") {
for sensor := range strings.SplitSeq(sensors, ",") {
if sensor != "" {
agent.sensorsWhitelist[sensor] = struct{}{}
}
@@ -85,7 +88,7 @@ func NewAgent() *Agent {
// if debugging, print stats
if agent.debug {
slog.Debug("Stats", "data", agent.gatherStats())
slog.Debug("Stats", "data", agent.gatherStats(""))
}
return agent
@@ -100,29 +103,37 @@ func GetEnv(key string) (value string, exists bool) {
return os.LookupEnv(key)
}
func (a *Agent) gatherStats() system.CombinedData {
func (a *Agent) gatherStats(sessionID string) *system.CombinedData {
a.Lock()
defer a.Unlock()
slog.Debug("Getting stats")
systemData := system.CombinedData{
cachedData, ok := a.cache.Get(sessionID)
if ok {
slog.Debug("Cached stats", "session", sessionID)
return cachedData
}
*cachedData = system.CombinedData{
Stats: a.getSystemStats(),
Info: a.systemInfo,
}
slog.Debug("System stats", "data", systemData)
// add docker stats
slog.Debug("System stats", "data", cachedData)
if containerStats, err := a.dockerManager.getDockerStats(); err == nil {
systemData.Containers = containerStats
slog.Debug("Docker stats", "data", systemData.Containers)
cachedData.Containers = containerStats
slog.Debug("Docker stats", "data", cachedData.Containers)
} else {
slog.Debug("Error getting docker stats", "err", err)
slog.Debug("Docker stats", "err", err)
}
// add extra filesystems
systemData.Stats.ExtraFs = make(map[string]*system.FsStats)
cachedData.Stats.ExtraFs = make(map[string]*system.FsStats)
for name, stats := range a.fsStats {
if !stats.Root && stats.DiskTotal > 0 {
systemData.Stats.ExtraFs[name] = stats
cachedData.Stats.ExtraFs[name] = stats
}
}
slog.Debug("Extra filesystems", "data", systemData.Stats.ExtraFs)
return systemData
slog.Debug("Extra filesystems", "data", cachedData.Stats.ExtraFs)
a.cache.Set(sessionID, cachedData)
return cachedData
}

View File

@@ -0,0 +1,36 @@
package agent
import (
"beszel/internal/entities/system"
"time"
)
// Not thread safe since we only access from gatherStats which is already locked
type SessionCache struct {
data *system.CombinedData
lastUpdate time.Time
primarySession string
leaseTime time.Duration
}
func NewSessionCache(leaseTime time.Duration) *SessionCache {
return &SessionCache{
leaseTime: leaseTime,
data: &system.CombinedData{},
}
}
func (c *SessionCache) Get(sessionID string) (stats *system.CombinedData, isCached bool) {
if sessionID != c.primarySession && time.Since(c.lastUpdate) < c.leaseTime {
return c.data, true
}
return c.data, false
}
func (c *SessionCache) Set(sessionID string, data *system.CombinedData) {
if data != nil {
*c.data = *data
}
c.primarySession = sessionID
c.lastUpdate = time.Now()
}

View File

@@ -0,0 +1,85 @@
package agent
import (
"beszel/internal/entities/system"
"testing"
"testing/synctest"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSessionCache_GetSet(t *testing.T) {
synctest.Run(func() {
cache := NewSessionCache(69 * time.Second)
testData := &system.CombinedData{
Info: system.Info{
Hostname: "test-host",
Cores: 4,
},
Stats: system.Stats{
Cpu: 50.0,
MemPct: 30.0,
DiskPct: 40.0,
},
}
// Test initial state - should not be cached
data, isCached := cache.Get("session1")
assert.False(t, isCached, "Expected no cached data initially")
assert.NotNil(t, data, "Expected data to be initialized")
// Set data for session1
cache.Set("session1", testData)
time.Sleep(15 * time.Second)
// Get data for a different session - should be cached
data, isCached = cache.Get("session2")
assert.True(t, isCached, "Expected data to be cached for non-primary session")
require.NotNil(t, data, "Expected cached data to be returned")
assert.Equal(t, "test-host", data.Info.Hostname, "Hostname should match test data")
assert.Equal(t, 4, data.Info.Cores, "Cores should match test data")
assert.Equal(t, 50.0, data.Stats.Cpu, "CPU should match test data")
assert.Equal(t, 30.0, data.Stats.MemPct, "Memory percentage should match test data")
assert.Equal(t, 40.0, data.Stats.DiskPct, "Disk percentage should match test data")
time.Sleep(10 * time.Second)
// Get data for the primary session - should not be cached
data, isCached = cache.Get("session1")
assert.False(t, isCached, "Expected data not to be cached for primary session")
require.NotNil(t, data, "Expected data to be returned even if not cached")
assert.Equal(t, "test-host", data.Info.Hostname, "Hostname should match test data")
// if not cached, agent will update the data
cache.Set("session1", testData)
time.Sleep(45 * time.Second)
// Get data for a different session - should still be cached
_, isCached = cache.Get("session2")
assert.True(t, isCached, "Expected data to be cached for non-primary session")
// Wait for the lease to expire
time.Sleep(30 * time.Second)
// Get data for session2 - should not be cached
_, isCached = cache.Get("session2")
assert.False(t, isCached, "Expected data not to be cached after lease expiration")
})
}
func TestSessionCache_NilData(t *testing.T) {
// Create a new SessionCache
cache := NewSessionCache(30 * time.Second)
// Test setting nil data (should not panic)
assert.NotPanics(t, func() {
cache.Set("session1", nil)
}, "Setting nil data should not panic")
// Get data - should not be nil even though we set nil
data, _ := cache.Get("session2")
assert.NotNil(t, data, "Expected data to not be nil after setting nil data")
}

View File

@@ -61,8 +61,8 @@ func (a *Agent) StartServer(opts ServerOptions) error {
}
func (a *Agent) handleSession(s sshServer.Session) {
// slog.Debug("connection", "remoteaddr", s.RemoteAddr(), "user", s.User())
stats := a.gatherStats()
slog.Debug("New session", "client", s.RemoteAddr())
stats := a.gatherStats(s.Context().SessionID())
if err := json.NewEncoder(s).Encode(stats); err != nil {
slog.Error("Error encoding stats", "err", err, "stats", stats)
s.Exit(1)
@@ -74,24 +74,18 @@ func (a *Agent) handleSession(s sshServer.Session) {
// It returns a slice of ssh.PublicKey and an error if any key fails to parse.
func ParseKeys(input string) ([]ssh.PublicKey, error) {
var parsedKeys []ssh.PublicKey
for line := range strings.Lines(input) {
line = strings.TrimSpace(line)
// Skip empty lines or comments
if len(line) == 0 || strings.HasPrefix(line, "#") {
continue
}
// Parse the key
parsedKey, _, _, _, err := ssh.ParseAuthorizedKey([]byte(line))
if err != nil {
return nil, fmt.Errorf("failed to parse key: %s, error: %w", line, err)
}
// Append the parsed key to the list
parsedKeys = append(parsedKeys, parsedKey)
}
return parsedKeys, nil
}