Refactor dockerManager

- Introduced a reusable buffer and JSON decoder in dockerManager for efficient decoding of Docker API responses.
- Adjusted network statistics calculations to ensure accurate data handling.
This commit is contained in:
henrygd
2025-07-08 15:27:01 -04:00
parent 7b36036455
commit d67d638a6b

View File

@@ -2,6 +2,7 @@ package agent
import (
"beszel/internal/entities/container"
"bytes"
"context"
"encoding/json"
"fmt"
@@ -27,6 +28,9 @@ type dockerManager struct {
validIds map[string]struct{} // Map of valid container ids, used to prune invalid containers from containerStatsMap
goodDockerVersion bool // Whether docker version is at least 25.0.0 (one-shot works correctly)
isWindows bool // Whether the Docker Engine API is running on Windows
buf *bytes.Buffer // Buffer to store and read response bodies
decoder *json.Decoder // Reusable JSON decoder that reads from buf
apiStats *container.ApiStats // Reusable API stats object
}
// userAgentRoundTripper is a custom http.RoundTripper that adds a User-Agent header to all requests
@@ -63,10 +67,9 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) {
if err != nil {
return nil, err
}
defer resp.Body.Close()
dm.apiContainerList = dm.apiContainerList[:0]
if err := json.NewDecoder(resp.Body).Decode(&dm.apiContainerList); err != nil {
if err := dm.decode(resp, &dm.apiContainerList); err != nil {
return nil, err
}
@@ -83,7 +86,8 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) {
var failedContainers []*container.ApiInfo
for _, ctr := range dm.apiContainerList {
for i := range dm.apiContainerList {
ctr := dm.apiContainerList[i]
ctr.IdShort = ctr.Id[:12]
dm.validIds[ctr.IdShort] = struct{}{}
// check if container is less than 1 minute old (possible restart)
@@ -111,7 +115,8 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) {
// retry failed containers separately so we can run them in parallel (docker 24 bug)
if len(failedContainers) > 0 {
slog.Debug("Retrying failed containers", "count", len(failedContainers))
for _, ctr := range failedContainers {
for i := range failedContainers {
ctr := failedContainers[i]
dm.queue()
go func() {
defer dm.dequeue()
@@ -164,8 +169,13 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo) error {
stats.NetworkRecv = 0
// docker host container stats response
var res container.ApiStats
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
// res := dm.getApiStats()
// defer dm.putApiStats(res)
//
res := dm.apiStats
res.Networks = nil
if err := dm.decode(resp, res); err != nil {
return err
}
@@ -173,9 +183,14 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo) error {
var usedMemory uint64
var cpuPct float64
// store current cpu stats
prevCpuContainer, prevCpuSystem := stats.CpuContainer, stats.CpuSystem
stats.CpuContainer = res.CPUStats.CPUUsage.TotalUsage
stats.CpuSystem = res.CPUStats.SystemUsage
if dm.isWindows {
usedMemory = res.MemoryStats.PrivateWorkingSet
cpuPct = res.CalculateCpuPercentWindows(stats.PrevCpu[0], stats.PrevRead)
cpuPct = res.CalculateCpuPercentWindows(prevCpuContainer, stats.PrevReadTime)
} else {
// check if container has valid data, otherwise may be in restart loop (#103)
if res.MemoryStats.Usage == 0 {
@@ -187,13 +202,12 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo) error {
}
usedMemory = res.MemoryStats.Usage - memCache
cpuPct = res.CalculateCpuPercentLinux(stats.PrevCpu)
cpuPct = res.CalculateCpuPercentLinux(prevCpuContainer, prevCpuSystem)
}
if cpuPct > 100 {
return fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct)
}
stats.PrevCpu = [2]uint64{res.CPUStats.CPUUsage.TotalUsage, res.CPUStats.SystemUsage}
// network
var total_sent, total_recv uint64
@@ -201,21 +215,25 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo) error {
total_sent += v.TxBytes
total_recv += v.RxBytes
}
var sent_delta, recv_delta float64
// prevent first run from sending all prev sent/recv bytes
if initialized {
secondsElapsed := time.Since(stats.PrevRead).Seconds()
sent_delta = float64(total_sent-stats.PrevNet.Sent) / secondsElapsed
recv_delta = float64(total_recv-stats.PrevNet.Recv) / secondsElapsed
var sent_delta, recv_delta uint64
millisecondsElapsed := uint64(time.Since(stats.PrevReadTime).Milliseconds())
if initialized && millisecondsElapsed > 0 {
// get bytes per second
sent_delta = (total_sent - stats.PrevNet.Sent) * 1000 / millisecondsElapsed
recv_delta = (total_recv - stats.PrevNet.Recv) * 1000 / millisecondsElapsed
// check for unrealistic network values (> 5GB/s)
if sent_delta > 5e9 || recv_delta > 5e9 {
slog.Warn("Bad network delta", "container", name)
sent_delta, recv_delta = 0, 0
}
}
stats.PrevNet.Sent = total_sent
stats.PrevNet.Recv = total_recv
stats.PrevNet.Sent, stats.PrevNet.Recv = total_sent, total_recv
stats.Cpu = twoDecimals(cpuPct)
stats.Mem = bytesToMegabytes(float64(usedMemory))
stats.NetworkSent = bytesToMegabytes(sent_delta)
stats.NetworkRecv = bytesToMegabytes(recv_delta)
stats.PrevRead = res.Read
stats.NetworkSent = bytesToMegabytes(float64(sent_delta))
stats.NetworkRecv = bytesToMegabytes(float64(recv_delta))
stats.PrevReadTime = res.Read
return nil
}
@@ -231,7 +249,6 @@ func (dm *dockerManager) deleteContainerStatsSync(id string) {
func newDockerManager(a *Agent) *dockerManager {
dockerHost, exists := GetEnv("DOCKER_HOST")
if exists {
slog.Info("DOCKER_HOST", "host", dockerHost)
// return nil if set to empty string
if dockerHost == "" {
return nil
@@ -242,7 +259,6 @@ func newDockerManager(a *Agent) *dockerManager {
parsedURL, err := url.Parse(dockerHost)
if err != nil {
slog.Error("Error parsing DOCKER_HOST", "err", err)
os.Exit(1)
}
@@ -290,6 +306,7 @@ func newDockerManager(a *Agent) *dockerManager {
containerStatsMap: make(map[string]*container.Stats),
sem: make(chan struct{}, 5),
apiContainerList: []*container.ApiInfo{},
apiStats: &container.ApiStats{},
}
// If using podman, return client
@@ -308,9 +325,8 @@ func newDockerManager(a *Agent) *dockerManager {
if err != nil {
return manager
}
defer resp.Body.Close()
if err := json.NewDecoder(resp.Body).Decode(&versionInfo); err != nil {
if err := manager.decode(resp, &versionInfo); err != nil {
return manager
}
@@ -324,6 +340,22 @@ func newDockerManager(a *Agent) *dockerManager {
return manager
}
// Decodes Docker API JSON response using a reusable buffer and decoder. Not thread safe.
func (dm *dockerManager) decode(resp *http.Response, d any) error {
if dm.buf == nil {
// initialize buffer with 256kb starting size
dm.buf = bytes.NewBuffer(make([]byte, 0, 1024*256))
dm.decoder = json.NewDecoder(dm.buf)
}
defer resp.Body.Close()
defer dm.buf.Reset()
_, err := dm.buf.ReadFrom(resp.Body)
if err != nil {
return err
}
return dm.decoder.Decode(d)
}
// Test docker / podman sockets and return if one exists
func getDockerHost() string {
scheme := "unix://"