reorganize agent package

This commit is contained in:
Henry Dollman
2024-09-26 15:08:26 -04:00
parent e88e2bf3dc
commit 06b1c2200b
5 changed files with 328 additions and 294 deletions

View File

@@ -5,18 +5,12 @@ import (
"beszel"
"beszel/internal/entities/container"
"beszel/internal/entities/system"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"math"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
@@ -29,7 +23,6 @@ import (
"github.com/shirou/gopsutil/v4/mem"
"github.com/shirou/gopsutil/v4/sensors"
sshServer "github.com/gliderlabs/ssh"
psutilNet "github.com/shirou/gopsutil/v4/net"
)
@@ -60,14 +53,6 @@ func NewAgent(pubKey []byte, addr string) *Agent {
}
}
func (a *Agent) acquireSemaphore() {
a.sem <- struct{}{}
}
func (a *Agent) releaseSemaphore() {
<-a.sem
}
func (a *Agent) getSystemStats() (system.Info, system.Stats) {
systemStats := system.Stats{}
@@ -367,13 +352,6 @@ func (a *Agent) getContainerStats(ctr container.ApiInfo) (container.Stats, error
return cStats, nil
}
// delete container stats from map using mutex
func (a *Agent) deleteContainerStatsSync(id string) {
a.containerStatsMutex.Lock()
defer a.containerStatsMutex.Unlock()
delete(a.containerStatsMap, id)
}
func (a *Agent) gatherStats() system.CombinedData {
systemInfo, systemStats := a.getSystemStats()
systemData := system.CombinedData{
@@ -395,31 +373,6 @@ func (a *Agent) gatherStats() system.CombinedData {
return systemData
}
func (a *Agent) startServer() {
sshServer.Handle(a.handleSession)
log.Printf("Starting SSH server on %s", a.addr)
if err := sshServer.ListenAndServe(a.addr, nil, sshServer.NoPty(),
sshServer.PublicKeyAuth(func(ctx sshServer.Context, key sshServer.PublicKey) bool {
allowed, _, _, _, _ := sshServer.ParseAuthorizedKey(a.pubKey)
return sshServer.KeysEqual(key, allowed)
}),
); err != nil {
log.Fatal(err)
}
}
func (a *Agent) handleSession(s sshServer.Session) {
stats := a.gatherStats()
encoder := json.NewEncoder(s)
if err := encoder.Encode(stats); err != nil {
log.Println("Error encoding stats:", err.Error())
s.Exit(1)
return
}
s.Exit(0)
}
func (a *Agent) Run() {
a.fsStats = make(map[string]*system.FsStats)
@@ -435,252 +388,5 @@ func (a *Agent) Run() {
a.initializeDiskIoStats()
a.initializeNetIoStats()
// log.Printf("Filesystems: %+v\n", a.fsStats)
a.startServer()
}
// Sets up the filesystems to monitor for disk usage and I/O.
func (a *Agent) initializeDiskInfo() error {
filesystem := os.Getenv("FILESYSTEM")
hasRoot := false
// add values from EXTRA_FILESYSTEMS env var to fsStats
if extraFilesystems, exists := os.LookupEnv("EXTRA_FILESYSTEMS"); exists {
for _, filesystem := range strings.Split(extraFilesystems, ",") {
a.fsStats[filepath.Base(filesystem)] = &system.FsStats{}
}
}
partitions, err := disk.Partitions(false)
if err != nil {
return err
}
// if FILESYSTEM env var is set, use it to find root filesystem
if filesystem != "" {
for _, v := range partitions {
// use filesystem env var if matching partition is found
if strings.HasSuffix(v.Device, filesystem) || v.Mountpoint == filesystem {
a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: v.Mountpoint}
hasRoot = true
break
}
}
if !hasRoot {
// if no match, log available partition details
log.Printf("Partition details not found for %s:\n", filesystem)
for _, v := range partitions {
fmt.Printf("%+v\n", v)
}
}
}
for _, v := range partitions {
// binary root fallback - use root mountpoint
if !hasRoot && v.Mountpoint == "/" {
a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: "/"}
hasRoot = true
}
// docker root fallback - use /etc/hosts device if not mapped
if !hasRoot && v.Mountpoint == "/etc/hosts" && strings.HasPrefix(v.Device, "/dev") && !strings.Contains(v.Device, "mapper") {
a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: "/"}
hasRoot = true
}
// check if device is in /extra-filesystem
if strings.HasPrefix(v.Mountpoint, "/extra-filesystem") {
// add to fsStats if not already there
if _, exists := a.fsStats[filepath.Base(v.Device)]; !exists {
a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Mountpoint: v.Mountpoint}
}
continue
}
// set mountpoints for extra filesystems if passed in via env var
for name, stats := range a.fsStats {
if strings.HasSuffix(v.Device, name) {
stats.Mountpoint = v.Mountpoint
break
}
}
}
// remove extra filesystems that don't have a mountpoint
for name, stats := range a.fsStats {
if stats.Root {
log.Println("Detected root fs:", name)
}
if stats.Mountpoint == "" {
log.Printf("Ignoring %s. No mountpoint found.\n", name)
delete(a.fsStats, name)
}
}
// if no root filesystem set, use most read device in /proc/diskstats
if !hasRoot {
rootDevice := findFallbackIoDevice(filepath.Base(filesystem))
log.Printf("Using / as mountpoint and %s for I/O\n", rootDevice)
a.fsStats[rootDevice] = &system.FsStats{Root: true, Mountpoint: "/"}
}
return nil
}
// Sets start values for disk I/O stats.
func (a *Agent) initializeDiskIoStats() {
// create slice of fs names to pass to disk.IOCounters
a.fsNames = make([]string, 0, len(a.fsStats))
for name := range a.fsStats {
a.fsNames = append(a.fsNames, name)
}
if ioCounters, err := disk.IOCounters(a.fsNames...); err == nil {
for _, d := range ioCounters {
if a.fsStats[d.Name] == nil {
continue
}
a.fsStats[d.Name].Time = time.Now()
a.fsStats[d.Name].TotalRead = d.ReadBytes
a.fsStats[d.Name].TotalWrite = d.WriteBytes
}
}
}
func (a *Agent) initializeNetIoStats() {
// reset valid network interfaces
a.netInterfaces = make(map[string]struct{}, 0)
// map of network interface names passed in via NICS env var
var nicsMap map[string]struct{}
nics, nicsEnvExists := os.LookupEnv("NICS")
if nicsEnvExists {
nicsMap = make(map[string]struct{}, 0)
for _, nic := range strings.Split(nics, ",") {
nicsMap[nic] = struct{}{}
}
}
// reset network I/O stats
a.netIoStats.BytesSent = 0
a.netIoStats.BytesRecv = 0
// get intial network I/O stats
if netIO, err := psutilNet.IOCounters(true); err == nil {
a.netIoStats.Time = time.Now()
for _, v := range netIO {
switch {
// skip if nics exists and the interface is not in the list
case nicsEnvExists:
if _, nameInNics := nicsMap[v.Name]; !nameInNics {
continue
}
// otherwise run the interface name through the skipNetworkInterface function
default:
if a.skipNetworkInterface(v) {
continue
}
}
log.Printf("Detected network interface: %+v (%+v recv, %+v sent)\n", v.Name, v.BytesRecv, v.BytesSent)
a.netIoStats.BytesSent += v.BytesSent
a.netIoStats.BytesRecv += v.BytesRecv
// store as a valid network interface
a.netInterfaces[v.Name] = struct{}{}
}
}
}
func bytesToMegabytes(b float64) float64 {
return twoDecimals(b / 1048576)
}
func bytesToGigabytes(b uint64) float64 {
return twoDecimals(float64(b) / 1073741824)
}
func twoDecimals(value float64) float64 {
return math.Round(value*100) / 100
}
func (a *Agent) skipNetworkInterface(v psutilNet.IOCountersStat) bool {
switch {
case strings.HasPrefix(v.Name, "lo"),
strings.HasPrefix(v.Name, "docker"),
strings.HasPrefix(v.Name, "br-"),
strings.HasPrefix(v.Name, "veth"),
v.BytesRecv == 0,
v.BytesSent == 0:
return true
default:
return false
}
}
func newDockerClient() *http.Client {
dockerHost := "unix:///var/run/docker.sock"
if dockerHostEnv, exists := os.LookupEnv("DOCKER_HOST"); exists {
dockerHost = dockerHostEnv
}
parsedURL, err := url.Parse(dockerHost)
if err != nil {
log.Fatal("Error parsing DOCKER_HOST: " + err.Error())
}
transport := &http.Transport{
ForceAttemptHTTP2: false,
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
MaxConnsPerHost: 20,
MaxIdleConnsPerHost: 20,
DisableKeepAlives: false,
}
switch parsedURL.Scheme {
case "unix":
transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", parsedURL.Path)
}
case "tcp", "http", "https":
log.Println("Using DOCKER_HOST: " + dockerHost)
transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "tcp", parsedURL.Host)
}
default:
log.Fatal("Unsupported DOCKER_HOST: " + parsedURL.Scheme)
}
return &http.Client{
Timeout: time.Second,
Transport: transport,
}
}
// closes idle connections on timeouts to prevent reuse of stale connections
func (a *Agent) closeIdleConnections(err error) (isTimeout bool) {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Printf("Closing idle connections. Error: %+v\n", err)
a.dockerClient.Transport.(*http.Transport).CloseIdleConnections()
return true
}
return false
}
// Returns the device with the most reads in /proc/diskstats,
// or the device specified by the filesystem argument if it exists
// (fallback in case the root device is not supplied or detected)
func findFallbackIoDevice(filesystem string) string {
var maxReadBytes uint64
maxReadDevice := "/"
counters, err := disk.IOCounters()
if err != nil {
return maxReadDevice
}
for _, d := range counters {
if d.Name == filesystem {
return d.Name
}
if d.ReadBytes > maxReadBytes {
maxReadBytes = d.ReadBytes
maxReadDevice = d.Name
}
}
return maxReadDevice
}

