add dockerManager / fix for Docker 24 and older

* dockerManager now handles all docker api interaction and container metrics tracking
* sets unlimited concurrency for docker 24 and older
This commit is contained in:
Henry Dollman
2024-10-02 19:45:26 -04:00
parent b9f142c28c
commit f051f6a5f8
3 changed files with 115 additions and 89 deletions

View File

@@ -2,40 +2,30 @@
package agent
import (
"beszel/internal/entities/container"
"beszel/internal/entities/system"
"context"
"log/slog"
"net/http"
"os"
"strings"
"sync"
"github.com/shirou/gopsutil/v4/common"
)
type Agent struct {
debug bool // true if LOG_LEVEL is set to debug
fsNames []string // List of filesystem device names being monitored
fsStats map[string]*system.FsStats // Keeps track of disk stats for each filesystem
netInterfaces map[string]struct{} // Stores all valid network interfaces
netIoStats system.NetIoStats // Keeps track of bandwidth usage
containerStatsMap map[string]*container.Stats // Keeps track of container stats
containerStatsMutex sync.RWMutex // Mutex to prevent concurrent access to prevContainerStatsMap
dockerClient *http.Client // HTTP client to query docker api
apiContainerList *[]container.ApiInfo // List of containers from docker host
sensorsContext context.Context // Sensors context to override sys location
sensorsWhitelist map[string]struct{} // List of sensors to monitor
systemInfo system.Info // Host system info
debug bool // true if LOG_LEVEL is set to debug
fsNames []string // List of filesystem device names being monitored
fsStats map[string]*system.FsStats // Keeps track of disk stats for each filesystem
netInterfaces map[string]struct{} // Stores all valid network interfaces
netIoStats system.NetIoStats // Keeps track of bandwidth usage
dockerManager *dockerManager // Manages Docker API requests
sensorsContext context.Context // Sensors context to override sys location
sensorsWhitelist map[string]struct{} // List of sensors to monitor
systemInfo system.Info // Host system info
}
func NewAgent() *Agent {
return &Agent{
containerStatsMap: make(map[string]*container.Stats),
containerStatsMutex: sync.RWMutex{},
netIoStats: system.NetIoStats{},
dockerClient: newDockerClient(),
sensorsContext: context.Background(),
sensorsContext: context.Background(),
}
}
@@ -72,6 +62,7 @@ func (a *Agent) Run(pubKey []byte, addr string) {
a.initializeSystemInfo()
a.initializeDiskInfo()
a.initializeNetIoStats()
a.dockerManager = newDockerManager()
a.startServer(pubKey, addr)
}
@@ -82,7 +73,7 @@ func (a *Agent) gatherStats() system.CombinedData {
Info: a.systemInfo,
}
// add docker stats
if containerStats, err := a.getDockerStats(); err == nil {
if containerStats, err := a.dockerManager.getDockerStats(); err == nil {
systemData.Containers = containerStats
} else {
slog.Debug("Error getting docker stats", "err", err)

View File

@@ -13,90 +13,110 @@ import (
"strings"
"sync"
"time"
"github.com/blang/semver"
)
type dockerManager struct {
client *http.Client // Client to query Docker API
wg sync.WaitGroup // WaitGroup to wait for all goroutines to finish
sem chan struct{} // Semaphore to limit concurrent container requests
containerStatsMutex sync.RWMutex // Mutex to prevent concurrent access to containerStatsMap
apiContainerList *[]container.ApiInfo // List of containers from Docker API
containerStatsMap map[string]*container.Stats // Keeps track of container stats
validIds map[string]struct{} // Map of valid container ids, used to prune invalid containers from containerStatsMap
}
// Add goroutine to the queue
func (d *dockerManager) queue() {
d.sem <- struct{}{}
d.wg.Add(1)
}
// Remove goroutine from the queue
func (d *dockerManager) dequeue() {
<-d.sem
d.wg.Done()
}
// Returns stats for all running containers
func (a *Agent) getDockerStats() ([]*container.Stats, error) {
resp, err := a.dockerClient.Get("http://localhost/containers/json")
func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) {
resp, err := dm.client.Get("http://localhost/containers/json")
if err != nil {
a.closeIdleConnections(err)
return nil, err
}
defer resp.Body.Close()
if err := json.NewDecoder(resp.Body).Decode(&a.apiContainerList); err != nil {
slog.Error("Error decoding containers", "err", err)
if err := json.NewDecoder(resp.Body).Decode(&dm.apiContainerList); err != nil {
return nil, err
}
containersLength := len(*a.apiContainerList)
containerStats := make([]*container.Stats, containersLength)
containersLength := len(*dm.apiContainerList)
// store valid ids to clean up old container ids from map
validIds := make(map[string]struct{}, containersLength)
if dm.validIds == nil {
dm.validIds = make(map[string]struct{}, containersLength)
} else {
clear(dm.validIds)
}
var wg sync.WaitGroup
for i, ctr := range *a.apiContainerList {
for _, ctr := range *dm.apiContainerList {
ctr.IdShort = ctr.Id[:12]
validIds[ctr.IdShort] = struct{}{}
dm.validIds[ctr.IdShort] = struct{}{}
// check if container is less than 1 minute old (possible restart)
// note: can't use Created field because it's not updated on restart
if strings.Contains(ctr.Status, "second") {
// if so, remove old container data
a.deleteContainerStatsSync(ctr.IdShort)
dm.deleteContainerStatsSync(ctr.IdShort)
}
wg.Add(1)
dm.queue()
go func() {
defer wg.Done()
stats, err := a.getContainerStats(ctr)
defer dm.dequeue()
err := dm.updateContainerStats(ctr)
if err != nil {
// close idle connections if error is a network timeout
isTimeout := a.closeIdleConnections(err)
// delete container from map if not a timeout
if !isTimeout {
a.deleteContainerStatsSync(ctr.IdShort)
}
dm.deleteContainerStatsSync(ctr.IdShort)
// retry once
stats, err = a.getContainerStats(ctr)
err = dm.updateContainerStats(ctr)
if err != nil {
slog.Error("Error getting container stats", "err", err)
}
}
containerStats[i] = stats
}()
}
wg.Wait()
dm.wg.Wait()
// remove old / invalid container stats
for id := range a.containerStatsMap {
if _, exists := validIds[id]; !exists {
delete(a.containerStatsMap, id)
// populate final stats and remove old / invalid container stats
stats := make([]*container.Stats, 0, containersLength)
for id, v := range dm.containerStatsMap {
if _, exists := dm.validIds[id]; !exists {
delete(dm.containerStatsMap, id)
} else {
stats = append(stats, v)
}
}
return containerStats, nil
return stats, nil
}
// Returns stats for individual container
func (a *Agent) getContainerStats(ctr container.ApiInfo) (*container.Stats, error) {
// Updates stats for individual container
func (dm *dockerManager) updateContainerStats(ctr container.ApiInfo) error {
name := ctr.Names[0][1:]
resp, err := a.dockerClient.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1")
resp, err := dm.client.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1")
if err != nil {
return &container.Stats{Name: name}, err
return err
}
defer resp.Body.Close()
a.containerStatsMutex.Lock()
defer a.containerStatsMutex.Unlock()
dm.containerStatsMutex.Lock()
defer dm.containerStatsMutex.Unlock()
// add empty values if they doesn't exist in map
stats, initialized := a.containerStatsMap[ctr.IdShort]
stats, initialized := dm.containerStatsMap[ctr.IdShort]
if !initialized {
stats = &container.Stats{Name: name}
a.containerStatsMap[ctr.IdShort] = stats
dm.containerStatsMap[ctr.IdShort] = stats
}
// reset current stats
@@ -108,12 +128,12 @@ func (a *Agent) getContainerStats(ctr container.ApiInfo) (*container.Stats, erro
// docker host container stats response
var res container.ApiStats
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
return stats, err
return err
}
// check if container has valid data, otherwise may be in restart loop (#103)
if res.MemoryStats.Usage == 0 {
return stats, fmt.Errorf("%s - no memory stats - see https://github.com/henrygd/beszel/issues/144", name)
return fmt.Errorf("%s - no memory stats - see https://github.com/henrygd/beszel/issues/144", name)
}
// memory (https://docs.docker.com/reference/cli/docker/container/stats/)
@@ -128,7 +148,7 @@ func (a *Agent) getContainerStats(ctr container.ApiInfo) (*container.Stats, erro
systemDelta := res.CPUStats.SystemUsage - stats.PrevCpu[1]
cpuPct := float64(cpuDelta) / float64(systemDelta) * 100
if cpuPct > 100 {
return stats, fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct)
return fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct)
}
stats.PrevCpu = [2]uint64{res.CPUStats.CPUUsage.TotalUsage, res.CPUStats.SystemUsage}
@@ -154,11 +174,18 @@ func (a *Agent) getContainerStats(ctr container.ApiInfo) (*container.Stats, erro
stats.NetworkSent = bytesToMegabytes(sent_delta)
stats.NetworkRecv = bytesToMegabytes(recv_delta)
return stats, nil
return nil
}
// Creates a new http client for docker api
func newDockerClient() *http.Client {
// Delete container stats from map using mutex
func (dm *dockerManager) deleteContainerStatsSync(id string) {
dm.containerStatsMutex.Lock()
defer dm.containerStatsMutex.Unlock()
delete(dm.containerStatsMap, id)
}
// Creates a new http client for Docker API
func newDockerManager() *dockerManager {
dockerHost := "unix:///var/run/docker.sock"
if dockerHostEnv, exists := os.LookupEnv("DOCKER_HOST"); exists {
slog.Info("DOCKER_HOST", "host", dockerHostEnv)
@@ -172,12 +199,8 @@ func newDockerClient() *http.Client {
}
transport := &http.Transport{
ForceAttemptHTTP2: false,
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
MaxConnsPerHost: 10,
MaxIdleConnsPerHost: 10,
DisableKeepAlives: false,
DisableCompression: true,
MaxConnsPerHost: 0,
}
switch parsedURL.Scheme {
@@ -194,18 +217,37 @@ func newDockerClient() *http.Client {
os.Exit(1)
}
return &http.Client{
Timeout: time.Second,
Transport: transport,
dockerClient := &dockerManager{
client: &http.Client{
Timeout: time.Millisecond * 1100,
Transport: transport,
},
containerStatsMap: make(map[string]*container.Stats),
}
}
// Closes idle connections on timeouts to prevent reuse of stale connections
func (a *Agent) closeIdleConnections(err error) (isTimeout bool) {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
slog.Warn("Closing idle connections", "err", err)
a.dockerClient.Transport.(*http.Transport).CloseIdleConnections()
return true
// Make sure sem is initialized
concurrency := 200
defer func() { dockerClient.sem = make(chan struct{}, concurrency) }()
// Check docker version
// (versions before 25.0.0 have a bug with one-shot which requires all requests to be made in one batch)
var versionInfo struct {
Version string `json:"Version"`
}
return false
resp, err := dockerClient.client.Get("http://localhost/version")
if err != nil {
return dockerClient
}
if err := json.NewDecoder(resp.Body).Decode(&versionInfo); err != nil {
return dockerClient
}
// if version > 25, one-shot works correctly and we can limit concurrent connections / goroutines to 5
if dockerVersion, err := semver.Parse(versionInfo.Version); err == nil && dockerVersion.Major > 24 {
concurrency = 5
}
slog.Debug("Docker", "version", versionInfo.Version, "concurrency", concurrency)
return dockerClient
}

View File

@@ -2,13 +2,6 @@ package agent
import "math"
// delete container stats from map using mutex
func (a *Agent) deleteContainerStatsSync(id string) {
a.containerStatsMutex.Lock()
defer a.containerStatsMutex.Unlock()
delete(a.containerStatsMap, id)
}
func bytesToMegabytes(b float64) float64 {
return twoDecimals(b / 1048576)
}