refactor agent gpu code to make it easier to add intel / jetson

This commit is contained in:
Henry Dollman
2024-12-17 17:12:58 -05:00
parent dd10fb97c0
commit b08219dacf

View File

@@ -14,6 +14,7 @@ import (
"golang.org/x/exp/slog" "golang.org/x/exp/slog"
) )
// GPUManager manages data collection for GPUs (either Nvidia or AMD)
type GPUManager struct { type GPUManager struct {
nvidiaSmi bool nvidiaSmi bool
rocmSmi bool rocmSmi bool
@@ -21,6 +22,7 @@ type GPUManager struct {
mutex sync.Mutex mutex sync.Mutex
} }
// RocmSmiJson represents the JSON structure of rocm-smi output
type RocmSmiJson struct { type RocmSmiJson struct {
ID string `json:"Device ID"` ID string `json:"Device ID"`
Name string `json:"Card series"` Name string `json:"Card series"`
@@ -31,48 +33,68 @@ type RocmSmiJson struct {
Power string `json:"Current Socket Graphics Package Power (W)"` Power string `json:"Current Socket Graphics Package Power (W)"`
} }
// startNvidiaCollector oversees collectNvidiaStats and restarts nvidia-smi if it fails // gpuCollector defines a collector for a specific GPU management utility (nvidia-smi or rocm-smi)
func (gm *GPUManager) startNvidiaCollector() error { type gpuCollector struct {
name string
cmd *exec.Cmd
parse func([]byte) bool // returns true if valid data was found
}
var errNoValidData = fmt.Errorf("no valid GPU data found") // Error for missing data
// starts and manages the ongoing collection of GPU data for the specified GPU management utility
func (c *gpuCollector) start() {
for { for {
if err := gm.collectNvidiaStats(); err != nil { err := c.collect()
slog.Warn("Restarting nvidia-smi", "err", err) if err != nil {
time.Sleep(time.Second) // Wait before retrying if err == errNoValidData {
slog.Warn(c.name + " found no valid GPU data, stopping")
break
}
slog.Warn(c.name+" failed, restarting", "err", err)
time.Sleep(time.Second * 5)
continue continue
} }
} }
} }
// collectNvidiaStats runs nvidia-smi in a loop and passes the output to parseNvidiaData // collect executes the command, parses output with the assigned parser function
func (gm *GPUManager) collectNvidiaStats() error { func (c *gpuCollector) collect() error {
// Set up the command stdout, err := c.cmd.StdoutPipe()
cmd := exec.Command("nvidia-smi", "-l", "4", "--query-gpu=index,name,temperature.gpu,memory.used,memory.total,utilization.gpu,power.draw", "--format=csv,noheader,nounits")
// Set up a pipe to capture stdout
stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return err return err
} }
// Start the command if err := c.cmd.Start(); err != nil {
if err := cmd.Start(); err != nil {
return err return err
} }
// Use a scanner to read each line of output
scanner := bufio.NewScanner(stdout) scanner := bufio.NewScanner(stdout)
buf := make([]byte, 0, 8*1024) // 8KB buffer buf := make([]byte, 0, 8*1024)
scanner.Buffer(buf, bufio.MaxScanTokenSize) scanner.Buffer(buf, bufio.MaxScanTokenSize)
hasValidData := false
for scanner.Scan() { for scanner.Scan() {
line := scanner.Bytes() if c.parse(scanner.Bytes()) {
gm.parseNvidiaData(line) // Run your function on each new line hasValidData = true
}
} }
// Check for any errors encountered during scanning
if !hasValidData {
return errNoValidData
}
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
return err return fmt.Errorf("scanner error: %w", err)
} }
// Wait for the command to complete return c.cmd.Wait()
return cmd.Wait()
} }
// parseNvidiaData parses the output of nvidia-smi and updates the GPUData map // parseNvidiaData parses the output of nvidia-smi and updates the GPUData map
func (gm *GPUManager) parseNvidiaData(output []byte) { func (gm *GPUManager) parseNvidiaData(output []byte) bool {
fields := strings.Split(string(output), ", ")
if len(fields) < 7 {
return false
}
gm.mutex.Lock() gm.mutex.Lock()
defer gm.mutex.Unlock() defer gm.mutex.Unlock()
lines := strings.Split(string(output), "\n") lines := strings.Split(string(output), "\n")
@@ -102,60 +124,18 @@ func (gm *GPUManager) parseNvidiaData(output []byte) {
} }
} }
} }
} return true
// startAmdCollector oversees collectAmdStats and restarts rocm-smi if it fails
func (gm *GPUManager) startAmdCollector() {
for {
if err := gm.collectAmdStats(); err != nil {
slog.Warn("Restarting rocm-smi", "err", err)
time.Sleep(time.Second) // Wait before retrying
continue
} else {
// break if no error (command runs but no card found)
break
}
}
}
// collectAmdStats runs rocm-smi in a loop and passes the output to parseAmdData
func (gm *GPUManager) collectAmdStats() error {
cmd := exec.Command("/bin/sh", "-c", "while true; do rocm-smi --showid --showtemp --showuse --showpower --showproductname --showmeminfo vram --json; sleep 4.3; done")
// Set up a pipe to capture stdout
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
// Start the command
if err := cmd.Start(); err != nil {
return err
}
// Use a scanner to read each line of output
scanner := bufio.NewScanner(stdout)
buf := make([]byte, 0, 8*1024) // 8KB buffer
scanner.Buffer(buf, bufio.MaxScanTokenSize)
for scanner.Scan() {
var rocmSmiInfo map[string]RocmSmiJson
if err := json.Unmarshal(scanner.Bytes(), &rocmSmiInfo); err != nil {
return err
}
if len(rocmSmiInfo) > 0 {
// slog.Info("rocm-smi", "data", rocmSmiInfo)
gm.parseAmdData(&rocmSmiInfo)
} else {
slog.Warn("rocm-smi returned no GPU")
return nil
}
}
if err := scanner.Err(); err != nil {
return err
}
return cmd.Wait()
} }
// parseAmdData parses the output of rocm-smi and updates the GPUData map // parseAmdData parses the output of rocm-smi and updates the GPUData map
func (gm *GPUManager) parseAmdData(rocmSmiInfo *map[string]RocmSmiJson) { func (gm *GPUManager) parseAmdData(output []byte) bool {
for _, v := range *rocmSmiInfo { var rocmSmiInfo map[string]RocmSmiJson
if err := json.Unmarshal(output, &rocmSmiInfo); err != nil || len(rocmSmiInfo) == 0 {
return false
}
gm.mutex.Lock()
defer gm.mutex.Unlock()
for _, v := range rocmSmiInfo {
temp, _ := strconv.ParseFloat(v.Temperature, 64) temp, _ := strconv.ParseFloat(v.Temperature, 64)
memoryUsage, _ := strconv.ParseFloat(v.MemoryUsed, 64) memoryUsage, _ := strconv.ParseFloat(v.MemoryUsed, 64)
totalMemory, _ := strconv.ParseFloat(v.MemoryTotal, 64) totalMemory, _ := strconv.ParseFloat(v.MemoryTotal, 64)
@@ -175,6 +155,7 @@ func (gm *GPUManager) parseAmdData(rocmSmiInfo *map[string]RocmSmiJson) {
gpu.Power += power gpu.Power += power
gpu.Count++ gpu.Count++
} }
return true
} }
// sums and resets the current GPU utilization data since the last update // sums and resets the current GPU utilization data since the last update
@@ -197,9 +178,9 @@ func (gm *GPUManager) GetCurrentData() map[string]system.GPUData {
return gpuData return gpuData
} }
// detectGPU returns the GPU brand (nvidia or amd) or an error if none is found // detectGPUs returns the GPU brand (nvidia or amd) or an error if none is found
// todo: make sure there's actually a GPU, not just if the command exists // todo: make sure there's actually a GPU, not just if the command exists
func (gm *GPUManager) detectGPU() error { func (gm *GPUManager) detectGPUs() error {
if err := exec.Command("nvidia-smi").Run(); err == nil { if err := exec.Command("nvidia-smi").Run(); err == nil {
gm.nvidiaSmi = true gm.nvidiaSmi = true
} }
@@ -212,18 +193,43 @@ func (gm *GPUManager) detectGPU() error {
return fmt.Errorf("no GPU found - install nvidia-smi or rocm-smi") return fmt.Errorf("no GPU found - install nvidia-smi or rocm-smi")
} }
// NewGPUManager returns a new GPUManager // startCollector starts the appropriate GPU data collector based on the command
func (gm *GPUManager) startCollector(command string) {
switch command {
case "nvidia-smi":
nvidia := gpuCollector{
name: "nvidia-smi",
cmd: exec.Command("nvidia-smi", "-l", "4",
"--query-gpu=index,name,temperature.gpu,memory.used,memory.total,utilization.gpu,power.draw",
"--format=csv,noheader,nounits"),
parse: gm.parseNvidiaData,
}
go nvidia.start()
case "rocm-smi":
amdCollector := gpuCollector{
name: "rocm-smi",
cmd: exec.Command("/bin/sh", "-c",
"while true; do rocm-smi --showid --showtemp --showuse --showpower --showproductname --showmeminfo vram --json; sleep 4.3; done"),
parse: gm.parseAmdData,
}
go amdCollector.start()
}
}
// NewGPUManager creates and initializes a new GPUManager
func NewGPUManager() (*GPUManager, error) { func NewGPUManager() (*GPUManager, error) {
var gm GPUManager var gm GPUManager
if err := gm.detectGPU(); err != nil { if err := gm.detectGPUs(); err != nil {
return nil, err return nil, err
} }
gm.GpuDataMap = make(map[string]*system.GPUData, 1) gm.GpuDataMap = make(map[string]*system.GPUData, 1)
if gm.nvidiaSmi { if gm.nvidiaSmi {
go gm.startNvidiaCollector() gm.startCollector("nvidia-smi")
} }
if gm.rocmSmi { if gm.rocmSmi {
go gm.startAmdCollector() gm.startCollector("rocm-smi")
} }
return &gm, nil return &gm, nil
} }