use semaphore to limit concurrency in agent

subtract mem cache from container stats
This commit is contained in:
Henry Dollman
2024-07-23 22:40:39 -04:00
parent 76cfaaa179
commit 5e255f8f69

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"
sshServer "github.com/gliderlabs/ssh"
@@ -24,8 +25,17 @@ import (
var Version = "0.0.1-alpha.8"
var containerCpuMap = make(map[string][2]uint64)
var containerCpuMutex = &sync.Mutex{}
// var containerCpuMutex = &sync.Mutex{}
var sem = make(chan struct{}, 15)
func acquireSemaphore() {
sem <- struct{}{}
}
func releaseSemaphore() {
<-sem
}
var diskIoStats = DiskIoStats{
Read: 0,
@@ -48,11 +58,11 @@ var client = &http.Client{
Dial: func(proto, addr string) (net.Conn, error) {
return net.Dial("unix", "/var/run/docker.sock")
},
ForceAttemptHTTP2: false,
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
MaxIdleConns: 10,
DisableKeepAlives: false,
ForceAttemptHTTP2: false,
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
MaxIdleConnsPerHost: 50,
DisableKeepAlives: false,
},
}
@@ -156,27 +166,35 @@ func getDockerStats() ([]*ContainerStats, error) {
containerStats := make([]*ContainerStats, 0, len(containers))
// store valid ids to clean up old container ids from map
validIds := make(map[string]struct{}, len(containers))
var wg sync.WaitGroup
for _, ctr := range containers {
ctr.IdShort = ctr.ID[:12]
cstats, err := getContainerStats(ctr)
if err != nil {
// retry once
cstats, err = getContainerStats(ctr)
validIds[ctr.IdShort] = struct{}{}
wg.Add(1)
go func() {
defer wg.Done()
cstats, err := getContainerStats(ctr)
if err != nil {
log.Printf("Error getting container stats: %+v\n", err)
continue
// retry once
cstats, err = getContainerStats(ctr)
if err != nil {
log.Printf("Error getting container stats: %+v\n", err)
return
}
}
}
containerStats = append(containerStats, cstats)
containerStats = append(containerStats, cstats)
}()
}
// clean up old container ids from map
validIds := make(map[string]struct{}, len(containers))
for _, ctr := range containers {
validIds[ctr.IdShort] = struct{}{}
}
wg.Wait()
for id := range containerCpuMap {
if _, exists := validIds[id]; !exists {
// log.Printf("Removing container cpu map entry: %+v\n", id)
delete(containerCpuMap, id)
}
}
@@ -185,6 +203,9 @@ func getDockerStats() ([]*ContainerStats, error) {
}
func getContainerStats(ctr *Container) (*ContainerStats, error) {
// use semaphore to limit concurrency
acquireSemaphore()
defer releaseSemaphore()
resp, err := client.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1")
if err != nil {
return &ContainerStats{}, err
@@ -198,14 +219,18 @@ func getContainerStats(ctr *Container) (*ContainerStats, error) {
name := ctr.Names[0][1:]
// memory
usedMemory := statsJson.MemoryStats.Usage - statsJson.MemoryStats.Cache
// memory (https://docs.docker.com/reference/cli/docker/container/stats/)
memCache := statsJson.MemoryStats.Stats["inactive_file"]
if memCache == 0 {
memCache = statsJson.MemoryStats.Stats["cache"]
}
usedMemory := statsJson.MemoryStats.Usage - memCache
// pctMemory := float64(usedMemory) / float64(statsJson.MemoryStats.Limit) * 100
// cpu
// add default values to containerCpu if it doesn't exist
// containerCpuMutex.Lock()
// defer containerCpuMutex.Unlock()
containerCpuMutex.Lock()
defer containerCpuMutex.Unlock()
if _, ok := containerCpuMap[ctr.IdShort]; !ok {
containerCpuMap[ctr.IdShort] = [2]uint64{0, 0}
}