View File

@@ -0,0 +1,144 @@
package agent
import (
"beszel/internal/entities/system"
"time"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"github.com/shirou/gopsutil/v4/disk"
)
// problem: device is in partitions, but not in io counters
// solution: if filesystem exists, always use for io counters, even if root is
// Sets up the filesystems to monitor for disk usage and I/O.
func (a *Agent) initializeDiskInfo() error {
filesystem := os.Getenv("FILESYSTEM")
hasRoot := false
// add values from EXTRA_FILESYSTEMS env var to fsStats
if extraFilesystems, exists := os.LookupEnv("EXTRA_FILESYSTEMS"); exists {
for _, filesystem := range strings.Split(extraFilesystems, ",") {
a.fsStats[filepath.Base(filesystem)] = &system.FsStats{}
}
}
partitions, err := disk.Partitions(false)
if err != nil {
return err
}
// if FILESYSTEM env var is set, use it to find root filesystem
if filesystem != "" {
for _, v := range partitions {
// use filesystem env var if matching partition is found
if strings.HasSuffix(v.Device, filesystem) || v.Mountpoint == filesystem {
a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: v.Mountpoint}
hasRoot = true
break
}
}
if !hasRoot {
// if no match, log available partition details
log.Printf("Partition details not found for %s:\n", filesystem)
for _, v := range partitions {
fmt.Printf("%+v\n", v)
}
}
}
for _, v := range partitions {
// binary root fallback - use root mountpoint
if !hasRoot && v.Mountpoint == "/" {
a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: "/"}
hasRoot = true
}
// docker root fallback - use /etc/hosts device if not mapped
if !hasRoot && v.Mountpoint == "/etc/hosts" && strings.HasPrefix(v.Device, "/dev") && !strings.Contains(v.Device, "mapper") {
a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Root: true, Mountpoint: "/"}
hasRoot = true
}
// check if device is in /extra-filesystem
if strings.HasPrefix(v.Mountpoint, "/extra-filesystem") {
// add to fsStats if not already there
if _, exists := a.fsStats[filepath.Base(v.Device)]; !exists {
a.fsStats[filepath.Base(v.Device)] = &system.FsStats{Mountpoint: v.Mountpoint}
}
continue
}
// set mountpoints for extra filesystems if passed in via env var
for name, stats := range a.fsStats {
if strings.HasSuffix(v.Device, name) {
stats.Mountpoint = v.Mountpoint
break
}
}
}
// remove extra filesystems that don't have a mountpoint
for name, stats := range a.fsStats {
if stats.Root {
log.Println("Detected root fs:", name)
}
if stats.Mountpoint == "" {
log.Printf("Ignoring %s. No mountpoint found.\n", name)
delete(a.fsStats, name)
}
}
// if no root filesystem set, use most read device in /proc/diskstats
if !hasRoot {
rootDevice := findFallbackIoDevice(filepath.Base(filesystem))
log.Printf("Using / as mountpoint and %s for I/O\n", rootDevice)
a.fsStats[rootDevice] = &system.FsStats{Root: true, Mountpoint: "/"}
}
return nil
}
// Returns the device with the most reads in /proc/diskstats,
// or the device specified by the filesystem argument if it exists
// (fallback in case the root device is not supplied or detected)
func findFallbackIoDevice(filesystem string) string {
var maxReadBytes uint64
maxReadDevice := "/"
counters, err := disk.IOCounters()
if err != nil {
return maxReadDevice
}
for _, d := range counters {
if d.Name == filesystem {
return d.Name
}
if d.ReadBytes > maxReadBytes {
maxReadBytes = d.ReadBytes
maxReadDevice = d.Name
}
}
return maxReadDevice
}
// Sets start values for disk I/O stats.
func (a *Agent) initializeDiskIoStats() {
// create slice of fs names to pass to disk.IOCounters
a.fsNames = make([]string, 0, len(a.fsStats))
for name := range a.fsStats {
a.fsNames = append(a.fsNames, name)
}
if ioCounters, err := disk.IOCounters(a.fsNames...); err == nil {
for _, d := range ioCounters {
if a.fsStats[d.Name] == nil {
continue
}
a.fsStats[d.Name].Time = time.Now()
a.fsStats[d.Name].TotalRead = d.ReadBytes
a.fsStats[d.Name].TotalWrite = d.WriteBytes
}
}
}

