diff --git a/beszel/cmd/agent/agent.go b/beszel/cmd/agent/agent.go index 2918644..a8e081d 100644 --- a/beszel/cmd/agent/agent.go +++ b/beszel/cmd/agent/agent.go @@ -29,20 +29,14 @@ func main() { log.Fatal("KEY environment variable is not set") } - var port string - + port := ":45876" if p, exists := os.LookupEnv("PORT"); exists { // allow passing an address in the form of "127.0.0.1:45876" - if !strings.Contains(port, ":") { - port = ":" + port + if !strings.Contains(p, ":") { + p = ":" + p } port = p - } else { - port = ":45876" } - a := agent.NewAgent(pubKey, port) - - a.Run() - + agent.NewAgent(pubKey, port).Run() } diff --git a/beszel/internal/agent/agent.go b/beszel/internal/agent/agent.go index 0392280..7a09989 100644 --- a/beszel/internal/agent/agent.go +++ b/beszel/internal/agent/agent.go @@ -35,6 +35,7 @@ type Agent struct { containerStatsMutex *sync.Mutex diskIoStats system.DiskIoStats netIoStats system.NetIoStats + dockerClient *http.Client } func NewAgent(pubKey []byte, port string) *Agent { @@ -45,6 +46,7 @@ func NewAgent(pubKey []byte, port string) *Agent { containerStatsMutex: &sync.Mutex{}, diskIoStats: system.DiskIoStats{}, netIoStats: system.NetIoStats{}, + dockerClient: newDockerClient(), } } @@ -56,11 +58,8 @@ func (a *Agent) releaseSemaphore() { <-a.sem } -// client for docker engine api -var dockerClient = newDockerClient() - -func (a *Agent) getSystemStats() (*system.SystemInfo, *system.SystemStats) { - systemStats := &system.SystemStats{} +func (a *Agent) getSystemStats() (*system.Info, *system.Stats) { + systemStats := &system.Stats{} // cpu percent cpuPct, err := cpu.Percent(0, false) @@ -127,7 +126,7 @@ func (a *Agent) getSystemStats() (*system.SystemInfo, *system.SystemStats) { a.netIoStats.Time = time.Now() } - systemInfo := &system.SystemInfo{ + systemInfo := &system.Info{ Cpu: systemStats.Cpu, MemPct: systemStats.MemPct, DiskPct: systemStats.DiskPct, @@ -153,21 +152,21 @@ func (a *Agent) getSystemStats() (*system.SystemInfo, *system.SystemStats) { } -func (a *Agent) getDockerStats() ([]*container.ContainerStats, error) { - resp, err := dockerClient.Get("http://localhost/containers/json") +func (a *Agent) getDockerStats() ([]*container.Stats, error) { + resp, err := a.dockerClient.Get("http://localhost/containers/json") if err != nil { - closeIdleConnections(err) - return []*container.ContainerStats{}, err + a.closeIdleConnections(err) + return []*container.Stats{}, err } defer resp.Body.Close() - var containers []*container.Container + var containers []*container.ApiInfo if err := json.NewDecoder(resp.Body).Decode(&containers); err != nil { log.Printf("Error decoding containers: %+v\n", err) - return []*container.ContainerStats{}, err + return []*container.Stats{}, err } - containerStats := make([]*container.ContainerStats, 0, len(containers)) + containerStats := make([]*container.Stats, 0, len(containers)) // store valid ids to clean up old container ids from map validIds := make(map[string]struct{}, len(containers)) @@ -188,12 +187,10 @@ func (a *Agent) getDockerStats() ([]*container.ContainerStats, error) { defer wg.Done() cstats, err := a.getContainerStats(ctr) if err != nil { - // Check if the error is a network timeout - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - // Close idle connections to prevent reuse of stale connections - closeIdleConnections(err) - } else { - // otherwise delete container from map + // close idle connections if error is a network timeout + isTimeout := a.closeIdleConnections(err) + // delete container from map if not a timeout + if !isTimeout { a.deleteContainerStatsSync(ctr.IdShort) } // retry once @@ -219,19 +216,19 @@ func (a *Agent) getDockerStats() ([]*container.ContainerStats, error) { return containerStats, nil } -func (a *Agent) getContainerStats(ctr *container.Container) (*container.ContainerStats, error) { +func (a *Agent) getContainerStats(ctr *container.ApiInfo) (*container.Stats, error) { // use semaphore to limit concurrency a.acquireSemaphore() defer a.releaseSemaphore() - resp, err := dockerClient.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1") + resp, err := a.dockerClient.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1") if err != nil { - return &container.ContainerStats{}, err + return &container.Stats{}, err } defer resp.Body.Close() - var statsJson system.CStats + var statsJson container.ApiStats if err := json.NewDecoder(resp.Body).Decode(&statsJson); err != nil { - panic(err) + log.Fatal(err) } name := ctr.Names[0][1:] @@ -258,7 +255,7 @@ func (a *Agent) getContainerStats(ctr *container.Container) (*container.Containe systemDelta := statsJson.CPUStats.SystemUsage - stats.Cpu[1] cpuPct := float64(cpuDelta) / float64(systemDelta) * 100 if cpuPct > 100 { - return &container.ContainerStats{}, fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct) + return &container.Stats{}, fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct) } stats.Cpu = [2]uint64{statsJson.CPUStats.CPUUsage.TotalUsage, statsJson.CPUStats.SystemUsage} @@ -280,7 +277,7 @@ func (a *Agent) getContainerStats(ctr *container.Container) (*container.Containe stats.Net.Recv = total_recv stats.Net.Time = time.Now() - cStats := &container.ContainerStats{ + cStats := &container.Stats{ Name: name, Cpu: twoDecimals(cpuPct), Mem: bytesToMegabytes(float64(usedMemory)), @@ -297,19 +294,18 @@ func (a *Agent) deleteContainerStatsSync(id string) { delete(containerStatsMap, id) } -func (a *Agent) gatherStats() *system.SystemData { +func (a *Agent) gatherStats() *system.CombinedData { systemInfo, systemStats := a.getSystemStats() - stats := &system.SystemData{ - Stats: systemStats, - Info: systemInfo, - Containers: []*container.ContainerStats{}, + systemData := &system.CombinedData{ + Stats: systemStats, + Info: systemInfo, + // Containers: []*container.Stats{}, } - containerStats, err := a.getDockerStats() - if err == nil { - stats.Containers = containerStats + if containerStats, err := a.getDockerStats(); err == nil { + systemData.Containers = containerStats } // fmt.Printf("%+v\n", stats) - return stats + return systemData } func (a *Agent) startServer(addr string, pubKey []byte) { @@ -324,8 +320,7 @@ func (a *Agent) startServer(addr string, pubKey []byte) { log.Printf("Starting SSH server on %s", addr) if err := sshServer.ListenAndServe(addr, nil, sshServer.NoPty(), sshServer.PublicKeyAuth(func(ctx sshServer.Context, key sshServer.PublicKey) bool { - data := []byte(pubKey) - allowed, _, _, _, _ := sshServer.ParseAuthorizedKey(data) + allowed, _, _, _, _ := sshServer.ParseAuthorizedKey(pubKey) return sshServer.KeysEqual(key, allowed) }), ); err != nil { @@ -334,7 +329,6 @@ func (a *Agent) startServer(addr string, pubKey []byte) { } func (a *Agent) Run() { - if filesystem, exists := os.LookupEnv("FILESYSTEM"); exists { a.diskIoStats.Filesystem = filesystem } else { @@ -452,7 +446,12 @@ func newDockerClient() *http.Client { } } -func closeIdleConnections(err error) { - log.Printf("Closing idle connections. Error: %+v\n", err) - dockerClient.Transport.(*http.Transport).CloseIdleConnections() +// closes idle connections on timeouts to prevent reuse of stale connections +func (a *Agent) closeIdleConnections(err error) (isTimeout bool) { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + log.Printf("Closing idle connections. Error: %+v\n", err) + a.dockerClient.Transport.(*http.Transport).CloseIdleConnections() + return true + } + return false } diff --git a/beszel/internal/alerts/alerts.go b/beszel/internal/alerts/alerts.go index 4e68bb1..1f4f03e 100644 --- a/beszel/internal/alerts/alerts.go +++ b/beszel/internal/alerts/alerts.go @@ -1,3 +1,4 @@ +// Package alerts handles alert management and delivery. package alerts import ( @@ -32,7 +33,7 @@ func (am *AlertManager) HandleSystemAlerts(newStatus string, newRecord *models.R return } // log.Println("found alerts", len(alertRecords)) - var systemInfo *system.SystemInfo + var systemInfo *system.Info for _, alertRecord := range alertRecords { name := alertRecord.GetString("name") switch name { @@ -56,8 +57,8 @@ func (am *AlertManager) HandleSystemAlerts(newStatus string, newRecord *models.R } } -func getSystemInfo(record *models.Record) *system.SystemInfo { - var SystemInfo system.SystemInfo +func getSystemInfo(record *models.Record) *system.Info { + var SystemInfo system.Info record.UnmarshalJSONField("info", &SystemInfo) return &SystemInfo } diff --git a/beszel/internal/entities/system/stats.go b/beszel/internal/entities/container/container.go similarity index 73% rename from beszel/internal/entities/system/stats.go rename to beszel/internal/entities/container/container.go index caafcc8..92eedc1 100644 --- a/beszel/internal/entities/system/stats.go +++ b/beszel/internal/entities/container/container.go @@ -1,36 +1,50 @@ -package system +package container import "time" -type SystemStats struct { - Cpu float64 `json:"cpu"` - Mem float64 `json:"m"` - MemUsed float64 `json:"mu"` - MemPct float64 `json:"mp"` - MemBuffCache float64 `json:"mb"` - Swap float64 `json:"s"` - SwapUsed float64 `json:"su"` - Disk float64 `json:"d"` - DiskUsed float64 `json:"du"` - DiskPct float64 `json:"dp"` - DiskRead float64 `json:"dr"` - DiskWrite float64 `json:"dw"` - NetworkSent float64 `json:"ns"` - NetworkRecv float64 `json:"nr"` +// Docker container info from /containers/json +type ApiInfo struct { + Id string + IdShort string + Names []string + Status string + // Image string + // ImageID string + // Command string + // Created int64 + // Ports []Port + // SizeRw int64 `json:",omitempty"` + // SizeRootFs int64 `json:",omitempty"` + // Labels map[string]string + // State string + // HostConfig struct { + // NetworkMode string `json:",omitempty"` + // Annotations map[string]string `json:",omitempty"` + // } + // NetworkSettings *SummaryNetworkSettings + // Mounts []MountPoint } -type DiskIoStats struct { - Read uint64 - Write uint64 - Time time.Time - Filesystem string -} +// Docker container resources from /containers/{id}/stats +type ApiStats struct { + // Common stats + // Read time.Time `json:"read"` + // PreRead time.Time `json:"preread"` -type NetIoStats struct { - BytesRecv uint64 - BytesSent uint64 - Time time.Time - Name string + // Linux specific stats, not populated on Windows. + // PidsStats PidsStats `json:"pids_stats,omitempty"` + // BlkioStats BlkioStats `json:"blkio_stats,omitempty"` + + // Windows specific stats, not populated on Linux. + // NumProcs uint32 `json:"num_procs"` + // StorageStats StorageStats `json:"storage_stats,omitempty"` + // Networks request version >=1.21 + Networks map[string]NetworkStats + + // Shared stats + CPUStats CPUStats `json:"cpu_stats,omitempty"` + // PreCPUStats CPUStats `json:"precpu_stats,omitempty"` // "Pre"="Previous" + MemoryStats MemoryStats `json:"memory_stats,omitempty"` } type CPUStats struct { @@ -70,27 +84,6 @@ type CPUUsage struct { // UsageInUsermode uint64 `json:"usage_in_usermode"` } -type CStats struct { - // Common stats - // Read time.Time `json:"read"` - // PreRead time.Time `json:"preread"` - - // Linux specific stats, not populated on Windows. - // PidsStats PidsStats `json:"pids_stats,omitempty"` - // BlkioStats BlkioStats `json:"blkio_stats,omitempty"` - - // Windows specific stats, not populated on Linux. - // NumProcs uint32 `json:"num_procs"` - // StorageStats StorageStats `json:"storage_stats,omitempty"` - // Networks request version >=1.21 - Networks map[string]NetworkStats - - // Shared stats - CPUStats CPUStats `json:"cpu_stats,omitempty"` - // PreCPUStats CPUStats `json:"precpu_stats,omitempty"` // "Pre"="Previous" - MemoryStats MemoryStats `json:"memory_stats,omitempty"` -} - type MemoryStats struct { // current res_counter usage for memory @@ -119,3 +112,22 @@ type NetworkStats struct { // Bytes sent. Windows and Linux. TxBytes uint64 `json:"tx_bytes"` } + +// Container stats to return to the hub +type Stats struct { + Name string `json:"n"` + Cpu float64 `json:"c"` + Mem float64 `json:"m"` + NetworkSent float64 `json:"ns"` + NetworkRecv float64 `json:"nr"` +} + +// Keeps track of container stats from previous run +type PrevContainerStats struct { + Cpu [2]uint64 + Net struct { + Sent uint64 + Recv uint64 + Time time.Time + } +} diff --git a/beszel/internal/entities/container/stats.go b/beszel/internal/entities/container/stats.go deleted file mode 100644 index bec7a5d..0000000 --- a/beszel/internal/entities/container/stats.go +++ /dev/null @@ -1,45 +0,0 @@ -package container - -import "time" - -// Docker container resources info from /containers/id/stats -type Container struct { - Id string - IdShort string - Names []string - Status string - // Image string - // ImageID string - // Command string - // Created int64 - // Ports []Port - // SizeRw int64 `json:",omitempty"` - // SizeRootFs int64 `json:",omitempty"` - // Labels map[string]string - // State string - // HostConfig struct { - // NetworkMode string `json:",omitempty"` - // Annotations map[string]string `json:",omitempty"` - // } - // NetworkSettings *SummaryNetworkSettings - // Mounts []MountPoint -} - -// Stats to return to the hub -type ContainerStats struct { - Name string `json:"n"` - Cpu float64 `json:"c"` - Mem float64 `json:"m"` - NetworkSent float64 `json:"ns"` - NetworkRecv float64 `json:"nr"` -} - -// Keeps track of container stats from previous run -type PrevContainerStats struct { - Cpu [2]uint64 - Net struct { - Sent uint64 - Recv uint64 - Time time.Time - } -} diff --git a/beszel/internal/entities/system/system.go b/beszel/internal/entities/system/system.go index 941ce7f..6c9550c 100644 --- a/beszel/internal/entities/system/system.go +++ b/beszel/internal/entities/system/system.go @@ -1,8 +1,42 @@ package system -import "beszel/internal/entities/container" +import ( + "beszel/internal/entities/container" + "time" +) -type SystemInfo struct { +type Stats struct { + Cpu float64 `json:"cpu"` + Mem float64 `json:"m"` + MemUsed float64 `json:"mu"` + MemPct float64 `json:"mp"` + MemBuffCache float64 `json:"mb"` + Swap float64 `json:"s"` + SwapUsed float64 `json:"su"` + Disk float64 `json:"d"` + DiskUsed float64 `json:"du"` + DiskPct float64 `json:"dp"` + DiskRead float64 `json:"dr"` + DiskWrite float64 `json:"dw"` + NetworkSent float64 `json:"ns"` + NetworkRecv float64 `json:"nr"` +} + +type DiskIoStats struct { + Read uint64 + Write uint64 + Time time.Time + Filesystem string +} + +type NetIoStats struct { + BytesRecv uint64 + BytesSent uint64 + Time time.Time + Name string +} + +type Info struct { Cores int `json:"c"` Threads int `json:"t"` CpuModel string `json:"m"` @@ -13,8 +47,9 @@ type SystemInfo struct { DiskPct float64 `json:"dp"` } -type SystemData struct { - Stats *SystemStats `json:"stats"` - Info *SystemInfo `json:"info"` - Containers []*container.ContainerStats `json:"container"` +// Final data structure to return to the hub +type CombinedData struct { + Stats *Stats `json:"stats"` + Info *Info `json:"info"` + Containers []*container.Stats `json:"container"` } diff --git a/beszel/internal/hub/hub.go b/beszel/internal/hub/hub.go index 2a53c7e..8a4a4af 100644 --- a/beszel/internal/hub/hub.go +++ b/beszel/internal/hub/hub.go @@ -97,7 +97,7 @@ func (h *Hub) Run() { Scheme: "http", Host: "localhost:5173", }) - e.Router.GET("/static/*", apis.StaticDirectoryHandler(os.DirFS("./site/public/static"), false)) + e.Router.GET("/static/*", apis.StaticDirectoryHandler(os.DirFS("../../site/public/static"), false)) e.Router.Any("/*", echo.WrapHandler(proxy)) // e.Router.Any("/", echo.WrapHandler(proxy)) default: @@ -162,7 +162,7 @@ func (h *Hub) Run() { // system creation defaults h.app.OnModelBeforeCreate("systems").Add(func(e *core.ModelEvent) error { record := e.Model.(*models.Record) - record.Set("info", system.SystemInfo{}) + record.Set("info", system.Info{}) record.Set("status", "pending") return nil }) @@ -367,10 +367,10 @@ func (h *Hub) createSSHClientConfig() error { return nil } -func requestJson(client *ssh.Client) (system.SystemData, error) { +func requestJson(client *ssh.Client) (system.CombinedData, error) { session, err := client.NewSession() if err != nil { - return system.SystemData{}, errors.New("retry") + return system.CombinedData{}, errors.New("retry") } defer session.Close() @@ -379,19 +379,19 @@ func requestJson(client *ssh.Client) (system.SystemData, error) { session.Stdout = &outputBuffer if err := session.Shell(); err != nil { - return system.SystemData{}, err + return system.CombinedData{}, err } err = session.Wait() if err != nil { - return system.SystemData{}, err + return system.CombinedData{}, err } // Unmarshal the output into our struct - var systemData system.SystemData + var systemData system.CombinedData err = json.Unmarshal(outputBuffer.Bytes(), &systemData) if err != nil { - return system.SystemData{}, err + return system.CombinedData{}, err } return systemData, nil diff --git a/beszel/internal/records/records.go b/beszel/internal/records/records.go index be8d028..58c595d 100644 --- a/beszel/internal/records/records.go +++ b/beszel/internal/records/records.go @@ -1,3 +1,4 @@ +// Package records handles creating longer records and deleting old records. package records import ( @@ -95,12 +96,12 @@ func (rm *RecordManager) CreateLongerRecords(collectionName string, shorterRecor } // calculate the average stats of a list of system_stats records -func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.SystemStats { +func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Stats { count := float64(len(records)) - sum := reflect.New(reflect.TypeOf(system.SystemStats{})).Elem() + sum := reflect.New(reflect.TypeOf(system.Stats{})).Elem() for _, record := range records { - var stats system.SystemStats + var stats system.Stats record.UnmarshalJSONField("stats", &stats) statValue := reflect.ValueOf(stats) for i := 0; i < statValue.NumField(); i++ { @@ -109,24 +110,24 @@ func (rm *RecordManager) AverageSystemStats(records []*models.Record) system.Sys } } - average := reflect.New(reflect.TypeOf(system.SystemStats{})).Elem() + average := reflect.New(reflect.TypeOf(system.Stats{})).Elem() for i := 0; i < sum.NumField(); i++ { average.Field(i).SetFloat(twoDecimals(sum.Field(i).Float() / count)) } - return average.Interface().(system.SystemStats) + return average.Interface().(system.Stats) } // calculate the average stats of a list of container_stats records -func (rm *RecordManager) AverageContainerStats(records []*models.Record) (stats []container.ContainerStats) { - sums := make(map[string]*container.ContainerStats) +func (rm *RecordManager) AverageContainerStats(records []*models.Record) (stats []container.Stats) { + sums := make(map[string]*container.Stats) count := float64(len(records)) for _, record := range records { - var stats []container.ContainerStats + var stats []container.Stats record.UnmarshalJSONField("stats", &stats) for _, stat := range stats { if _, ok := sums[stat.Name]; !ok { - sums[stat.Name] = &container.ContainerStats{Name: stat.Name, Cpu: 0, Mem: 0} + sums[stat.Name] = &container.Stats{Name: stat.Name, Cpu: 0, Mem: 0} } sums[stat.Name].Cpu += stat.Cpu sums[stat.Name].Mem += stat.Mem @@ -135,7 +136,7 @@ func (rm *RecordManager) AverageContainerStats(records []*models.Record) (stats } } for _, value := range sums { - stats = append(stats, container.ContainerStats{ + stats = append(stats, container.Stats{ Name: value.Name, Cpu: twoDecimals(value.Cpu / count), Mem: twoDecimals(value.Mem / count),