From 78dc26953868eb1ab5f5abc64dc13f935a2a1a05 Mon Sep 17 00:00:00 2001 From: Henry Dollman Date: Tue, 23 Jul 2024 15:47:15 -0400 Subject: [PATCH] update logic for batch updating servers --- hub/main.go | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/hub/main.go b/hub/main.go index 5f41edb..ece9d63 100644 --- a/hub/main.go +++ b/hub/main.go @@ -15,16 +15,17 @@ import ( "net/url" "os" "strings" + "sync" "time" "github.com/labstack/echo/v5" - "github.com/pocketbase/dbx" "github.com/pocketbase/pocketbase" "github.com/pocketbase/pocketbase/apis" "github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/models" "github.com/pocketbase/pocketbase/plugins/migratecmd" "github.com/pocketbase/pocketbase/tools/cron" + "github.com/pocketbase/pocketbase/tools/types" "github.com/spf13/cobra" "golang.org/x/crypto/ssh" ) @@ -32,7 +33,8 @@ import ( var Version = "0.0.1-alpha.6" var app *pocketbase.PocketBase -var serverConnections = make(map[string]Server) +var serverConnections = make(map[string]*Server) +var serverConnectionsLock = sync.Mutex{} func main() { app = pocketbase.NewWithConfig(pocketbase.Config{ @@ -207,49 +209,53 @@ func startSystemUpdateTicker() { } func updateSystems() { - // handle max of 1/3 + 1 servers at a time - numServers := len(serverConnections)/3 + 1 - // find systems that are not paused and updated more than 58 seconds ago - fiftyEightSecondsAgo := time.Now().UTC().Add(-58 * time.Second).Format("2006-01-02 15:04:05") records, err := app.Dao().FindRecordsByFilter( - "2hz5ncl8tizk5nx", // collection - "status != 'paused' && updated < {:updated}", // filter - "updated", // sort - numServers, // limit - 0, // offset - dbx.Params{"updated": fiftyEightSecondsAgo}, + "2hz5ncl8tizk5nx", // collection + "status != 'paused'", // filter + "updated", // sort + -1, // limit + 0, // offset ) + // log.Println("records", len(records)) if err != nil { app.Logger().Error("Failed to query systems: ", "err", err.Error()) return } - for _, record := range records { - updateSystem(record) + fiftyFiveSecondsAgo := time.Now().UTC().Add(-55 * time.Second) + batchSize := len(records)/4 + 1 + for i := 0; i < batchSize; i++ { + if records[i].Get("updated").(types.DateTime).Time().After(fiftyFiveSecondsAgo) { + break + } + // log.Println("updating", records[i].Get(("name"))) + go updateSystem(records[i]) } } func updateSystem(record *models.Record) { - var server Server + var server *Server // check if server connection data exists if _, ok := serverConnections[record.Id]; ok { server = serverConnections[record.Id] } else { // create server connection struct - server = Server{ + server = &Server{ Host: record.Get("host").(string), Port: record.Get("port").(string), } - client, err := getServerConnection(&server) + client, err := getServerConnection(server) if err != nil { app.Logger().Error("Failed to connect:", "err", err.Error(), "server", server.Host, "port", server.Port) updateServerStatus(record, "down") return } server.Client = client + serverConnectionsLock.Lock() serverConnections[record.Id] = server + serverConnectionsLock.Unlock() } // get server stats from agent - systemData, err := requestJson(&server) + systemData, err := requestJson(server) if err != nil { if err.Error() == "retry" { // if previous connection was closed, try again @@ -310,6 +316,8 @@ func deleteServerConnection(record *models.Record) { if serverConnections[record.Id].Client != nil { serverConnections[record.Id].Client.Close() } + serverConnectionsLock.Lock() + defer serverConnectionsLock.Unlock() delete(serverConnections, record.Id) } }