View File

@@ -0,0 +1,121 @@
package agent
import (
"context"
"log"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
psutilNet "github.com/shirou/gopsutil/v4/net"
)
func (a *Agent) initializeNetIoStats() {
// reset valid network interfaces
a.netInterfaces = make(map[string]struct{}, 0)
// map of network interface names passed in via NICS env var
var nicsMap map[string]struct{}
nics, nicsEnvExists := os.LookupEnv("NICS")
if nicsEnvExists {
nicsMap = make(map[string]struct{}, 0)
for _, nic := range strings.Split(nics, ",") {
nicsMap[nic] = struct{}{}
}
}
// reset network I/O stats
a.netIoStats.BytesSent = 0
a.netIoStats.BytesRecv = 0
// get intial network I/O stats
if netIO, err := psutilNet.IOCounters(true); err == nil {
a.netIoStats.Time = time.Now()
for _, v := range netIO {
switch {
// skip if nics exists and the interface is not in the list
case nicsEnvExists:
if _, nameInNics := nicsMap[v.Name]; !nameInNics {
continue
}
// otherwise run the interface name through the skipNetworkInterface function
default:
if a.skipNetworkInterface(v) {
continue
}
}
log.Printf("Detected network interface: %+v (%+v recv, %+v sent)\n", v.Name, v.BytesRecv, v.BytesSent)
a.netIoStats.BytesSent += v.BytesSent
a.netIoStats.BytesRecv += v.BytesRecv
// store as a valid network interface
a.netInterfaces[v.Name] = struct{}{}
}
}
}
func (a *Agent) skipNetworkInterface(v psutilNet.IOCountersStat) bool {
switch {
case strings.HasPrefix(v.Name, "lo"),
strings.HasPrefix(v.Name, "docker"),
strings.HasPrefix(v.Name, "br-"),
strings.HasPrefix(v.Name, "veth"),
v.BytesRecv == 0,
v.BytesSent == 0:
return true
default:
return false
}
}
func newDockerClient() *http.Client {
dockerHost := "unix:///var/run/docker.sock"
if dockerHostEnv, exists := os.LookupEnv("DOCKER_HOST"); exists {
dockerHost = dockerHostEnv
}
parsedURL, err := url.Parse(dockerHost)
if err != nil {
log.Fatal("Error parsing DOCKER_HOST: " + err.Error())
}
transport := &http.Transport{
ForceAttemptHTTP2: false,
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
MaxConnsPerHost: 20,
MaxIdleConnsPerHost: 20,
DisableKeepAlives: false,
}
switch parsedURL.Scheme {
case "unix":
transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", parsedURL.Path)
}
case "tcp", "http", "https":
log.Println("Using DOCKER_HOST: " + dockerHost)
transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "tcp", parsedURL.Host)
}
default:
log.Fatal("Unsupported DOCKER_HOST: " + parsedURL.Scheme)
}
return &http.Client{
Timeout: time.Second,
Transport: transport,
}
}
// closes idle connections on timeouts to prevent reuse of stale connections
func (a *Agent) closeIdleConnections(err error) (isTimeout bool) {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Printf("Closing idle connections. Error: %+v\n", err)
a.dockerClient.Transport.(*http.Transport).CloseIdleConnections()
return true
}
return false
}

