From 1cfd3cdd304fcc6dbaff838f7ec745d73f148c11 Mon Sep 17 00:00:00 2001 From: Henry Dollman Date: Sun, 1 Sep 2024 18:23:57 -0400 Subject: [PATCH] add support for multiple disks --- beszel/internal/agent/agent.go | 271 ++++++++++++++++------ beszel/internal/entities/system/system.go | 53 +++-- beszel/internal/records/records.go | 61 ++--- 3 files changed, 258 insertions(+), 127 deletions(-) diff --git a/beszel/internal/agent/agent.go b/beszel/internal/agent/agent.go index ce699e8..743dbe6 100644 --- a/beszel/internal/agent/agent.go +++ b/beszel/internal/agent/agent.go @@ -36,10 +36,10 @@ type Agent struct { sem chan struct{} containerStatsMap map[string]*container.PrevContainerStats containerStatsMutex *sync.Mutex - diskIoStats *system.DiskIoStats + fsNames []string + fsStats map[string]*system.FsStats netIoStats *system.NetIoStats dockerClient *http.Client - containerStatsPool *sync.Pool bufferPool *sync.Pool } @@ -50,14 +50,8 @@ func NewAgent(pubKey []byte, addr string) *Agent { sem: make(chan struct{}, 15), containerStatsMap: make(map[string]*container.PrevContainerStats), containerStatsMutex: &sync.Mutex{}, - diskIoStats: &system.DiskIoStats{}, netIoStats: &system.NetIoStats{}, dockerClient: newDockerClient(), - containerStatsPool: &sync.Pool{ - New: func() interface{} { - return new(container.Stats) - }, - }, bufferPool: &sync.Pool{ New: func() interface{} { return new(bytes.Buffer) @@ -74,8 +68,8 @@ func (a *Agent) releaseSemaphore() { <-a.sem } -func (a *Agent) getSystemStats() (*system.Info, *system.Stats) { - systemStats := &system.Stats{} +func (a *Agent) getSystemStats() (system.Info, system.Stats) { + systemStats := system.Stats{} // cpu percent cpuPct, err := cpu.Percent(0, false) @@ -96,25 +90,46 @@ func (a *Agent) getSystemStats() (*system.Info, *system.Stats) { } // disk usage - if d, err := disk.Usage("/"); err == nil { - systemStats.Disk = bytesToGigabytes(d.Total) - systemStats.DiskUsed = bytesToGigabytes(d.Used) - systemStats.DiskPct = twoDecimals(d.UsedPercent) + for _, stats := range a.fsStats { + // log.Println("Reading filesystem:", fs.Mountpoint) + if d, err := disk.Usage(stats.Mountpoint); err == nil { + stats.DiskTotal = bytesToGigabytes(d.Total) + stats.DiskUsed = bytesToGigabytes(d.Used) + if stats.Root { + systemStats.DiskTotal = bytesToGigabytes(d.Total) + systemStats.DiskUsed = bytesToGigabytes(d.Used) + systemStats.DiskPct = twoDecimals(d.UsedPercent) + } + } else { + // reset stats if error (likely unmounted) + log.Printf("Error reading %s: %+v\n", stats.Mountpoint, err) + stats.DiskTotal = 0 + stats.DiskUsed = 0 + stats.TotalRead = 0 + stats.TotalWrite = 0 + } } // disk i/o - if io, err := disk.IOCounters(a.diskIoStats.Filesystem); err == nil { - for _, d := range io { - // add to systemStats - secondsElapsed := time.Since(a.diskIoStats.Time).Seconds() - readPerSecond := float64(d.ReadBytes-a.diskIoStats.Read) / secondsElapsed - systemStats.DiskRead = bytesToMegabytes(readPerSecond) - writePerSecond := float64(d.WriteBytes-a.diskIoStats.Write) / secondsElapsed - systemStats.DiskWrite = bytesToMegabytes(writePerSecond) - // update diskIoStats - a.diskIoStats.Time = time.Now() - a.diskIoStats.Read = d.ReadBytes - a.diskIoStats.Write = d.WriteBytes + if ioCounters, err := disk.IOCounters(a.fsNames...); err == nil { + for _, d := range ioCounters { + stats := a.fsStats[d.Name] + if stats == nil { + continue + } + secondsElapsed := time.Since(stats.Time).Seconds() + readPerSecond := float64(d.ReadBytes-stats.TotalRead) / secondsElapsed + writePerSecond := float64(d.WriteBytes-stats.TotalWrite) / secondsElapsed + stats.Time = time.Now() + stats.DiskReadPs = bytesToMegabytes(readPerSecond) + stats.DiskWritePs = bytesToMegabytes(writePerSecond) + stats.TotalRead = d.ReadBytes + stats.TotalWrite = d.WriteBytes + // if root filesystem, update system stats + if stats.Root { + systemStats.DiskReadPs = stats.DiskReadPs + systemStats.DiskWritePs = stats.DiskWritePs + } } } @@ -157,7 +172,7 @@ func (a *Agent) getSystemStats() (*system.Info, *system.Stats) { // log.Printf("Temperature map: %+v\n", systemStats.Temperatures) } - systemInfo := &system.Info{ + systemInfo := system.Info{ Cpu: systemStats.Cpu, MemPct: systemStats.MemPct, DiskPct: systemStats.DiskPct, @@ -183,7 +198,7 @@ func (a *Agent) getSystemStats() (*system.Info, *system.Stats) { return systemInfo, systemStats } -func (a *Agent) getDockerStats() ([]*container.Stats, error) { +func (a *Agent) getDockerStats() ([]container.Stats, error) { resp, err := a.dockerClient.Get("http://localhost/containers/json") if err != nil { a.closeIdleConnections(err) @@ -191,13 +206,14 @@ func (a *Agent) getDockerStats() ([]*container.Stats, error) { } defer resp.Body.Close() - var containers []*container.ApiInfo + var containers []container.ApiInfo if err := json.NewDecoder(resp.Body).Decode(&containers); err != nil { log.Printf("Error decoding containers: %+v\n", err) return nil, err } - containerStats := make([]*container.Stats, 0, len(containers)) + containerStats := make([]container.Stats, 0, len(containers)) + containerStatsMutex := sync.Mutex{} // store valid ids to clean up old container ids from map validIds := make(map[string]struct{}, len(containers)) @@ -214,7 +230,9 @@ func (a *Agent) getDockerStats() ([]*container.Stats, error) { a.deleteContainerStatsSync(ctr.IdShort) } wg.Add(1) + a.acquireSemaphore() go func() { + defer a.releaseSemaphore() defer wg.Done() cstats, err := a.getContainerStats(ctr) if err != nil { @@ -231,6 +249,8 @@ func (a *Agent) getDockerStats() ([]*container.Stats, error) { return } } + containerStatsMutex.Lock() + defer containerStatsMutex.Unlock() containerStats = append(containerStats, cstats) }() } @@ -247,38 +267,35 @@ func (a *Agent) getDockerStats() ([]*container.Stats, error) { return containerStats, nil } -func (a *Agent) getContainerStats(ctr *container.ApiInfo) (*container.Stats, error) { - // use semaphore to limit concurrency - a.acquireSemaphore() - defer a.releaseSemaphore() +func (a *Agent) getContainerStats(ctr container.ApiInfo) (container.Stats, error) { + cStats := container.Stats{} resp, err := a.dockerClient.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1") if err != nil { - return nil, err + return cStats, err } defer resp.Body.Close() - // get a buffer from the pool + // use a pooled buffer to store the response body buf := a.bufferPool.Get().(*bytes.Buffer) defer a.bufferPool.Put(buf) buf.Reset() - // read the response body into the buffer _, err = io.Copy(buf, resp.Body) if err != nil { - return nil, err + return cStats, err } // unmarshal the json data from the buffer var statsJson container.ApiStats if err := json.Unmarshal(buf.Bytes(), &statsJson); err != nil { - return nil, err + return cStats, err } name := ctr.Names[0][1:] // check if container has valid data, otherwise may be in restart loop (#103) if statsJson.MemoryStats.Usage == 0 { - return nil, fmt.Errorf("%s - invalid data", name) + return cStats, fmt.Errorf("%s - invalid data", name) } // memory (https://docs.docker.com/reference/cli/docker/container/stats/) @@ -303,7 +320,7 @@ func (a *Agent) getContainerStats(ctr *container.ApiInfo) (*container.Stats, err systemDelta := statsJson.CPUStats.SystemUsage - stats.Cpu[1] cpuPct := float64(cpuDelta) / float64(systemDelta) * 100 if cpuPct > 100 { - return nil, fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct) + return cStats, fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct) } stats.Cpu = [2]uint64{statsJson.CPUStats.CPUUsage.TotalUsage, statsJson.CPUStats.SystemUsage} @@ -325,7 +342,7 @@ func (a *Agent) getContainerStats(ctr *container.ApiInfo) (*container.Stats, err stats.Net.Recv = total_recv stats.Net.Time = time.Now() - cStats := a.containerStatsPool.Get().(*container.Stats) + // cStats := a.containerStatsPool.Get().(*container.Stats) cStats.Name = name cStats.Cpu = twoDecimals(cpuPct) cStats.Mem = bytesToMegabytes(float64(usedMemory)) @@ -342,24 +359,25 @@ func (a *Agent) deleteContainerStatsSync(id string) { delete(a.containerStatsMap, id) } -func (a *Agent) gatherStats() *system.CombinedData { +func (a *Agent) gatherStats() system.CombinedData { systemInfo, systemStats := a.getSystemStats() - systemData := &system.CombinedData{ + systemData := system.CombinedData{ Stats: systemStats, Info: systemInfo, } + // add docker stats if containerStats, err := a.getDockerStats(); err == nil { systemData.Containers = containerStats } - // fmt.Printf("%+v\n", systemData) - return systemData -} - -// return container stats to pool -func (a *Agent) returnStatsToPool(containerStats []*container.Stats) { - for _, stats := range containerStats { - a.containerStatsPool.Put(stats) + // add extra filesystems + systemData.Stats.ExtraFs = make(map[string]*system.FsStats) + for name, stats := range a.fsStats { + if !stats.Root && stats.DiskTotal > 0 { + systemData.Stats.ExtraFs[name] = stats + } } + // log.Printf("%+v\n", systemData) + return systemData } func (a *Agent) startServer() { @@ -378,7 +396,6 @@ func (a *Agent) startServer() { func (a *Agent) handleSession(s sshServer.Session) { stats := a.gatherStats() - defer a.returnStatsToPool(stats.Containers) encoder := json.NewEncoder(s) if err := encoder.Encode(stats); err != nil { log.Println("Error encoding stats:", err.Error()) @@ -389,24 +406,103 @@ func (a *Agent) handleSession(s sshServer.Session) { } func (a *Agent) Run() { - if filesystem, exists := os.LookupEnv("FILESYSTEM"); exists { - a.diskIoStats.Filesystem = filesystem - } else { - a.diskIoStats.Filesystem = findDefaultFilesystem() + a.fsStats = make(map[string]*system.FsStats) + + filesystem, fsEnvVarExists := os.LookupEnv("FILESYSTEM") + if fsEnvVarExists { + a.fsStats[filesystem] = &system.FsStats{Root: true, Mountpoint: "/"} } + if extraFilesystems, exists := os.LookupEnv("EXTRA_FILESYSTEMS"); exists { + // parse comma separated list of filesystems + for _, filesystem := range strings.Split(extraFilesystems, ",") { + a.fsStats[filesystem] = &system.FsStats{} + } + } + + a.initializeDiskInfo(fsEnvVarExists) a.initializeDiskIoStats() a.initializeNetIoStats() + // log.Printf("Filesystems: %+v\n", a.fsStats) a.startServer() } +// Sets up the filesystems to monitor for disk usage and I/O. +func (a *Agent) initializeDiskInfo(fsEnvVarExists bool) error { + partitions, err := disk.Partitions(false) + if err != nil { + return err + } + + // log.Printf("Partitions: %+v\n", partitions) + for _, v := range partitions { + // binary - use root mountpoint if not already set by env var + if !fsEnvVarExists && v.Mountpoint == "/" { + a.fsStats[v.Device] = &system.FsStats{Root: true, Mountpoint: "/"} + } + // docker - use /etc/hosts device as root if not mapped + if !fsEnvVarExists && v.Mountpoint == "/etc/hosts" && strings.HasPrefix(v.Device, "/dev") && !strings.Contains(v.Device, "mapper") { + a.fsStats[v.Device] = &system.FsStats{Root: true, Mountpoint: "/"} + } + // check if device is in /extra-filesystem + if strings.HasPrefix(v.Mountpoint, "/extra-filesystem") { + // todo: may be able to tweak this to be able to mount custom root at /extra-filesystems/root + a.fsStats[v.Device] = &system.FsStats{Mountpoint: v.Mountpoint} + continue + } + // set mountpoints for extra filesystems if passed in via env var + for name, stats := range a.fsStats { + if strings.HasSuffix(v.Device, name) { + stats.Mountpoint = v.Mountpoint + break + } + } + } + + // remove extra filesystems that don't have a mountpoint + hasRoot := false + for name, stats := range a.fsStats { + if stats.Root { + hasRoot = true + log.Println("Detected root fs:", name) + } + if stats.Mountpoint == "" { + log.Printf("Ignoring %s. No mountpoint found.\n", name) + delete(a.fsStats, name) + } + } + + // if no root filesystem set, use most read device in /proc/diskstats + if !hasRoot { + rootDevice := findMaxReadsDevice() + log.Printf("Detected root fs: %+s\n", rootDevice) + a.fsStats[rootDevice] = &system.FsStats{Root: true, Mountpoint: "/"} + } + + return nil +} + +// Sets start values for disk I/O stats. func (a *Agent) initializeDiskIoStats() { - if io, err := disk.IOCounters(a.diskIoStats.Filesystem); err == nil { - for _, d := range io { - a.diskIoStats.Time = time.Now() - a.diskIoStats.Read = d.ReadBytes - a.diskIoStats.Write = d.WriteBytes + // create slice of fs names to pass to disk.IOCounters later + a.fsNames = make([]string, 0, len(a.fsStats)) + + for name, stats := range a.fsStats { + if io, err := disk.IOCounters(name); err == nil { + for _, d := range io { + // add name to slice + a.fsNames = append(a.fsNames, d.Name) + // normalize name with io counters + if name != d.Name { + // log.Println("Normalizing disk I/O stats:", name, d.Name) + a.fsStats[d.Name] = stats + delete(a.fsStats, name) + } + stats.Time = time.Now() + stats.TotalRead = d.ReadBytes + stats.TotalWrite = d.WriteBytes + } } } } @@ -419,7 +515,7 @@ func (a *Agent) initializeNetIoStats() { if skipNetworkInterface(&v) { continue } - log.Printf("Found network interface: %+v (%+v recv, %+v sent)\n", v.Name, v.BytesRecv, v.BytesSent) + log.Printf("Detected network interface: %+v (%+v recv, %+v sent)\n", v.Name, v.BytesRecv, v.BytesSent) bytesSent += v.BytesSent bytesRecv += v.BytesRecv } @@ -441,18 +537,6 @@ func twoDecimals(value float64) float64 { return math.Round(value*100) / 100 } -func findDefaultFilesystem() string { - if partitions, err := disk.Partitions(false); err == nil { - for _, v := range partitions { - if v.Mountpoint == "/" { - log.Printf("Using filesystem: %+v\n", v.Device) - return v.Device - } - } - } - return "" -} - func skipNetworkInterface(v *psutilNet.IOCountersStat) bool { switch { case strings.HasPrefix(v.Name, "lo"), @@ -516,3 +600,40 @@ func (a *Agent) closeIdleConnections(err error) (isTimeout bool) { } return false } + +// Returns the device with the most reads in /proc/diskstats +// (fallback in case the root device is not supplied or detected) +func findMaxReadsDevice() string { + content, err := os.ReadFile("/proc/diskstats") + if err != nil { + return "/" + } + + lines := strings.Split(string(content), "\n") + var maxReadsSectors int64 + var maxReadsDevice string + + for _, line := range lines { + fields := strings.Fields(line) + if len(fields) < 7 { + continue + } + + deviceName := fields[2] + readsSectors, err := strconv.ParseInt(fields[5], 10, 64) + if err != nil { + continue + } + + if readsSectors > maxReadsSectors { + maxReadsSectors = readsSectors + maxReadsDevice = deviceName + } + } + + if maxReadsDevice == "" { + return "/" + } + + return maxReadsDevice +} diff --git a/beszel/internal/entities/system/system.go b/beszel/internal/entities/system/system.go index 2284dd0..d36a915 100644 --- a/beszel/internal/entities/system/system.go +++ b/beszel/internal/entities/system/system.go @@ -6,28 +6,35 @@ import ( ) type Stats struct { - Cpu float64 `json:"cpu"` - Mem float64 `json:"m"` - MemUsed float64 `json:"mu"` - MemPct float64 `json:"mp"` - MemBuffCache float64 `json:"mb"` - Swap float64 `json:"s"` - SwapUsed float64 `json:"su"` - Disk float64 `json:"d"` - DiskUsed float64 `json:"du"` - DiskPct float64 `json:"dp"` - DiskRead float64 `json:"dr"` - DiskWrite float64 `json:"dw"` - NetworkSent float64 `json:"ns"` - NetworkRecv float64 `json:"nr"` - Temperatures map[string]float64 `json:"t,omitempty"` + Cpu float64 `json:"cpu"` + Mem float64 `json:"m"` + MemUsed float64 `json:"mu"` + MemPct float64 `json:"mp"` + MemBuffCache float64 `json:"mb"` + Swap float64 `json:"s,omitempty"` + SwapUsed float64 `json:"su,omitempty"` + DiskTotal float64 `json:"d"` + DiskUsed float64 `json:"du"` + DiskPct float64 `json:"dp"` + DiskReadPs float64 `json:"dr"` + DiskWritePs float64 `json:"dw"` + NetworkSent float64 `json:"ns"` + NetworkRecv float64 `json:"nr"` + Temperatures map[string]float64 `json:"t,omitempty"` + ExtraFs map[string]*FsStats `json:"efs,omitempty"` } -type DiskIoStats struct { - Read uint64 - Write uint64 - Time time.Time - Filesystem string +type FsStats struct { + Time time.Time `json:"-"` + Device string `json:"-"` + Root bool `json:"-"` + Mountpoint string `json:"-"` + DiskTotal float64 `json:"d"` + DiskUsed float64 `json:"du"` + TotalRead uint64 `json:"-"` + TotalWrite uint64 `json:"-"` + DiskWritePs float64 `json:"w"` + DiskReadPs float64 `json:"r"` } type NetIoStats struct { @@ -51,7 +58,7 @@ type Info struct { // Final data structure to return to the hub type CombinedData struct { - Stats *Stats `json:"stats"` - Info *Info `json:"info"` - Containers []*container.Stats `json:"container"` + Stats Stats `json:"stats"` + Info Info `json:"info"` + Containers []container.Stats `json:"container"` } diff --git a/beszel/internal/records/records.go b/beszel/internal/records/records.go index dcff1fd..36a6312 100644 --- a/beszel/internal/records/records.go +++ b/beszel/internal/records/records.go @@ -140,33 +140,11 @@ func (rm *RecordManager) CreateLongerRecords() { // log.Println("finished creating longer records", "time (ms)", time.Since(start).Milliseconds()) } -// Calculate the average stats of a list of system_stats records with reflect -// func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Stats { -// count := float64(len(records)) -// sum := reflect.New(reflect.TypeOf(system.Stats{})).Elem() - -// var stats system.Stats -// for _, record := range records { -// record.UnmarshalJSONField("stats", &stats) -// statValue := reflect.ValueOf(stats) -// for i := 0; i < statValue.NumField(); i++ { -// field := sum.Field(i) -// field.SetFloat(field.Float() + statValue.Field(i).Float()) -// } -// } - -// average := reflect.New(reflect.TypeOf(system.Stats{})).Elem() -// for i := 0; i < sum.NumField(); i++ { -// average.Field(i).SetFloat(twoDecimals(sum.Field(i).Float() / count)) -// } - -// return average.Interface().(system.Stats) -// } - // Calculate the average stats of a list of system_stats records without reflect func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Stats { var sum system.Stats sum.Temperatures = make(map[string]float64) + sum.ExtraFs = make(map[string]*system.FsStats) count := float64(len(records)) // use different counter for temps in case some records don't have them @@ -182,13 +160,14 @@ func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Sta sum.MemBuffCache += stats.MemBuffCache sum.Swap += stats.Swap sum.SwapUsed += stats.SwapUsed - sum.Disk += stats.Disk + sum.DiskTotal += stats.DiskTotal sum.DiskUsed += stats.DiskUsed sum.DiskPct += stats.DiskPct - sum.DiskRead += stats.DiskRead - sum.DiskWrite += stats.DiskWrite + sum.DiskReadPs += stats.DiskReadPs + sum.DiskWritePs += stats.DiskWritePs sum.NetworkSent += stats.NetworkSent sum.NetworkRecv += stats.NetworkRecv + // add temps to sum if stats.Temperatures != nil { tempCount++ for key, value := range stats.Temperatures { @@ -198,6 +177,18 @@ func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Sta sum.Temperatures[key] += value } } + // add extra fs to sum + if stats.ExtraFs != nil { + for key, value := range stats.ExtraFs { + if _, ok := sum.ExtraFs[key]; !ok { + sum.ExtraFs[key] = &system.FsStats{} + } + sum.ExtraFs[key].DiskTotal += value.DiskTotal + sum.ExtraFs[key].DiskUsed += value.DiskUsed + sum.ExtraFs[key].DiskWritePs += value.DiskWritePs + sum.ExtraFs[key].DiskReadPs += value.DiskReadPs + } + } } stats = system.Stats{ @@ -208,11 +199,11 @@ func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Sta MemBuffCache: twoDecimals(sum.MemBuffCache / count), Swap: twoDecimals(sum.Swap / count), SwapUsed: twoDecimals(sum.SwapUsed / count), - Disk: twoDecimals(sum.Disk / count), + DiskTotal: twoDecimals(sum.DiskTotal / count), DiskUsed: twoDecimals(sum.DiskUsed / count), DiskPct: twoDecimals(sum.DiskPct / count), - DiskRead: twoDecimals(sum.DiskRead / count), - DiskWrite: twoDecimals(sum.DiskWrite / count), + DiskReadPs: twoDecimals(sum.DiskReadPs / count), + DiskWritePs: twoDecimals(sum.DiskWritePs / count), NetworkSent: twoDecimals(sum.NetworkSent / count), NetworkRecv: twoDecimals(sum.NetworkRecv / count), } @@ -224,6 +215,18 @@ func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Sta } } + if len(sum.ExtraFs) != 0 { + stats.ExtraFs = make(map[string]*system.FsStats) + for key, value := range sum.ExtraFs { + stats.ExtraFs[key] = &system.FsStats{ + DiskTotal: twoDecimals(value.DiskTotal / count), + DiskUsed: twoDecimals(value.DiskUsed / count), + DiskWritePs: twoDecimals(value.DiskWritePs / count), + DiskReadPs: twoDecimals(value.DiskReadPs / count), + } + } + } + return stats }