Simplify Workers

This commit is contained in:
ferreo 2024-07-30 01:30:26 +01:00
parent 8f7657cf38
commit fd9e9418b2
2 changed files with 39 additions and 63 deletions

View File

@ -82,15 +82,7 @@ func StartPackageQueueWorker(ctx context.Context) {
}() }()
} }
func StartQueueWorker(ctx context.Context) { func processQueueAndStatus(ctx context.Context) {
go processQueue(ctx)
}
func StartStatusWorker(ctx context.Context) {
go processStatus(ctx)
}
func processStatus(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -98,74 +90,59 @@ func processStatus(ctx context.Context) {
default: default:
q := GetQueue() q := GetQueue()
itemsToRemove := make([]string, 0) itemsToRemove := make([]string, 0)
buildingFound := false
q.ForEach(func(k string, item domain.BuildQueueItem) bool { q.ForEach(func(k string, item domain.BuildQueueItem) bool {
if item.Status != domain.Building { if item.Status == domain.Building {
return true buildingFound = true
}
complete, err := CheckIfBuildComplete(ctx, item) complete, err := CheckIfBuildComplete(ctx, item)
if err != nil && !complete { if err != nil && !complete {
slog.Error("unable to check if build is complete: " + err.Error()) slog.Error("unable to check if build is complete: " + err.Error())
} }
if complete { if complete {
if err != nil { if err != nil {
item.Source.Packages.ForEach(func(k string, v domain.PackageInfo) bool { updatePackageStatus(&item, domain.Error, domain.Error)
v.Status = domain.Error } else {
v.LastBuildStatus = domain.Error updatePackageStatus(&item, domain.Current, domain.Built)
item.Source.Packages.Set(k, v)
return true
})
packages.UpdateSourcePackage(item.Source)
itemsToRemove = append(itemsToRemove, k)
return true
} }
item.Source.Packages.ForEach(func(k string, v domain.PackageInfo) bool {
v.Status = domain.Current
v.LastBuildStatus = domain.Built
v.Version = item.BuildVersion
v.NewVersion = ""
item.Source.Packages.Set(k, v)
return true
})
packages.UpdateSourcePackage(item.Source) packages.UpdateSourcePackage(item.Source)
itemsToRemove = append(itemsToRemove, k) itemsToRemove = append(itemsToRemove, k)
return true }
} }
return true return true
}) })
for _, item := range itemsToRemove { for _, item := range itemsToRemove {
Remove(item) Remove(item)
} }
time.Sleep(10 * time.Second)
}
}
}
func processQueue(ctx context.Context) { if !buildingFound {
for {
select {
case <-ctx.Done():
return
default:
q := GetQueue()
buildingFound := false
q.ForEach(func(k string, item domain.BuildQueueItem) bool {
if item.Status == domain.Building {
buildingFound = true
return false
}
return true
})
if buildingFound {
time.Sleep(30 * time.Second)
continue
}
err := ProcessNext() err := ProcessNext()
if err != nil { if err != nil {
slog.Error("unable to process queue: " + err.Error()) slog.Error("unable to process queue: " + err.Error())
} }
} }
time.Sleep(30 * time.Second)
time.Sleep(10 * time.Second)
} }
}
}
func updatePackageStatus(item *domain.BuildQueueItem, status domain.PackageStatus, buildStatus domain.PackageStatus) {
item.Source.Packages.ForEach(func(k string, v domain.PackageInfo) bool {
v.Status = status
v.LastBuildStatus = domain.PackageStatus(buildStatus)
if status == domain.Current {
v.Version = item.BuildVersion
v.NewVersion = ""
}
item.Source.Packages.Set(k, v)
return true
})
}
func StartQueueAndStatusWorker(ctx context.Context) {
go processQueueAndStatus(ctx)
} }
func CheckIfBuildComplete(ctx context.Context, item domain.BuildQueueItem) (bool, error) { func CheckIfBuildComplete(ctx context.Context, item domain.BuildQueueItem) (bool, error) {

View File

@ -65,8 +65,7 @@ func runServer(ctx context.Context) error {
slog.Info("packages loaded in " + time.Since(start).String()) slog.Info("packages loaded in " + time.Since(start).String())
buildqueue.StartPackageQueueWorker(ctx) buildqueue.StartPackageQueueWorker(ctx)
buildqueue.StartQueueWorker(ctx) buildqueue.StartQueueAndStatusWorker(ctx)
buildqueue.StartStatusWorker(ctx)
cfg := fiber.Config{ cfg := fiber.Config{
JSONEncoder: json.Marshal, JSONEncoder: json.Marshal,