Fix concurrency again

This commit is contained in:
ferreo 2024-07-29 23:49:30 +01:00
parent b9950ade44
commit 9ac297d0c8
5 changed files with 28 additions and 50 deletions

View File

@ -2,18 +2,15 @@ package buildqueue
import ( import (
"brunel/domain" "brunel/domain"
"brunel/fastmap"
"sync"
"errors" "errors"
"github.com/alphadose/haxmap"
) )
var queue = fastmap.New[string, domain.BuildQueueItem]() var queue = haxmap.New[string, domain.BuildQueueItem]()
var queueLock = sync.RWMutex{}
func Add(buildItem domain.BuildQueueItem) error { func Add(buildItem domain.BuildQueueItem) error {
queueLock.Lock()
defer queueLock.Unlock()
if _, ok := queue.Get(buildItem.Source.Name); ok { if _, ok := queue.Get(buildItem.Source.Name); ok {
return errors.New("package already in queue") return errors.New("package already in queue")
} }
@ -23,26 +20,20 @@ func Add(buildItem domain.BuildQueueItem) error {
} }
func Get(name string) (domain.BuildQueueItem, bool) { func Get(name string) (domain.BuildQueueItem, bool) {
queueLock.RLock()
defer queueLock.RUnlock()
item, ok := queue.Get(name) item, ok := queue.Get(name)
return item, ok return item, ok
} }
func Remove(name string) error { func Remove(name string) error {
queueLock.Lock()
defer queueLock.Unlock()
_, ok := queue.Get(name) _, ok := queue.Get(name)
if !ok { if !ok {
return errors.New("package not in queue") return errors.New("package not in queue")
} }
queue.Delete(name) queue.Del(name)
return nil return nil
} }
func Update(buildItem domain.BuildQueueItem) error { func Update(buildItem domain.BuildQueueItem) error {
queueLock.Lock()
defer queueLock.Unlock()
item, ok := queue.Get(buildItem.Source.Name) item, ok := queue.Get(buildItem.Source.Name)
if !ok { if !ok {
return errors.New("package not in queue") return errors.New("package not in queue")
@ -54,38 +45,16 @@ func Update(buildItem domain.BuildQueueItem) error {
return nil return nil
} }
func GetQueueCopy() *fastmap.Fastmap[string, domain.BuildQueueItem] { func GetQueue() *haxmap.Map[string, domain.BuildQueueItem] {
queueLock.RLock() return queue
defer queueLock.RUnlock()
returnQueue := fastmap.New[string, domain.BuildQueueItem]()
queue.Iter(func(k string, v domain.BuildQueueItem) bool {
returnQueue.Set(k, v)
return true
})
return returnQueue
}
func GetBuildingQueue() *fastmap.Fastmap[string, domain.BuildQueueItem] {
queueLock.RLock()
defer queueLock.RUnlock()
returnQueue := fastmap.New[string, domain.BuildQueueItem]()
queue.Iter(func(k string, v domain.BuildQueueItem) bool {
if v.Status == domain.Building {
returnQueue.Set(k, v)
}
return true
})
return returnQueue
} }
func GetCounts() domain.BuildQueueCount { func GetCounts() domain.BuildQueueCount {
queueLock.RLock()
defer queueLock.RUnlock()
count := domain.BuildQueueCount{ count := domain.BuildQueueCount{
Queued: 0, Queued: 0,
Building: 0, Building: 0,
} }
queue.Iter(func(k string, v domain.BuildQueueItem) bool { queue.ForEach(func(k string, v domain.BuildQueueItem) bool {
if v.Status == domain.Queued { if v.Status == domain.Queued {
count.Queued++ count.Queued++
} else if v.Status == domain.Building { } else if v.Status == domain.Building {
@ -97,9 +66,9 @@ func GetCounts() domain.BuildQueueCount {
} }
func ProcessNext() error { func ProcessNext() error {
queueLock.RLock()
var item *domain.BuildQueueItem var item *domain.BuildQueueItem
queue.Iter(func(k string, v domain.BuildQueueItem) bool { queue.ForEach(func(k string, v domain.BuildQueueItem) bool {
if v.Status == domain.Queued { if v.Status == domain.Queued {
item = &v item = &v
return false return false
@ -109,13 +78,10 @@ func ProcessNext() error {
if item == nil { if item == nil {
return errors.New("no packages in queue") return errors.New("no packages in queue")
} }
queueLock.RUnlock()
err := UpdateBuildFile(*item) err := UpdateBuildFile(*item)
if err != nil { if err != nil {
return err return err
} }
queueLock.Lock()
defer queueLock.Unlock()
item.Status = domain.Building item.Status = domain.Building
err = Update(*item) err = Update(*item)
return err return err

View File

@ -35,18 +35,22 @@ func StartPackageQueueWorker(ctx context.Context) {
if v.Status == domain.Current { if v.Status == domain.Current {
return true return true
} }
version := v.NewVersion
if version == "" {
version = v.Version
}
if v.LastBuildStatus == domain.Error { if v.LastBuildStatus == domain.Error {
errPreviously = true errPreviously = true
buildAttempt = 1 buildAttempt = 1
} }
if v.Status == domain.Missing { if v.Status == domain.Missing {
needsBuild = true needsBuild = true
buildVersion = v.NewVersion buildVersion = version
return false return false
} }
if v.Status == domain.Stale { if v.Status == domain.Stale {
needsBuild = true needsBuild = true
buildVersion = v.NewVersion buildVersion = version
return false return false
} }
return true return true
@ -92,9 +96,12 @@ func processStatus(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
q := GetBuildingQueue() q := GetQueue()
itemsToRemove := make([]string, 0) itemsToRemove := make([]string, 0)
q.Iter(func(k string, item domain.BuildQueueItem) bool { q.ForEach(func(k string, item domain.BuildQueueItem) bool {
if item.Status != domain.Building {
return true
}
complete, err := CheckIfBuildComplete(ctx, item) complete, err := CheckIfBuildComplete(ctx, item)
if err != nil && !complete { if err != nil && !complete {
slog.Error("unable to check if build is complete: " + err.Error()) slog.Error("unable to check if build is complete: " + err.Error())

2
go.mod
View File

@ -3,6 +3,7 @@ module brunel
go 1.22.5 go 1.22.5
require ( require (
github.com/alphadose/haxmap v1.4.0
github.com/go-git/go-git/v5 v5.12.0 github.com/go-git/go-git/v5 v5.12.0
github.com/goccy/go-json v0.10.3 github.com/goccy/go-json v0.10.3
github.com/gofiber/fiber/v2 v2.52.5 github.com/gofiber/fiber/v2 v2.52.5
@ -49,6 +50,7 @@ require (
github.com/xanzy/ssh-agent v0.3.3 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect go.opentelemetry.io/otel/trace v1.19.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/mod v0.12.0 // indirect golang.org/x/mod v0.12.0 // indirect
golang.org/x/sys v0.18.0 // indirect golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect

4
go.sum
View File

@ -5,6 +5,8 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/ProtonMail/go-crypto v1.0.0 h1:LRuvITjQWX+WIfr930YHG2HNfjR1uOfyf5vE0kC2U78= github.com/ProtonMail/go-crypto v1.0.0 h1:LRuvITjQWX+WIfr930YHG2HNfjR1uOfyf5vE0kC2U78=
github.com/ProtonMail/go-crypto v1.0.0/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0= github.com/ProtonMail/go-crypto v1.0.0/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0=
github.com/alphadose/haxmap v1.4.0 h1:1yn+oGzy2THJj1DMuJBzRanE3sMnDAjJVbU0L31Jp3w=
github.com/alphadose/haxmap v1.4.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
@ -122,6 +124,8 @@ golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2Uz
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=

View File

@ -26,11 +26,10 @@ func Queue(c *fiber.Ctx) error {
adjustedPageNum = 0 adjustedPageNum = 0
} }
packs := buildqueue.GetQueueCopy() packs := buildqueue.GetQueue()
packs.StableSortByKey()
finalReturn := fastmap.New[string, domain.BuildQueueItem]() finalReturn := fastmap.New[string, domain.BuildQueueItem]()
packs.Iter(func(k string, source domain.BuildQueueItem) bool { packs.ForEach(func(k string, source domain.BuildQueueItem) bool {
matchesSearch := search == "" || strings.Contains(strings.ToLower(k), search) matchesSearch := search == "" || strings.Contains(strings.ToLower(k), search)
matchesFilter := filter == "" || source.Status == domain.BuildStatus(filter) matchesFilter := filter == "" || source.Status == domain.BuildStatus(filter)
source.Source.Packages.Iter(func(key string, value domain.PackageInfo) bool { source.Source.Packages.Iter(func(key string, value domain.PackageInfo) bool {