diff --git a/beszel/internal/agent/docker.go b/beszel/internal/agent/docker.go index 5ef4763..b634979 100644 --- a/beszel/internal/agent/docker.go +++ b/beszel/internal/agent/docker.go @@ -2,6 +2,7 @@ package agent import ( "beszel/internal/entities/container" + "bytes" "context" "encoding/json" "fmt" @@ -27,6 +28,9 @@ type dockerManager struct { validIds map[string]struct{} // Map of valid container ids, used to prune invalid containers from containerStatsMap goodDockerVersion bool // Whether docker version is at least 25.0.0 (one-shot works correctly) isWindows bool // Whether the Docker Engine API is running on Windows + buf *bytes.Buffer // Buffer to store and read response bodies + decoder *json.Decoder // Reusable JSON decoder that reads from buf + apiStats *container.ApiStats // Reusable API stats object } // userAgentRoundTripper is a custom http.RoundTripper that adds a User-Agent header to all requests @@ -63,10 +67,9 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) { if err != nil { return nil, err } - defer resp.Body.Close() dm.apiContainerList = dm.apiContainerList[:0] - if err := json.NewDecoder(resp.Body).Decode(&dm.apiContainerList); err != nil { + if err := dm.decode(resp, &dm.apiContainerList); err != nil { return nil, err } @@ -83,7 +86,8 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) { var failedContainers []*container.ApiInfo - for _, ctr := range dm.apiContainerList { + for i := range dm.apiContainerList { + ctr := dm.apiContainerList[i] ctr.IdShort = ctr.Id[:12] dm.validIds[ctr.IdShort] = struct{}{} // check if container is less than 1 minute old (possible restart) @@ -111,7 +115,8 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) { // retry failed containers separately so we can run them in parallel (docker 24 bug) if len(failedContainers) > 0 { slog.Debug("Retrying failed containers", "count", len(failedContainers)) - for _, ctr := range failedContainers { + for i := range failedContainers { + ctr := failedContainers[i] dm.queue() go func() { defer dm.dequeue() @@ -164,8 +169,13 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo) error { stats.NetworkRecv = 0 // docker host container stats response - var res container.ApiStats - if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + // res := dm.getApiStats() + // defer dm.putApiStats(res) + // + + res := dm.apiStats + res.Networks = nil + if err := dm.decode(resp, res); err != nil { return err } @@ -173,9 +183,14 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo) error { var usedMemory uint64 var cpuPct float64 + // store current cpu stats + prevCpuContainer, prevCpuSystem := stats.CpuContainer, stats.CpuSystem + stats.CpuContainer = res.CPUStats.CPUUsage.TotalUsage + stats.CpuSystem = res.CPUStats.SystemUsage + if dm.isWindows { usedMemory = res.MemoryStats.PrivateWorkingSet - cpuPct = res.CalculateCpuPercentWindows(stats.PrevCpu[0], stats.PrevRead) + cpuPct = res.CalculateCpuPercentWindows(prevCpuContainer, stats.PrevReadTime) } else { // check if container has valid data, otherwise may be in restart loop (#103) if res.MemoryStats.Usage == 0 { @@ -187,13 +202,12 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo) error { } usedMemory = res.MemoryStats.Usage - memCache - cpuPct = res.CalculateCpuPercentLinux(stats.PrevCpu) + cpuPct = res.CalculateCpuPercentLinux(prevCpuContainer, prevCpuSystem) } if cpuPct > 100 { return fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct) } - stats.PrevCpu = [2]uint64{res.CPUStats.CPUUsage.TotalUsage, res.CPUStats.SystemUsage} // network var total_sent, total_recv uint64 @@ -201,21 +215,25 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo) error { total_sent += v.TxBytes total_recv += v.RxBytes } - var sent_delta, recv_delta float64 - // prevent first run from sending all prev sent/recv bytes - if initialized { - secondsElapsed := time.Since(stats.PrevRead).Seconds() - sent_delta = float64(total_sent-stats.PrevNet.Sent) / secondsElapsed - recv_delta = float64(total_recv-stats.PrevNet.Recv) / secondsElapsed + var sent_delta, recv_delta uint64 + millisecondsElapsed := uint64(time.Since(stats.PrevReadTime).Milliseconds()) + if initialized && millisecondsElapsed > 0 { + // get bytes per second + sent_delta = (total_sent - stats.PrevNet.Sent) * 1000 / millisecondsElapsed + recv_delta = (total_recv - stats.PrevNet.Recv) * 1000 / millisecondsElapsed + // check for unrealistic network values (> 5GB/s) + if sent_delta > 5e9 || recv_delta > 5e9 { + slog.Warn("Bad network delta", "container", name) + sent_delta, recv_delta = 0, 0 + } } - stats.PrevNet.Sent = total_sent - stats.PrevNet.Recv = total_recv + stats.PrevNet.Sent, stats.PrevNet.Recv = total_sent, total_recv stats.Cpu = twoDecimals(cpuPct) stats.Mem = bytesToMegabytes(float64(usedMemory)) - stats.NetworkSent = bytesToMegabytes(sent_delta) - stats.NetworkRecv = bytesToMegabytes(recv_delta) - stats.PrevRead = res.Read + stats.NetworkSent = bytesToMegabytes(float64(sent_delta)) + stats.NetworkRecv = bytesToMegabytes(float64(recv_delta)) + stats.PrevReadTime = res.Read return nil } @@ -231,7 +249,6 @@ func (dm *dockerManager) deleteContainerStatsSync(id string) { func newDockerManager(a *Agent) *dockerManager { dockerHost, exists := GetEnv("DOCKER_HOST") if exists { - slog.Info("DOCKER_HOST", "host", dockerHost) // return nil if set to empty string if dockerHost == "" { return nil @@ -242,7 +259,6 @@ func newDockerManager(a *Agent) *dockerManager { parsedURL, err := url.Parse(dockerHost) if err != nil { - slog.Error("Error parsing DOCKER_HOST", "err", err) os.Exit(1) } @@ -290,6 +306,7 @@ func newDockerManager(a *Agent) *dockerManager { containerStatsMap: make(map[string]*container.Stats), sem: make(chan struct{}, 5), apiContainerList: []*container.ApiInfo{}, + apiStats: &container.ApiStats{}, } // If using podman, return client @@ -308,9 +325,8 @@ func newDockerManager(a *Agent) *dockerManager { if err != nil { return manager } - defer resp.Body.Close() - if err := json.NewDecoder(resp.Body).Decode(&versionInfo); err != nil { + if err := manager.decode(resp, &versionInfo); err != nil { return manager } @@ -324,6 +340,22 @@ func newDockerManager(a *Agent) *dockerManager { return manager } +// Decodes Docker API JSON response using a reusable buffer and decoder. Not thread safe. +func (dm *dockerManager) decode(resp *http.Response, d any) error { + if dm.buf == nil { + // initialize buffer with 256kb starting size + dm.buf = bytes.NewBuffer(make([]byte, 0, 1024*256)) + dm.decoder = json.NewDecoder(dm.buf) + } + defer resp.Body.Close() + defer dm.buf.Reset() + _, err := dm.buf.ReadFrom(resp.Body) + if err != nil { + return err + } + return dm.decoder.Decode(d) +} + // Test docker / podman sockets and return if one exists func getDockerHost() string { scheme := "unix://"