mirror of
https://github.com/fankes/komari-agent.git
synced 2025-12-11 07:53:38 +08:00
feat: 优化 goroutine 启动逻辑
This commit is contained in:
12
cmd/root.go
12
cmd/root.go
@@ -1,10 +1,13 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
"github.com/komari-monitor/komari-agent/cmd/flags"
|
"github.com/komari-monitor/komari-agent/cmd/flags"
|
||||||
"github.com/komari-monitor/komari-agent/dnsresolver"
|
"github.com/komari-monitor/komari-agent/dnsresolver"
|
||||||
@@ -20,6 +23,15 @@ var RootCmd = &cobra.Command{
|
|||||||
Short: "komari agent",
|
Short: "komari agent",
|
||||||
Long: `komari agent`,
|
Long: `komari agent`,
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
|
// 捕获中止信号,优雅退出
|
||||||
|
stopCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||||
|
defer stop()
|
||||||
|
go func() {
|
||||||
|
<-stopCtx.Done()
|
||||||
|
log.Printf("shutting down gracefully...")
|
||||||
|
netstatic.Stop()
|
||||||
|
os.Exit(0)
|
||||||
|
}()
|
||||||
|
|
||||||
if flags.ShowWarning {
|
if flags.ShowWarning {
|
||||||
ShowToast()
|
ShowToast()
|
||||||
|
|||||||
@@ -245,6 +245,39 @@ func flushCacheLocked(ts uint64) {
|
|||||||
staticCache = make(map[string][]TrafficData)
|
staticCache = make(map[string][]TrafficData)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startGoroutinesLocked 启动采集和保存的 goroutines(调用前必须已持有锁)
|
||||||
|
func startGoroutinesLocked() {
|
||||||
|
// 采集 goroutine
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-detectTicker.C:
|
||||||
|
mu.Lock()
|
||||||
|
sampleOnceLocked()
|
||||||
|
mu.Unlock()
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 保存 goroutine
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case t := <-saveTicker.C:
|
||||||
|
mu.Lock()
|
||||||
|
flushCacheLocked(uint64(t.Unix()))
|
||||||
|
purgeExpiredLocked()
|
||||||
|
_ = saveToFileLocked()
|
||||||
|
mu.Unlock()
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// GetNetStatic 获取当前的所有流量统计数据
|
// GetNetStatic 获取当前的所有流量统计数据
|
||||||
func GetNetStatic() (*NetStatic, error) {
|
func GetNetStatic() (*NetStatic, error) {
|
||||||
mu.RLock()
|
mu.RLock()
|
||||||
@@ -281,35 +314,8 @@ func StartOrContinue() error {
|
|||||||
stopCh = make(chan struct{})
|
stopCh = make(chan struct{})
|
||||||
running = true
|
running = true
|
||||||
|
|
||||||
// 采集 goroutine
|
// 启动 goroutines
|
||||||
go func() {
|
startGoroutinesLocked()
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-detectTicker.C:
|
|
||||||
mu.Lock()
|
|
||||||
sampleOnceLocked()
|
|
||||||
mu.Unlock()
|
|
||||||
case <-stopCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// 保存 goroutine
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case t := <-saveTicker.C:
|
|
||||||
mu.Lock()
|
|
||||||
flushCacheLocked(uint64(t.Unix()))
|
|
||||||
purgeExpiredLocked()
|
|
||||||
_ = saveToFileLocked()
|
|
||||||
mu.Unlock()
|
|
||||||
case <-stopCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -476,14 +482,22 @@ func SetNewConfig(newCfg NetStaticConfig) error {
|
|||||||
config = cfg
|
config = cfg
|
||||||
// 重新配置 ticker(若运行中)
|
// 重新配置 ticker(若运行中)
|
||||||
if running {
|
if running {
|
||||||
|
// 先停止旧的 ticker 和 goroutines
|
||||||
if detectTicker != nil {
|
if detectTicker != nil {
|
||||||
detectTicker.Stop()
|
detectTicker.Stop()
|
||||||
}
|
}
|
||||||
if saveTicker != nil {
|
if saveTicker != nil {
|
||||||
saveTicker.Stop()
|
saveTicker.Stop()
|
||||||
}
|
}
|
||||||
|
close(stopCh)
|
||||||
|
|
||||||
|
// 重新创建 ticker 和 channel
|
||||||
detectTicker = time.NewTicker(time.Duration(cfg.DetectInterval * float64(time.Second)))
|
detectTicker = time.NewTicker(time.Duration(cfg.DetectInterval * float64(time.Second)))
|
||||||
saveTicker = time.NewTicker(time.Duration(cfg.SaveInterval * float64(time.Second)))
|
saveTicker = time.NewTicker(time.Duration(cfg.SaveInterval * float64(time.Second)))
|
||||||
|
stopCh = make(chan struct{})
|
||||||
|
|
||||||
|
// 重新启动 goroutines
|
||||||
|
startGoroutinesLocked()
|
||||||
|
|
||||||
// 当配置了指定网卡白名单时,清理不在白名单内的缓存与上次计数,避免无用数据积累
|
// 当配置了指定网卡白名单时,清理不在白名单内的缓存与上次计数,避免无用数据积累
|
||||||
if len(cfg.Nics) > 0 {
|
if len(cfg.Nics) > 0 {
|
||||||
|
|||||||
Reference in New Issue
Block a user