update logic for batch updating servers

This commit is contained in:
Henry Dollman
2024-07-23 15:47:15 -04:00
parent f6967eab35
commit 78dc269538

View File

@@ -15,16 +15,17 @@ import (
"net/url" "net/url"
"os" "os"
"strings" "strings"
"sync"
"time" "time"
"github.com/labstack/echo/v5" "github.com/labstack/echo/v5"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase" "github.com/pocketbase/pocketbase"
"github.com/pocketbase/pocketbase/apis" "github.com/pocketbase/pocketbase/apis"
"github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/models" "github.com/pocketbase/pocketbase/models"
"github.com/pocketbase/pocketbase/plugins/migratecmd" "github.com/pocketbase/pocketbase/plugins/migratecmd"
"github.com/pocketbase/pocketbase/tools/cron" "github.com/pocketbase/pocketbase/tools/cron"
"github.com/pocketbase/pocketbase/tools/types"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
) )
@@ -32,7 +33,8 @@ import (
var Version = "0.0.1-alpha.6" var Version = "0.0.1-alpha.6"
var app *pocketbase.PocketBase var app *pocketbase.PocketBase
var serverConnections = make(map[string]Server) var serverConnections = make(map[string]*Server)
var serverConnectionsLock = sync.Mutex{}
func main() { func main() {
app = pocketbase.NewWithConfig(pocketbase.Config{ app = pocketbase.NewWithConfig(pocketbase.Config{
@@ -207,49 +209,53 @@ func startSystemUpdateTicker() {
} }
func updateSystems() { func updateSystems() {
// handle max of 1/3 + 1 servers at a time
numServers := len(serverConnections)/3 + 1
// find systems that are not paused and updated more than 58 seconds ago
fiftyEightSecondsAgo := time.Now().UTC().Add(-58 * time.Second).Format("2006-01-02 15:04:05")
records, err := app.Dao().FindRecordsByFilter( records, err := app.Dao().FindRecordsByFilter(
"2hz5ncl8tizk5nx", // collection "2hz5ncl8tizk5nx", // collection
"status != 'paused' && updated < {:updated}", // filter "status != 'paused'", // filter
"updated", // sort "updated", // sort
numServers, // limit -1, // limit
0, // offset 0, // offset
dbx.Params{"updated": fiftyEightSecondsAgo},
) )
// log.Println("records", len(records))
if err != nil { if err != nil {
app.Logger().Error("Failed to query systems: ", "err", err.Error()) app.Logger().Error("Failed to query systems: ", "err", err.Error())
return return
} }
for _, record := range records { fiftyFiveSecondsAgo := time.Now().UTC().Add(-55 * time.Second)
updateSystem(record) batchSize := len(records)/4 + 1
for i := 0; i < batchSize; i++ {
if records[i].Get("updated").(types.DateTime).Time().After(fiftyFiveSecondsAgo) {
break
}
// log.Println("updating", records[i].Get(("name")))
go updateSystem(records[i])
} }
} }
func updateSystem(record *models.Record) { func updateSystem(record *models.Record) {
var server Server var server *Server
// check if server connection data exists // check if server connection data exists
if _, ok := serverConnections[record.Id]; ok { if _, ok := serverConnections[record.Id]; ok {
server = serverConnections[record.Id] server = serverConnections[record.Id]
} else { } else {
// create server connection struct // create server connection struct
server = Server{ server = &Server{
Host: record.Get("host").(string), Host: record.Get("host").(string),
Port: record.Get("port").(string), Port: record.Get("port").(string),
} }
client, err := getServerConnection(&server) client, err := getServerConnection(server)
if err != nil { if err != nil {
app.Logger().Error("Failed to connect:", "err", err.Error(), "server", server.Host, "port", server.Port) app.Logger().Error("Failed to connect:", "err", err.Error(), "server", server.Host, "port", server.Port)
updateServerStatus(record, "down") updateServerStatus(record, "down")
return return
} }
server.Client = client server.Client = client
serverConnectionsLock.Lock()
serverConnections[record.Id] = server serverConnections[record.Id] = server
serverConnectionsLock.Unlock()
} }
// get server stats from agent // get server stats from agent
systemData, err := requestJson(&server) systemData, err := requestJson(server)
if err != nil { if err != nil {
if err.Error() == "retry" { if err.Error() == "retry" {
// if previous connection was closed, try again // if previous connection was closed, try again
@@ -310,6 +316,8 @@ func deleteServerConnection(record *models.Record) {
if serverConnections[record.Id].Client != nil { if serverConnections[record.Id].Client != nil {
serverConnections[record.Id].Client.Close() serverConnections[record.Id].Client.Close()
} }
serverConnectionsLock.Lock()
defer serverConnectionsLock.Unlock()
delete(serverConnections, record.Id) delete(serverConnections, record.Id)
} }
} }