From 562c8ffa3cb9d3d3ed7ad8fcfcbbe2b80df09120 Mon Sep 17 00:00:00 2001 From: Akizon77 Date: Sat, 21 Jun 2025 14:43:41 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AE=89=E5=85=A8?= =?UTF-8?q?=E7=9A=84WebSocket=E8=BF=9E=E6=8E=A5=E7=AE=A1=E7=90=86=EF=BC=8C?= =?UTF-8?q?=E6=94=AF=E6=8C=81ping=E4=BB=BB=E5=8A=A1=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/root.go | 5 +-- go.mod | 3 ++ go.sum | 11 +++++++ server/task.go | 80 +++++++++++++++++++++++++++++++++++++++++++++ server/websocket.go | 19 +++++++---- ws/safaConn.go | 58 ++++++++++++++++++++++++++++++++ 6 files changed, 168 insertions(+), 8 deletions(-) create mode 100644 ws/safaConn.go diff --git a/cmd/root.go b/cmd/root.go index c612b27..d9b81d5 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -33,8 +33,9 @@ var RootCmd = &cobra.Command{ go update.DoUpdateWorks() } go server.DoUploadBasicInfoWorks() - server.EstablishWebSocketConnection() - os.Exit(0) + for { + server.EstablishWebSocketConnection() + } }, } diff --git a/go.mod b/go.mod index 0ae02ca..4f243ae 100644 --- a/go.mod +++ b/go.mod @@ -15,9 +15,11 @@ require ( require ( github.com/go-ole/go-ole v1.2.6 // indirect + github.com/go-ping/ping v1.2.0 // indirect github.com/golang/protobuf v1.3.2 // indirect github.com/google/go-github/v30 v30.1.0 // indirect github.com/google/go-querystring v1.0.0 // indirect + github.com/google/uuid v1.2.0 // indirect github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/spf13/pflag v1.0.6 // indirect @@ -30,5 +32,6 @@ require ( golang.org/x/crypto v0.39.0 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect google.golang.org/appengine v1.3.0 // indirect ) diff --git a/go.sum b/go.sum index 110ef3c..e644269 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-ping/ping v1.2.0 h1:vsJ8slZBZAXNCK4dPcI2PEE9eM9n9RbXbGouVQ/Y4yQ= +github.com/go-ping/ping v1.2.0/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -17,6 +19,8 @@ github.com/google/go-github/v30 v30.1.0 h1:VLDx+UolQICEOKu2m4uAoMti1SxuEBAl7RSEG github.com/google/go-github/v30 v30.1.0/go.mod h1:n8jBpHl45a/rlBUtRJMOG4GhNADUQFEufcolZ95JfU8= github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -63,6 +67,7 @@ golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -71,17 +76,23 @@ golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAG golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= diff --git a/server/task.go b/server/task.go index e7e8bb6..a9aa681 100644 --- a/server/task.go +++ b/server/task.go @@ -3,14 +3,18 @@ package server import ( "bytes" "encoding/json" + "errors" "log" + "net" "net/http" "os/exec" "runtime" "strings" "time" + ping "github.com/go-ping/ping" "github.com/komari-monitor/komari-agent/cmd/flags" + "github.com/komari-monitor/komari-agent/ws" ) func NewTask(task_id, command string) { @@ -79,3 +83,79 @@ func uploadTaskResult(taskID, result string, exitCode int, finishedAt time.Time) } } } +func icmpPing(target string, timeout time.Duration) error { + pinger, err := getPinger(target) + if err != nil { + return err + } + pinger.Count = 1 + pinger.Timeout = timeout + pinger.SetPrivileged(true) + return pinger.Run() +} + +func getPinger(target string) (*ping.Pinger, error) { + return ping.NewPinger(target) +} + +func tcpPing(target string, timeout time.Duration) error { + if !strings.Contains(target, ":") { + target += ":80" + } + conn, err := net.DialTimeout("tcp", target, timeout) + if err != nil { + return err + } + defer conn.Close() + return nil +} + +func httpPing(target string, timeout time.Duration) (int64, error) { + client := http.Client{ + Timeout: timeout, + } + start := time.Now() + resp, err := client.Get(target) + latency := time.Since(start).Milliseconds() + if err != nil { + return 0, err + } + defer resp.Body.Close() + if resp.StatusCode >= 200 && resp.StatusCode < 400 { + return latency, nil + } + return latency, errors.New("http status not ok") +} + +func NewPingTask(conn *ws.SafeConn, taskID uint, pingType, pingTarget string) { + if taskID == 0 { + return + } + pingResult := 0 + timeout := 3 * time.Second + switch pingType { + case "icmp": + start := time.Now() + if err := icmpPing(pingTarget, timeout); err == nil { + pingResult = int(time.Since(start).Milliseconds()) + } + case "tcp": + start := time.Now() + if err := tcpPing(pingTarget, timeout); err == nil { + pingResult = int(time.Since(start).Milliseconds()) + } + case "http": + if latency, err := httpPing(pingTarget, timeout); err == nil { + pingResult = int(latency) + } + default: + return + } + payload := map[string]interface{}{ + "type": "ping_result", + "task_id": taskID, + "value": pingResult, + "finished_at": time.Now(), + } + _ = conn.WriteJSON(payload) +} diff --git a/server/websocket.go b/server/websocket.go index 8394e59..3263c38 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -11,6 +11,7 @@ import ( "github.com/komari-monitor/komari-agent/cmd/flags" "github.com/komari-monitor/komari-agent/monitoring" "github.com/komari-monitor/komari-agent/terminal" + "github.com/komari-monitor/komari-agent/ws" ) func EstablishWebSocketConnection() { @@ -18,7 +19,7 @@ func EstablishWebSocketConnection() { websocketEndpoint := strings.TrimSuffix(flags.Endpoint, "/") + "/api/clients/report?token=" + flags.Token websocketEndpoint = "ws" + strings.TrimPrefix(websocketEndpoint, "http") - var conn *websocket.Conn + var conn *ws.SafeConn defer func() { if conn != nil { conn.Close() @@ -73,7 +74,7 @@ func EstablishWebSocketConnection() { } } -func connectWebSocket(websocketEndpoint string) (*websocket.Conn, error) { +func connectWebSocket(websocketEndpoint string) (*ws.SafeConn, error) { dialer := &websocket.Dialer{ HandshakeTimeout: 5 * time.Second, } @@ -85,11 +86,10 @@ func connectWebSocket(websocketEndpoint string) (*websocket.Conn, error) { return nil, err } - return conn, nil + return ws.NewSafeConn(conn), nil } -func handleWebSocketMessages(conn *websocket.Conn, done chan<- struct{}) { - +func handleWebSocketMessages(conn *ws.SafeConn, done chan<- struct{}) { defer close(done) for { _, message_raw, err := conn.ReadMessage() @@ -104,6 +104,10 @@ func handleWebSocketMessages(conn *websocket.Conn, done chan<- struct{}) { // Remote Exec ExecCommand string `json:"command,omitempty"` ExecTaskID string `json:"task_id,omitempty"` + // Ping + PingTaskID uint `json:"ping_task_id,omitempty"` + PingType string `json:"ping_type,omitempty"` + PingTarget string `json:"ping_target,omitempty"` } err = json.Unmarshal(message_raw, &message) if err != nil { @@ -119,7 +123,10 @@ func handleWebSocketMessages(conn *websocket.Conn, done chan<- struct{}) { go NewTask(message.ExecTaskID, message.ExecCommand) continue } - + if message.Message == "ping" || message.PingTaskID != 0 || message.PingType != "" || message.PingTarget != "" { + go NewPingTask(conn, message.PingTaskID, message.PingType, message.PingTarget) + continue + } } } diff --git a/ws/safaConn.go b/ws/safaConn.go new file mode 100644 index 0000000..54e1e7c --- /dev/null +++ b/ws/safaConn.go @@ -0,0 +1,58 @@ +package ws + +import ( + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type SafeConn struct { + conn *websocket.Conn + mu sync.Mutex +} + +func NewSafeConn(conn *websocket.Conn) *SafeConn { + return &SafeConn{ + conn: conn, + mu: sync.Mutex{}, + } +} + +func (sc *SafeConn) WriteMessage(messageType int, data []byte) error { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.conn.WriteMessage(messageType, data) +} + +func (sc *SafeConn) WriteJSON(v interface{}) error { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.conn.WriteJSON(v) +} + +func (sc *SafeConn) Close() error { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.conn.Close() +} +func (sc *SafeConn) ReadMessage() (int, []byte, error) { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.conn.ReadMessage() +} +func (sc *SafeConn) ReadJSON(v interface{}) error { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.conn.ReadJSON(v) +} +func (sc *SafeConn) SetReadDeadline(t time.Time) error { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.conn.SetReadDeadline(t) +} +func (sc *SafeConn) GetConn() *websocket.Conn { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.conn +}