View File

@@ -0,0 +1,33 @@
package agent
import (
"encoding/json"
"log"
sshServer "github.com/gliderlabs/ssh"
)
func (a *Agent) startServer() {
sshServer.Handle(a.handleSession)
log.Printf("Starting SSH server on %s", a.addr)
if err := sshServer.ListenAndServe(a.addr, nil, sshServer.NoPty(),
sshServer.PublicKeyAuth(func(ctx sshServer.Context, key sshServer.PublicKey) bool {
allowed, _, _, _, _ := sshServer.ParseAuthorizedKey(a.pubKey)
return sshServer.KeysEqual(key, allowed)
}),
); err != nil {
log.Fatal(err)
}
}
func (a *Agent) handleSession(s sshServer.Session) {
stats := a.gatherStats()
encoder := json.NewEncoder(s)
if err := encoder.Encode(stats); err != nil {
log.Println("Error encoding stats:", err.Error())
s.Exit(1)
return
}
s.Exit(0)
}

View File

@@ -0,0 +1,30 @@
package agent
import "math"
func (a *Agent) acquireSemaphore() {
a.sem <- struct{}{}
}
func (a *Agent) releaseSemaphore() {
<-a.sem
}
// delete container stats from map using mutex
func (a *Agent) deleteContainerStatsSync(id string) {
a.containerStatsMutex.Lock()
defer a.containerStatsMutex.Unlock()
delete(a.containerStatsMap, id)
}
func bytesToMegabytes(b float64) float64 {
return twoDecimals(b / 1048576)
}
func bytesToGigabytes(b uint64) float64 {
return twoDecimals(float64(b) / 1073741824)
}
func twoDecimals(value float64) float64 {
return math.Round(value*100) / 100
}