This commit is contained in:
Henry Dollman
2024-07-19 15:32:57 -04:00
parent 2c26368460
commit 8a6b25ddae
5 changed files with 100 additions and 42 deletions

53
main.go
View File

@@ -160,7 +160,7 @@ func main() {
// immediately create connection for new servers
app.OnModelAfterCreate("systems").Add(func(e *core.ModelEvent) error {
go updateServer(e.Model.(*models.Record))
go updateSystem(e.Model.(*models.Record))
return nil
})
@@ -175,10 +175,11 @@ func main() {
deleteServerConnection(newRecord)
}
// if server is set to pending (unpause), try to connect
if newStatus == "pending" {
go updateServer(newRecord)
}
// if server is set to pending (unpause), try to connect immediately
// commenting out because we don't want to get off of the one min schedule
// if newStatus == "pending" {
// go updateSystem(newRecord)
// }
// alerts
handleStatusAlerts(newStatus, oldRecord)
@@ -198,33 +199,35 @@ func main() {
}
func serverUpdateTicker() {
ticker := time.NewTicker(60 * time.Second)
ticker := time.NewTicker(15 * time.Second)
for range ticker.C {
updateServers()
updateSystems()
}
}
func updateServers() {
// serverCount := len(serverConnections)
// fmt.Println("server count: ", serverCount)
query := app.Dao().RecordQuery("systems").
Where(dbx.NewExp("status != \"paused\"")).
OrderBy("updated ASC").
// todo get total count of servers and divide by 4 or something
Limit(5)
records := []*models.Record{}
if err := query.All(&records); err != nil {
app.Logger().Error("Failed to get servers: ", "err", err.Error())
// return nil, err
func updateSystems() {
// handle max of 1/3 + 1 servers at a time
numServers := len(serverConnections)/3 + 1
// find systems that are not paused and updated more than 58 seconds ago
fiftyEightSecondsAgo := time.Now().UTC().Add(-58 * time.Second).Format("2006-01-02 15:04:05")
records, err := app.Dao().FindRecordsByFilter(
"2hz5ncl8tizk5nx", // collection
"status != 'paused' && updated < {:updated}", // filter
"updated", // sort
numServers, // limit
0, // offset
dbx.Params{"updated": fiftyEightSecondsAgo},
)
if err != nil {
app.Logger().Error("Failed to query systems: ", "err", err.Error())
return
}
for _, record := range records {
updateServer(record)
updateSystem(record)
}
}
func updateServer(record *models.Record) {
func updateSystem(record *models.Record) {
var server Server
// check if server connection data exists
if _, ok := serverConnections[record.Id]; ok {
@@ -251,7 +254,7 @@ func updateServer(record *models.Record) {
// if previous connection was closed, try again
app.Logger().Error("Existing SSH connection closed. Retrying...", "host", server.Host, "port", server.Port)
deleteServerConnection(record)
updateServer(record)
updateSystem(record)
return
}
app.Logger().Error("Failed to get server stats: ", "err", err.Error())
@@ -269,6 +272,7 @@ func updateServer(record *models.Record) {
system_stats_record := models.NewRecord(system_stats)
system_stats_record.Set("system", record.Id)
system_stats_record.Set("stats", systemData.Stats)
system_stats_record.Set("type", "1m")
if err := app.Dao().SaveRecord(system_stats_record); err != nil {
app.Logger().Error("Failed to save record: ", "err", err.Error())
}
@@ -278,6 +282,7 @@ func updateServer(record *models.Record) {
container_stats_record := models.NewRecord(container_stats)
container_stats_record.Set("system", record.Id)
container_stats_record.Set("stats", systemData.Containers)
container_stats_record.Set("type", "1m")
if err := app.Dao().SaveRecord(container_stats_record); err != nil {
app.Logger().Error("Failed to save record: ", "err", err.Error())
}