From b5607025f78dd2ebfa8110c50377df1b5b9d7bb0 Mon Sep 17 00:00:00 2001 From: Henry Dollman Date: Sun, 18 Aug 2024 17:45:39 -0400 Subject: [PATCH] slight improvements to agent memory usage --- beszel/cmd/agent/agent.go | 12 +-- beszel/internal/agent/agent.go | 176 ++++++++++++++++++++------------- 2 files changed, 113 insertions(+), 75 deletions(-) diff --git a/beszel/cmd/agent/agent.go b/beszel/cmd/agent/agent.go index a8e081d..0ea0097 100644 --- a/beszel/cmd/agent/agent.go +++ b/beszel/cmd/agent/agent.go @@ -29,14 +29,14 @@ func main() { log.Fatal("KEY environment variable is not set") } - port := ":45876" - if p, exists := os.LookupEnv("PORT"); exists { + addr := ":45876" + if portEnvVar, exists := os.LookupEnv("PORT"); exists { // allow passing an address in the form of "127.0.0.1:45876" - if !strings.Contains(p, ":") { - p = ":" + p + if !strings.Contains(portEnvVar, ":") { + portEnvVar = ":" + portEnvVar } - port = p + addr = portEnvVar } - agent.NewAgent(pubKey, port).Run() + agent.NewAgent(pubKey, addr).Run() } diff --git a/beszel/internal/agent/agent.go b/beszel/internal/agent/agent.go index 7a09989..b6e656a 100644 --- a/beszel/internal/agent/agent.go +++ b/beszel/internal/agent/agent.go @@ -3,6 +3,7 @@ package agent import ( "beszel/internal/entities/container" "beszel/internal/entities/system" + "bytes" "context" "encoding/json" "fmt" @@ -26,27 +27,39 @@ import ( psutilNet "github.com/shirou/gopsutil/v4/net" ) -var containerStatsMap = make(map[string]*container.PrevContainerStats) - type Agent struct { - port string + addr string pubKey []byte sem chan struct{} + containerStatsMap map[string]*container.PrevContainerStats containerStatsMutex *sync.Mutex - diskIoStats system.DiskIoStats - netIoStats system.NetIoStats + diskIoStats *system.DiskIoStats + netIoStats *system.NetIoStats dockerClient *http.Client + containerStatsPool *sync.Pool + bufferPool *sync.Pool } -func NewAgent(pubKey []byte, port string) *Agent { +func NewAgent(pubKey []byte, addr string) *Agent { return &Agent{ + addr: addr, pubKey: pubKey, sem: make(chan struct{}, 15), - port: port, + containerStatsMap: make(map[string]*container.PrevContainerStats), containerStatsMutex: &sync.Mutex{}, - diskIoStats: system.DiskIoStats{}, - netIoStats: system.NetIoStats{}, + diskIoStats: &system.DiskIoStats{}, + netIoStats: &system.NetIoStats{}, dockerClient: newDockerClient(), + containerStatsPool: &sync.Pool{ + New: func() interface{} { + return new(container.Stats) + }, + }, + bufferPool: &sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, } } @@ -156,14 +169,14 @@ func (a *Agent) getDockerStats() ([]*container.Stats, error) { resp, err := a.dockerClient.Get("http://localhost/containers/json") if err != nil { a.closeIdleConnections(err) - return []*container.Stats{}, err + return nil, err } defer resp.Body.Close() var containers []*container.ApiInfo if err := json.NewDecoder(resp.Body).Decode(&containers); err != nil { log.Printf("Error decoding containers: %+v\n", err) - return []*container.Stats{}, err + return nil, err } containerStats := make([]*container.Stats, 0, len(containers)) @@ -206,10 +219,10 @@ func (a *Agent) getDockerStats() ([]*container.Stats, error) { wg.Wait() - for id := range containerStatsMap { + for id := range a.containerStatsMap { if _, exists := validIds[id]; !exists { // log.Printf("Removing container cpu map entry: %+v\n", id) - delete(containerStatsMap, id) + delete(a.containerStatsMap, id) } } @@ -220,15 +233,27 @@ func (a *Agent) getContainerStats(ctr *container.ApiInfo) (*container.Stats, err // use semaphore to limit concurrency a.acquireSemaphore() defer a.releaseSemaphore() + resp, err := a.dockerClient.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1") if err != nil { - return &container.Stats{}, err + return nil, err } defer resp.Body.Close() + // get a buffer from the pool + buf := a.bufferPool.Get().(*bytes.Buffer) + defer a.bufferPool.Put(buf) + buf.Reset() + // read the response body into the buffer + _, err = io.Copy(buf, resp.Body) + if err != nil { + return nil, err + } + + // unmarshal the json data from the buffer var statsJson container.ApiStats - if err := json.NewDecoder(resp.Body).Decode(&statsJson); err != nil { - log.Fatal(err) + if err := json.Unmarshal(buf.Bytes(), &statsJson); err != nil { + return nil, err } name := ctr.Names[0][1:] @@ -244,10 +269,10 @@ func (a *Agent) getContainerStats(ctr *container.ApiInfo) (*container.Stats, err defer a.containerStatsMutex.Unlock() // add empty values if they doesn't exist in map - stats, initialized := containerStatsMap[ctr.IdShort] + stats, initialized := a.containerStatsMap[ctr.IdShort] if !initialized { stats = &container.PrevContainerStats{} - containerStatsMap[ctr.IdShort] = stats + a.containerStatsMap[ctr.IdShort] = stats } // cpu @@ -255,7 +280,7 @@ func (a *Agent) getContainerStats(ctr *container.ApiInfo) (*container.Stats, err systemDelta := statsJson.CPUStats.SystemUsage - stats.Cpu[1] cpuPct := float64(cpuDelta) / float64(systemDelta) * 100 if cpuPct > 100 { - return &container.Stats{}, fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct) + return nil, fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct) } stats.Cpu = [2]uint64{statsJson.CPUStats.CPUUsage.TotalUsage, statsJson.CPUStats.SystemUsage} @@ -277,13 +302,13 @@ func (a *Agent) getContainerStats(ctr *container.ApiInfo) (*container.Stats, err stats.Net.Recv = total_recv stats.Net.Time = time.Now() - cStats := &container.Stats{ - Name: name, - Cpu: twoDecimals(cpuPct), - Mem: bytesToMegabytes(float64(usedMemory)), - NetworkSent: bytesToMegabytes(sent_delta), - NetworkRecv: bytesToMegabytes(recv_delta), - } + cStats := a.containerStatsPool.Get().(*container.Stats) + cStats.Name = name + cStats.Cpu = twoDecimals(cpuPct) + cStats.Mem = bytesToMegabytes(float64(usedMemory)) + cStats.NetworkSent = bytesToMegabytes(sent_delta) + cStats.NetworkRecv = bytesToMegabytes(recv_delta) + return cStats, nil } @@ -291,7 +316,7 @@ func (a *Agent) getContainerStats(ctr *container.ApiInfo) (*container.Stats, err func (a *Agent) deleteContainerStatsSync(id string) { a.containerStatsMutex.Lock() defer a.containerStatsMutex.Unlock() - delete(containerStatsMap, id) + delete(a.containerStatsMap, id) } func (a *Agent) gatherStats() *system.CombinedData { @@ -299,28 +324,28 @@ func (a *Agent) gatherStats() *system.CombinedData { systemData := &system.CombinedData{ Stats: systemStats, Info: systemInfo, - // Containers: []*container.Stats{}, } if containerStats, err := a.getDockerStats(); err == nil { systemData.Containers = containerStats } - // fmt.Printf("%+v\n", stats) + // fmt.Printf("%+v\n", systemData) return systemData } -func (a *Agent) startServer(addr string, pubKey []byte) { - sshServer.Handle(func(s sshServer.Session) { - stats := a.gatherStats() - var jsonStats []byte - jsonStats, _ = json.Marshal(stats) - io.WriteString(s, string(jsonStats)) - s.Exit(0) - }) +// return container stats to pool +func (a *Agent) returnStatsToPool(containerStats []*container.Stats) { + for _, stats := range containerStats { + a.containerStatsPool.Put(stats) + } +} - log.Printf("Starting SSH server on %s", addr) - if err := sshServer.ListenAndServe(addr, nil, sshServer.NoPty(), +func (a *Agent) startServer() { + sshServer.Handle(a.handleSession) + + log.Printf("Starting SSH server on %s", a.addr) + if err := sshServer.ListenAndServe(a.addr, nil, sshServer.NoPty(), sshServer.PublicKeyAuth(func(ctx sshServer.Context, key sshServer.PublicKey) bool { - allowed, _, _, _, _ := sshServer.ParseAuthorizedKey(pubKey) + allowed, _, _, _, _ := sshServer.ParseAuthorizedKey(a.pubKey) return sshServer.KeysEqual(key, allowed) }), ); err != nil { @@ -328,6 +353,18 @@ func (a *Agent) startServer(addr string, pubKey []byte) { } } +func (a *Agent) handleSession(s sshServer.Session) { + stats := a.gatherStats() + defer a.returnStatsToPool(stats.Containers) + encoder := json.NewEncoder(s) + if err := encoder.Encode(stats); err != nil { + log.Println("Error encoding stats:", err.Error()) + s.Exit(1) + return + } + s.Exit(0) +} + func (a *Agent) Run() { if filesystem, exists := os.LookupEnv("FILESYSTEM"); exists { a.diskIoStats.Filesystem = filesystem @@ -338,7 +375,35 @@ func (a *Agent) Run() { a.initializeDiskIoStats() a.initializeNetIoStats() - a.startServer(a.port, a.pubKey) + a.startServer() +} + +func (a *Agent) initializeDiskIoStats() { + if io, err := disk.IOCounters(a.diskIoStats.Filesystem); err == nil { + for _, d := range io { + a.diskIoStats.Time = time.Now() + a.diskIoStats.Read = d.ReadBytes + a.diskIoStats.Write = d.WriteBytes + } + } +} + +func (a *Agent) initializeNetIoStats() { + if netIO, err := psutilNet.IOCounters(true); err == nil { + bytesSent := uint64(0) + bytesRecv := uint64(0) + for _, v := range netIO { + if skipNetworkInterface(&v) { + continue + } + log.Printf("Found network interface: %+v (%+v recv, %+v sent)\n", v.Name, v.BytesRecv, v.BytesSent) + bytesSent += v.BytesSent + bytesRecv += v.BytesRecv + } + a.netIoStats.BytesSent = bytesSent + a.netIoStats.BytesRecv = bytesRecv + a.netIoStats.Time = time.Now() + } } func bytesToMegabytes(b float64) float64 { @@ -379,34 +444,6 @@ func skipNetworkInterface(v *psutilNet.IOCountersStat) bool { } } -func (a *Agent) initializeDiskIoStats() { - if io, err := disk.IOCounters(a.diskIoStats.Filesystem); err == nil { - for _, d := range io { - a.diskIoStats.Time = time.Now() - a.diskIoStats.Read = d.ReadBytes - a.diskIoStats.Write = d.WriteBytes - } - } -} - -func (a *Agent) initializeNetIoStats() { - if netIO, err := psutilNet.IOCounters(true); err == nil { - bytesSent := uint64(0) - bytesRecv := uint64(0) - for _, v := range netIO { - if skipNetworkInterface(&v) { - continue - } - log.Printf("Found network interface: %+v (%+v recv, %+v sent)\n", v.Name, v.BytesRecv, v.BytesSent) - bytesSent += v.BytesSent - bytesRecv += v.BytesRecv - } - a.netIoStats.BytesSent = bytesSent - a.netIoStats.BytesRecv = bytesRecv - a.netIoStats.Time = time.Now() - } -} - func newDockerClient() *http.Client { dockerHost := "unix:///var/run/docker.sock" if dockerHostEnv, exists := os.LookupEnv("DOCKER_HOST"); exists { @@ -422,6 +459,7 @@ func newDockerClient() *http.Client { ForceAttemptHTTP2: false, IdleConnTimeout: 90 * time.Second, DisableCompression: true, + MaxConnsPerHost: 20, MaxIdleConnsPerHost: 20, DisableKeepAlives: false, }