diff --git a/beszel/internal/agent/agent.go b/beszel/internal/agent/agent.go index d2b53ff..3c1b69c 100644 --- a/beszel/internal/agent/agent.go +++ b/beszel/internal/agent/agent.go @@ -2,40 +2,30 @@ package agent import ( - "beszel/internal/entities/container" "beszel/internal/entities/system" "context" "log/slog" - "net/http" "os" "strings" - "sync" "github.com/shirou/gopsutil/v4/common" ) type Agent struct { - debug bool // true if LOG_LEVEL is set to debug - fsNames []string // List of filesystem device names being monitored - fsStats map[string]*system.FsStats // Keeps track of disk stats for each filesystem - netInterfaces map[string]struct{} // Stores all valid network interfaces - netIoStats system.NetIoStats // Keeps track of bandwidth usage - containerStatsMap map[string]*container.Stats // Keeps track of container stats - containerStatsMutex sync.RWMutex // Mutex to prevent concurrent access to prevContainerStatsMap - dockerClient *http.Client // HTTP client to query docker api - apiContainerList *[]container.ApiInfo // List of containers from docker host - sensorsContext context.Context // Sensors context to override sys location - sensorsWhitelist map[string]struct{} // List of sensors to monitor - systemInfo system.Info // Host system info + debug bool // true if LOG_LEVEL is set to debug + fsNames []string // List of filesystem device names being monitored + fsStats map[string]*system.FsStats // Keeps track of disk stats for each filesystem + netInterfaces map[string]struct{} // Stores all valid network interfaces + netIoStats system.NetIoStats // Keeps track of bandwidth usage + dockerManager *dockerManager // Manages Docker API requests + sensorsContext context.Context // Sensors context to override sys location + sensorsWhitelist map[string]struct{} // List of sensors to monitor + systemInfo system.Info // Host system info } func NewAgent() *Agent { return &Agent{ - containerStatsMap: make(map[string]*container.Stats), - containerStatsMutex: 
sync.RWMutex{}, - netIoStats: system.NetIoStats{}, - dockerClient: newDockerClient(), - sensorsContext: context.Background(), + sensorsContext: context.Background(), } } @@ -72,6 +62,7 @@ func (a *Agent) Run(pubKey []byte, addr string) { a.initializeSystemInfo() a.initializeDiskInfo() a.initializeNetIoStats() + a.dockerManager = newDockerManager() a.startServer(pubKey, addr) } @@ -82,7 +73,7 @@ func (a *Agent) gatherStats() system.CombinedData { Info: a.systemInfo, } // add docker stats - if containerStats, err := a.getDockerStats(); err == nil { + if containerStats, err := a.dockerManager.getDockerStats(); err == nil { systemData.Containers = containerStats } else { slog.Debug("Error getting docker stats", "err", err) diff --git a/beszel/internal/agent/docker.go b/beszel/internal/agent/docker.go index bc08269..5c5925a 100644 --- a/beszel/internal/agent/docker.go +++ b/beszel/internal/agent/docker.go @@ -13,90 +13,110 @@ import ( "strings" "sync" "time" + + "github.com/blang/semver" ) +type dockerManager struct { + client *http.Client // Client to query Docker API + wg sync.WaitGroup // WaitGroup to wait for all goroutines to finish + sem chan struct{} // Semaphore to limit concurrent container requests + containerStatsMutex sync.RWMutex // Mutex to prevent concurrent access to containerStatsMap + apiContainerList *[]container.ApiInfo // List of containers from Docker API + containerStatsMap map[string]*container.Stats // Keeps track of container stats + validIds map[string]struct{} // Map of valid container ids, used to prune invalid containers from containerStatsMap +} + +// Acquires a semaphore slot (blocks while the concurrency limit is reached) and adds one to the WaitGroup +func (dm *dockerManager) queue() { + dm.sem <- struct{}{} + dm.wg.Add(1) +} + +// Releases the semaphore slot and marks one WaitGroup entry as done +func (dm *dockerManager) dequeue() { + <-dm.sem + dm.wg.Done() +} + // Returns stats for all running containers -func (a *Agent) getDockerStats() ([]*container.Stats, error) { - resp, err := a.dockerClient.Get("http://localhost/containers/json") +func (dm 
*dockerManager) getDockerStats() ([]*container.Stats, error) { + resp, err := dm.client.Get("http://localhost/containers/json") if err != nil { - a.closeIdleConnections(err) return nil, err } defer resp.Body.Close() - if err := json.NewDecoder(resp.Body).Decode(&a.apiContainerList); err != nil { - slog.Error("Error decoding containers", "err", err) + if err := json.NewDecoder(resp.Body).Decode(&dm.apiContainerList); err != nil { return nil, err } - containersLength := len(*a.apiContainerList) - containerStats := make([]*container.Stats, containersLength) + containersLength := len(*dm.apiContainerList) // store valid ids to clean up old container ids from map - validIds := make(map[string]struct{}, containersLength) + if dm.validIds == nil { + dm.validIds = make(map[string]struct{}, containersLength) + } else { + clear(dm.validIds) + } - var wg sync.WaitGroup - - for i, ctr := range *a.apiContainerList { + for _, ctr := range *dm.apiContainerList { ctr.IdShort = ctr.Id[:12] - validIds[ctr.IdShort] = struct{}{} + dm.validIds[ctr.IdShort] = struct{}{} // check if container is less than 1 minute old (possible restart) // note: can't use Created field because it's not updated on restart if strings.Contains(ctr.Status, "second") { // if so, remove old container data - a.deleteContainerStatsSync(ctr.IdShort) + dm.deleteContainerStatsSync(ctr.IdShort) } - wg.Add(1) + dm.queue() go func() { - defer wg.Done() - stats, err := a.getContainerStats(ctr) + defer dm.dequeue() + err := dm.updateContainerStats(ctr) if err != nil { - // close idle connections if error is a network timeout - isTimeout := a.closeIdleConnections(err) - // delete container from map if not a timeout - if !isTimeout { - a.deleteContainerStatsSync(ctr.IdShort) - } + dm.deleteContainerStatsSync(ctr.IdShort) // retry once - stats, err = a.getContainerStats(ctr) + err = dm.updateContainerStats(ctr) if err != nil { slog.Error("Error getting container stats", "err", err) } } - containerStats[i] = stats }() } - 
wg.Wait() + dm.wg.Wait() - // remove old / invalid container stats - for id := range a.containerStatsMap { - if _, exists := validIds[id]; !exists { - delete(a.containerStatsMap, id) + // populate final stats and remove old / invalid container stats + stats := make([]*container.Stats, 0, containersLength) + for id, v := range dm.containerStatsMap { + if _, exists := dm.validIds[id]; !exists { + delete(dm.containerStatsMap, id) + } else { + stats = append(stats, v) } } - return containerStats, nil + return stats, nil } -// Returns stats for individual container -func (a *Agent) getContainerStats(ctr container.ApiInfo) (*container.Stats, error) { +// Updates stats for individual container +func (dm *dockerManager) updateContainerStats(ctr container.ApiInfo) error { name := ctr.Names[0][1:] - resp, err := a.dockerClient.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1") + resp, err := dm.client.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1") if err != nil { - return &container.Stats{Name: name}, err + return err } defer resp.Body.Close() - a.containerStatsMutex.Lock() - defer a.containerStatsMutex.Unlock() + dm.containerStatsMutex.Lock() + defer dm.containerStatsMutex.Unlock() // add empty values if they doesn't exist in map - stats, initialized := a.containerStatsMap[ctr.IdShort] + stats, initialized := dm.containerStatsMap[ctr.IdShort] if !initialized { stats = &container.Stats{Name: name} - a.containerStatsMap[ctr.IdShort] = stats + dm.containerStatsMap[ctr.IdShort] = stats } // reset current stats @@ -108,12 +128,12 @@ func (a *Agent) getContainerStats(ctr container.ApiInfo) (*container.Stats, erro // docker host container stats response var res container.ApiStats if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { - return stats, err + return err } // check if container has valid data, otherwise may be in restart loop (#103) if res.MemoryStats.Usage == 0 { - return stats, fmt.Errorf("%s - 
no memory stats - see https://github.com/henrygd/beszel/issues/144", name) + return fmt.Errorf("%s - no memory stats - see https://github.com/henrygd/beszel/issues/144", name) } // memory (https://docs.docker.com/reference/cli/docker/container/stats/) @@ -128,7 +148,7 @@ func (a *Agent) getContainerStats(ctr container.ApiInfo) (*container.Stats, erro systemDelta := res.CPUStats.SystemUsage - stats.PrevCpu[1] cpuPct := float64(cpuDelta) / float64(systemDelta) * 100 if cpuPct > 100 { - return stats, fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct) + return fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct) } stats.PrevCpu = [2]uint64{res.CPUStats.CPUUsage.TotalUsage, res.CPUStats.SystemUsage} @@ -154,11 +174,18 @@ func (a *Agent) getContainerStats(ctr container.ApiInfo) (*container.Stats, erro stats.NetworkSent = bytesToMegabytes(sent_delta) stats.NetworkRecv = bytesToMegabytes(recv_delta) - return stats, nil + return nil } -// Creates a new http client for docker api -func newDockerClient() *http.Client { +// Delete container stats from map using mutex +func (dm *dockerManager) deleteContainerStatsSync(id string) { + dm.containerStatsMutex.Lock() + defer dm.containerStatsMutex.Unlock() + delete(dm.containerStatsMap, id) +} + +// Creates a new dockerManager with an HTTP client for the Docker API +func newDockerManager() *dockerManager { dockerHost := "unix:///var/run/docker.sock" if dockerHostEnv, exists := os.LookupEnv("DOCKER_HOST"); exists { slog.Info("DOCKER_HOST", "host", dockerHostEnv) @@ -172,12 +199,8 @@ func newDockerClient() *http.Client { } transport := &http.Transport{ - ForceAttemptHTTP2: false, - IdleConnTimeout: 90 * time.Second, - DisableCompression: true, - MaxConnsPerHost: 10, - MaxIdleConnsPerHost: 10, - DisableKeepAlives: false, + DisableCompression: true, + MaxConnsPerHost: 0, } switch parsedURL.Scheme { @@ -194,18 +217,38 @@ func newDockerClient() *http.Client { os.Exit(1) } - return &http.Client{ - Timeout: time.Second, - Transport: transport, + 
dockerClient := &dockerManager{ + client: &http.Client{ + Timeout: time.Millisecond * 1100, + Transport: transport, + }, + containerStatsMap: make(map[string]*container.Stats), } -} -// Closes idle connections on timeouts to prevent reuse of stale connections -func (a *Agent) closeIdleConnections(err error) (isTimeout bool) { - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - slog.Warn("Closing idle connections", "err", err) - a.dockerClient.Transport.(*http.Transport).CloseIdleConnections() - return true + // Make sure sem is initialized + concurrency := 200 + defer func() { dockerClient.sem = make(chan struct{}, concurrency) }() + + // Check docker version + // (versions before 25.0.0 have a bug with one-shot which requires all requests to be made in one batch) + var versionInfo struct { + Version string `json:"Version"` } - return false + resp, err := dockerClient.client.Get("http://localhost/version") + if err != nil { + return dockerClient + } + defer resp.Body.Close() + + if err := json.NewDecoder(resp.Body).Decode(&versionInfo); err != nil { + return dockerClient + } + + // if major version is 25 or newer, one-shot works correctly and we can limit concurrent connections / goroutines to 5 + if dockerVersion, err := semver.Parse(versionInfo.Version); err == nil && dockerVersion.Major > 24 { + concurrency = 5 + } + slog.Debug("Docker", "version", versionInfo.Version, "concurrency", concurrency) + + return dockerClient } diff --git a/beszel/internal/agent/utils.go b/beszel/internal/agent/utils.go index 5a46080..d2688b1 100644 --- a/beszel/internal/agent/utils.go +++ b/beszel/internal/agent/utils.go @@ -2,13 +2,6 @@ package agent import "math" -// delete container stats from map using mutex -func (a *Agent) deleteContainerStatsSync(id string) { - a.containerStatsMutex.Lock() - defer a.containerStatsMutex.Unlock() - delete(a.containerStatsMap, id) -} - func bytesToMegabytes(b float64) float64 { return twoDecimals(b / 1048576) }