diff --git a/beszel/internal/agent/agent.go b/beszel/internal/agent/agent.go index 3c9af21..5f4ff0a 100644 --- a/beszel/internal/agent/agent.go +++ b/beszel/internal/agent/agent.go @@ -5,18 +5,12 @@ import ( "beszel" "beszel/internal/entities/container" "beszel/internal/entities/system" - "bytes" "context" "encoding/json" "fmt" - "io" "log" - "math" - "net" "net/http" - "net/url" "os" - "path/filepath" "strconv" "strings" "sync" @@ -29,7 +23,6 @@ import ( "github.com/shirou/gopsutil/v4/mem" "github.com/shirou/gopsutil/v4/sensors" - sshServer "github.com/gliderlabs/ssh" psutilNet "github.com/shirou/gopsutil/v4/net" ) @@ -60,14 +53,6 @@ func NewAgent(pubKey []byte, addr string) *Agent { } } -func (a *Agent) acquireSemaphore() { - a.sem <- struct{}{} -} - -func (a *Agent) releaseSemaphore() { - <-a.sem -} - func (a *Agent) getSystemStats() (system.Info, system.Stats) { systemStats := system.Stats{} @@ -367,13 +352,6 @@ func (a *Agent) getContainerStats(ctr container.ApiInfo) (container.Stats, error return cStats, nil } -// delete container stats from map using mutex -func (a *Agent) deleteContainerStatsSync(id string) { - a.containerStatsMutex.Lock() - defer a.containerStatsMutex.Unlock() - delete(a.containerStatsMap, id) -} - func (a *Agent) gatherStats() system.CombinedData { systemInfo, systemStats := a.getSystemStats() systemData := system.CombinedData{ @@ -395,31 +373,6 @@ func (a *Agent) gatherStats() system.CombinedData { return systemData } -func (a *Agent) startServer() { - sshServer.Handle(a.handleSession) - - log.Printf("Starting SSH server on %s", a.addr) - if err := sshServer.ListenAndServe(a.addr, nil, sshServer.NoPty(), - sshServer.PublicKeyAuth(func(ctx sshServer.Context, key sshServer.PublicKey) bool { - allowed, _, _, _, _ := sshServer.ParseAuthorizedKey(a.pubKey) - return sshServer.KeysEqual(key, allowed) - }), - ); err != nil { - log.Fatal(err) - } -} - -func (a *Agent) handleSession(s sshServer.Session) { - stats := a.gatherStats() - encoder := 
json.NewEncoder(s) - if err := encoder.Encode(stats); err != nil { - log.Println("Error encoding stats:", err.Error()) - s.Exit(1) - return - } - s.Exit(0) -} - func (a *Agent) Run() { a.fsStats = make(map[string]*system.FsStats) @@ -435,252 +388,5 @@ func (a *Agent) Run() { a.initializeDiskIoStats() a.initializeNetIoStats() - // log.Printf("Filesystems: %+v\n", a.fsStats) a.startServer() } - -// Sets up the filesystems to monitor for disk usage and I/O. -func (a *Agent) initializeDiskInfo() error { - filesystem := os.Getenv("FILESYSTEM") - hasRoot := false - - // add values from EXTRA_FILESYSTEMS env var to fsStats - if extraFilesystems, exists := os.LookupEnv("EXTRA_FILESYSTEMS"); exists { - for _, filesystem := range strings.Split(extraFilesystems, ",") { - a.fsStats[filepath.Base(filesystem)] = &system.FsStats{} - } - } - - partitions, err := disk.Partitions(false) - if err != nil { - return err - } - - // if FILESYSTEM env var is set, use it to find root filesystem - if filesystem != "" { - for _, v := range partitions { - // use filesystem env var if matching partition is found - if strings.HasSuffix(v.Device, filesystem) || v.Mountpoint == filesystem { - a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: v.Mountpoint} - hasRoot = true - break - } - } - if !hasRoot { - // if no match, log available partition details - log.Printf("Partition details not found for %s:\n", filesystem) - for _, v := range partitions { - fmt.Printf("%+v\n", v) - } - } - } - - for _, v := range partitions { - // binary root fallback - use root mountpoint - if !hasRoot && v.Mountpoint == "/" { - a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: "/"} - hasRoot = true - } - // docker root fallback - use /etc/hosts device if not mapped - if !hasRoot && v.Mountpoint == "/etc/hosts" && strings.HasPrefix(v.Device, "/dev") && !strings.Contains(v.Device, "mapper") { - a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, 
Mountpoint: "/"} - hasRoot = true - } - // check if device is in /extra-filesystem - if strings.HasPrefix(v.Mountpoint, "/extra-filesystem") { - // add to fsStats if not already there - if _, exists := a.fsStats[filepath.Base(v.Device)]; !exists { - a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Mountpoint: v.Mountpoint} - } - continue - } - // set mountpoints for extra filesystems if passed in via env var - for name, stats := range a.fsStats { - if strings.HasSuffix(v.Device, name) { - stats.Mountpoint = v.Mountpoint - break - } - } - } - - // remove extra filesystems that don't have a mountpoint - for name, stats := range a.fsStats { - if stats.Root { - log.Println("Detected root fs:", name) - } - if stats.Mountpoint == "" { - log.Printf("Ignoring %s. No mountpoint found.\n", name) - delete(a.fsStats, name) - } - } - - // if no root filesystem set, use most read device in /proc/diskstats - if !hasRoot { - rootDevice := findFallbackIoDevice(filepath.Base(filesystem)) - log.Printf("Using / as mountpoint and %s for I/O\n", rootDevice) - a.fsStats[rootDevice] = &system.FsStats{Root: true, Mountpoint: "/"} - } - - return nil -} - -// Sets start values for disk I/O stats. 
-func (a *Agent) initializeDiskIoStats() { - // create slice of fs names to pass to disk.IOCounters - a.fsNames = make([]string, 0, len(a.fsStats)) - for name := range a.fsStats { - a.fsNames = append(a.fsNames, name) - } - - if ioCounters, err := disk.IOCounters(a.fsNames...); err == nil { - for _, d := range ioCounters { - if a.fsStats[d.Name] == nil { - continue - } - a.fsStats[d.Name].Time = time.Now() - a.fsStats[d.Name].TotalRead = d.ReadBytes - a.fsStats[d.Name].TotalWrite = d.WriteBytes - } - } -} - -func (a *Agent) initializeNetIoStats() { - // reset valid network interfaces - a.netInterfaces = make(map[string]struct{}, 0) - - // map of network interface names passed in via NICS env var - var nicsMap map[string]struct{} - nics, nicsEnvExists := os.LookupEnv("NICS") - if nicsEnvExists { - nicsMap = make(map[string]struct{}, 0) - for _, nic := range strings.Split(nics, ",") { - nicsMap[nic] = struct{}{} - } - } - - // reset network I/O stats - a.netIoStats.BytesSent = 0 - a.netIoStats.BytesRecv = 0 - - // get intial network I/O stats - if netIO, err := psutilNet.IOCounters(true); err == nil { - a.netIoStats.Time = time.Now() - for _, v := range netIO { - switch { - // skip if nics exists and the interface is not in the list - case nicsEnvExists: - if _, nameInNics := nicsMap[v.Name]; !nameInNics { - continue - } - // otherwise run the interface name through the skipNetworkInterface function - default: - if a.skipNetworkInterface(v) { - continue - } - } - log.Printf("Detected network interface: %+v (%+v recv, %+v sent)\n", v.Name, v.BytesRecv, v.BytesSent) - a.netIoStats.BytesSent += v.BytesSent - a.netIoStats.BytesRecv += v.BytesRecv - // store as a valid network interface - a.netInterfaces[v.Name] = struct{}{} - } - } -} - -func bytesToMegabytes(b float64) float64 { - return twoDecimals(b / 1048576) -} - -func bytesToGigabytes(b uint64) float64 { - return twoDecimals(float64(b) / 1073741824) -} - -func twoDecimals(value float64) float64 { - return 
math.Round(value*100) / 100 -} - -func (a *Agent) skipNetworkInterface(v psutilNet.IOCountersStat) bool { - switch { - case strings.HasPrefix(v.Name, "lo"), - strings.HasPrefix(v.Name, "docker"), - strings.HasPrefix(v.Name, "br-"), - strings.HasPrefix(v.Name, "veth"), - v.BytesRecv == 0, - v.BytesSent == 0: - return true - default: - return false - } -} - -func newDockerClient() *http.Client { - dockerHost := "unix:///var/run/docker.sock" - if dockerHostEnv, exists := os.LookupEnv("DOCKER_HOST"); exists { - dockerHost = dockerHostEnv - } - - parsedURL, err := url.Parse(dockerHost) - if err != nil { - log.Fatal("Error parsing DOCKER_HOST: " + err.Error()) - } - - transport := &http.Transport{ - ForceAttemptHTTP2: false, - IdleConnTimeout: 90 * time.Second, - DisableCompression: true, - MaxConnsPerHost: 20, - MaxIdleConnsPerHost: 20, - DisableKeepAlives: false, - } - - switch parsedURL.Scheme { - case "unix": - transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) { - return (&net.Dialer{}).DialContext(ctx, "unix", parsedURL.Path) - } - case "tcp", "http", "https": - log.Println("Using DOCKER_HOST: " + dockerHost) - transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) { - return (&net.Dialer{}).DialContext(ctx, "tcp", parsedURL.Host) - } - default: - log.Fatal("Unsupported DOCKER_HOST: " + parsedURL.Scheme) - } - - return &http.Client{ - Timeout: time.Second, - Transport: transport, - } -} - -// closes idle connections on timeouts to prevent reuse of stale connections -func (a *Agent) closeIdleConnections(err error) (isTimeout bool) { - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - log.Printf("Closing idle connections. 
Error: %+v\n", err) - a.dockerClient.Transport.(*http.Transport).CloseIdleConnections() - return true - } - return false -} - -// Returns the device with the most reads in /proc/diskstats, -// or the device specified by the filesystem argument if it exists -// (fallback in case the root device is not supplied or detected) -func findFallbackIoDevice(filesystem string) string { - var maxReadBytes uint64 - maxReadDevice := "/" - counters, err := disk.IOCounters() - if err != nil { - return maxReadDevice - } - for _, d := range counters { - if d.Name == filesystem { - return d.Name - } - if d.ReadBytes > maxReadBytes { - maxReadBytes = d.ReadBytes - maxReadDevice = d.Name - } - } - return maxReadDevice -} diff --git a/beszel/internal/agent/disk.go b/beszel/internal/agent/disk.go new file mode 100644 index 0000000..75e9cf8 --- /dev/null +++ b/beszel/internal/agent/disk.go @@ -0,0 +1,144 @@ +package agent + +import ( + "beszel/internal/entities/system" + "time" + + "fmt" + "log" + "os" + "path/filepath" + "strings" + + "github.com/shirou/gopsutil/v4/disk" +) + +// problem: device is in partitions, but not in io counters +// solution: if filesystem exists, always use for io counters, even if root is + +// Sets up the filesystems to monitor for disk usage and I/O. 
+func (a *Agent) initializeDiskInfo() error { + filesystem := os.Getenv("FILESYSTEM") + hasRoot := false + + // add values from EXTRA_FILESYSTEMS env var to fsStats + if extraFilesystems, exists := os.LookupEnv("EXTRA_FILESYSTEMS"); exists { + for _, filesystem := range strings.Split(extraFilesystems, ",") { + a.fsStats[filepath.Base(filesystem)] = &system.FsStats{} + } + } + + partitions, err := disk.Partitions(false) + if err != nil { + return err + } + + // if FILESYSTEM env var is set, use it to find root filesystem + if filesystem != "" { + for _, v := range partitions { + // use filesystem env var if matching partition is found + if strings.HasSuffix(v.Device, filesystem) || v.Mountpoint == filesystem { + a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: v.Mountpoint} + hasRoot = true + break + } + } + if !hasRoot { + // if no match, log available partition details + log.Printf("Partition details not found for %s:\n", filesystem) + for _, v := range partitions { + fmt.Printf("%+v\n", v) + } + } + } + + for _, v := range partitions { + // binary root fallback - use root mountpoint + if !hasRoot && v.Mountpoint == "/" { + a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: "/"} + hasRoot = true + } + // docker root fallback - use /etc/hosts device if not mapped + if !hasRoot && v.Mountpoint == "/etc/hosts" && strings.HasPrefix(v.Device, "/dev") && !strings.Contains(v.Device, "mapper") { + a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: "/"} + hasRoot = true + } + // check if device is in /extra-filesystem + if strings.HasPrefix(v.Mountpoint, "/extra-filesystem") { + // add to fsStats if not already there + if _, exists := a.fsStats[filepath.Base(v.Device)]; !exists { + a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Mountpoint: v.Mountpoint} + } + continue + } + // set mountpoints for extra filesystems if passed in via env var + for name, stats := range a.fsStats { + if 
strings.HasSuffix(v.Device, name) { + stats.Mountpoint = v.Mountpoint + break + } + } + } + + // remove extra filesystems that don't have a mountpoint + for name, stats := range a.fsStats { + if stats.Root { + log.Println("Detected root fs:", name) + } + if stats.Mountpoint == "" { + log.Printf("Ignoring %s. No mountpoint found.\n", name) + delete(a.fsStats, name) + } + } + + // if no root filesystem set, use most read device in /proc/diskstats + if !hasRoot { + rootDevice := findFallbackIoDevice(filepath.Base(filesystem)) + log.Printf("Using / as mountpoint and %s for I/O\n", rootDevice) + a.fsStats[rootDevice] = &system.FsStats{Root: true, Mountpoint: "/"} + } + + return nil +} + +// Returns the device with the most reads in /proc/diskstats, +// or the device specified by the filesystem argument if it exists +// (fallback in case the root device is not supplied or detected) +func findFallbackIoDevice(filesystem string) string { + var maxReadBytes uint64 + maxReadDevice := "/" + counters, err := disk.IOCounters() + if err != nil { + return maxReadDevice + } + for _, d := range counters { + if d.Name == filesystem { + return d.Name + } + if d.ReadBytes > maxReadBytes { + maxReadBytes = d.ReadBytes + maxReadDevice = d.Name + } + } + return maxReadDevice +} + +// Sets start values for disk I/O stats. 
+func (a *Agent) initializeDiskIoStats() { + // create slice of fs names to pass to disk.IOCounters + a.fsNames = make([]string, 0, len(a.fsStats)) + for name := range a.fsStats { + a.fsNames = append(a.fsNames, name) + } + + if ioCounters, err := disk.IOCounters(a.fsNames...); err == nil { + for _, d := range ioCounters { + if a.fsStats[d.Name] == nil { + continue + } + a.fsStats[d.Name].Time = time.Now() + a.fsStats[d.Name].TotalRead = d.ReadBytes + a.fsStats[d.Name].TotalWrite = d.WriteBytes + } + } +} diff --git a/beszel/internal/agent/network.go b/beszel/internal/agent/network.go new file mode 100644 index 0000000..b4daffe --- /dev/null +++ b/beszel/internal/agent/network.go @@ -0,0 +1,121 @@ +package agent + +import ( + "context" + "log" + "net" + "net/http" + "net/url" + "os" + "strings" + "time" + + psutilNet "github.com/shirou/gopsutil/v4/net" +) + +func (a *Agent) initializeNetIoStats() { + // reset valid network interfaces + a.netInterfaces = make(map[string]struct{}, 0) + + // map of network interface names passed in via NICS env var + var nicsMap map[string]struct{} + nics, nicsEnvExists := os.LookupEnv("NICS") + if nicsEnvExists { + nicsMap = make(map[string]struct{}, 0) + for _, nic := range strings.Split(nics, ",") { + nicsMap[nic] = struct{}{} + } + } + + // reset network I/O stats + a.netIoStats.BytesSent = 0 + a.netIoStats.BytesRecv = 0 + + // get initial network I/O stats + if netIO, err := psutilNet.IOCounters(true); err == nil { + a.netIoStats.Time = time.Now() + for _, v := range netIO { + switch { + // skip if nics exists and the interface is not in the list + case nicsEnvExists: + if _, nameInNics := nicsMap[v.Name]; !nameInNics { + continue + } + // otherwise run the interface name through the skipNetworkInterface function + default: + if a.skipNetworkInterface(v) { + continue + } + } + log.Printf("Detected network interface: %+v (%+v recv, %+v sent)\n", v.Name, v.BytesRecv, v.BytesSent) + a.netIoStats.BytesSent += v.BytesSent +
a.netIoStats.BytesRecv += v.BytesRecv + // store as a valid network interface + a.netInterfaces[v.Name] = struct{}{} + } + } +} + +func (a *Agent) skipNetworkInterface(v psutilNet.IOCountersStat) bool { + switch { + case strings.HasPrefix(v.Name, "lo"), + strings.HasPrefix(v.Name, "docker"), + strings.HasPrefix(v.Name, "br-"), + strings.HasPrefix(v.Name, "veth"), + v.BytesRecv == 0, + v.BytesSent == 0: + return true + default: + return false + } +} + +func newDockerClient() *http.Client { + dockerHost := "unix:///var/run/docker.sock" + if dockerHostEnv, exists := os.LookupEnv("DOCKER_HOST"); exists { + dockerHost = dockerHostEnv + } + + parsedURL, err := url.Parse(dockerHost) + if err != nil { + log.Fatal("Error parsing DOCKER_HOST: " + err.Error()) + } + + transport := &http.Transport{ + ForceAttemptHTTP2: false, + IdleConnTimeout: 90 * time.Second, + DisableCompression: true, + MaxConnsPerHost: 20, + MaxIdleConnsPerHost: 20, + DisableKeepAlives: false, + } + + switch parsedURL.Scheme { + case "unix": + transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, "unix", parsedURL.Path) + } + case "tcp", "http", "https": + log.Println("Using DOCKER_HOST: " + dockerHost) + transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, "tcp", parsedURL.Host) + } + default: + log.Fatal("Unsupported DOCKER_HOST: " + parsedURL.Scheme) + } + + return &http.Client{ + Timeout: time.Second, + Transport: transport, + } +} + +// closes idle connections on timeouts to prevent reuse of stale connections +func (a *Agent) closeIdleConnections(err error) (isTimeout bool) { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + log.Printf("Closing idle connections. 
Error: %+v\n", err) + a.dockerClient.Transport.(*http.Transport).CloseIdleConnections() + return true + } + return false +} diff --git a/beszel/internal/agent/server.go b/beszel/internal/agent/server.go new file mode 100644 index 0000000..64ca9f3 --- /dev/null +++ b/beszel/internal/agent/server.go @@ -0,0 +1,33 @@ +package agent + +import ( + "encoding/json" + "log" + + sshServer "github.com/gliderlabs/ssh" +) + +func (a *Agent) startServer() { + sshServer.Handle(a.handleSession) + + log.Printf("Starting SSH server on %s", a.addr) + if err := sshServer.ListenAndServe(a.addr, nil, sshServer.NoPty(), + sshServer.PublicKeyAuth(func(ctx sshServer.Context, key sshServer.PublicKey) bool { + allowed, _, _, _, _ := sshServer.ParseAuthorizedKey(a.pubKey) + return sshServer.KeysEqual(key, allowed) + }), + ); err != nil { + log.Fatal(err) + } +} + +func (a *Agent) handleSession(s sshServer.Session) { + stats := a.gatherStats() + encoder := json.NewEncoder(s) + if err := encoder.Encode(stats); err != nil { + log.Println("Error encoding stats:", err.Error()) + s.Exit(1) + return + } + s.Exit(0) +} diff --git a/beszel/internal/agent/utils.go b/beszel/internal/agent/utils.go new file mode 100644 index 0000000..8d47efb --- /dev/null +++ b/beszel/internal/agent/utils.go @@ -0,0 +1,30 @@ +package agent + +import "math" + +func (a *Agent) acquireSemaphore() { + a.sem <- struct{}{} +} + +func (a *Agent) releaseSemaphore() { + <-a.sem +} + +// delete container stats from map using mutex +func (a *Agent) deleteContainerStatsSync(id string) { + a.containerStatsMutex.Lock() + defer a.containerStatsMutex.Unlock() + delete(a.containerStatsMap, id) +} + +func bytesToMegabytes(b float64) float64 { + return twoDecimals(b / 1048576) +} + +func bytesToGigabytes(b uint64) float64 { + return twoDecimals(float64(b) / 1073741824) +} + +func twoDecimals(value float64) float64 { + return math.Round(value*100) / 100 +}