From b9950ade44f3f52a42673c138c6f0b3891556a0e Mon Sep 17 00:00:00 2001 From: ferreo Date: Mon, 29 Jul 2024 23:38:23 +0100 Subject: [PATCH] Fix concurency and db load --- buildqueue/queue.go | 40 +++++++++++++++++++++++++++++++++++-- buildqueue/worker.go | 44 ++++++++++++++++++++--------------------- db/repository.go | 39 +++++++++++++++++++++++++++--------- fastmap/fastmap.go | 28 -------------------------- handlers/build/queue.go | 2 +- server.go | 4 +++- 6 files changed, 93 insertions(+), 64 deletions(-) diff --git a/buildqueue/queue.go b/buildqueue/queue.go index 3ec81f4..f64bbb0 100644 --- a/buildqueue/queue.go +++ b/buildqueue/queue.go @@ -3,13 +3,17 @@ package buildqueue import ( "brunel/domain" "brunel/fastmap" + "sync" "errors" ) var queue = fastmap.New[string, domain.BuildQueueItem]() +var queueLock = sync.RWMutex{} func Add(buildItem domain.BuildQueueItem) error { + queueLock.Lock() + defer queueLock.Unlock() if _, ok := queue.Get(buildItem.Source.Name); ok { return errors.New("package already in queue") } @@ -19,11 +23,15 @@ func Add(buildItem domain.BuildQueueItem) error { } func Get(name string) (domain.BuildQueueItem, bool) { + queueLock.RLock() + defer queueLock.RUnlock() item, ok := queue.Get(name) return item, ok } func Remove(name string) error { + queueLock.Lock() + defer queueLock.Unlock() _, ok := queue.Get(name) if !ok { return errors.New("package not in queue") @@ -33,6 +41,8 @@ func Remove(name string) error { } func Update(buildItem domain.BuildQueueItem) error { + queueLock.Lock() + defer queueLock.Unlock() item, ok := queue.Get(buildItem.Source.Name) if !ok { return errors.New("package not in queue") @@ -44,11 +54,33 @@ func Update(buildItem domain.BuildQueueItem) error { return nil } -func GetQueue() *fastmap.Fastmap[string, domain.BuildQueueItem] { - return queue +func GetQueueCopy() *fastmap.Fastmap[string, domain.BuildQueueItem] { + queueLock.RLock() + defer queueLock.RUnlock() + returnQueue := fastmap.New[string, domain.BuildQueueItem]() + queue.Iter(func(k string, v domain.BuildQueueItem) bool { + returnQueue.Set(k, v) + return true + }) + return returnQueue +} + +func GetBuildingQueue() *fastmap.Fastmap[string, domain.BuildQueueItem] { + queueLock.RLock() + defer queueLock.RUnlock() + returnQueue := fastmap.New[string, domain.BuildQueueItem]() + queue.Iter(func(k string, v domain.BuildQueueItem) bool { + if v.Status == domain.Building { + returnQueue.Set(k, v) + } + return true + }) + return returnQueue } func GetCounts() domain.BuildQueueCount { + queueLock.RLock() + defer queueLock.RUnlock() count := domain.BuildQueueCount{ Queued: 0, Building: 0, @@ -65,6 +97,7 @@ func GetCounts() domain.BuildQueueCount { } func ProcessNext() error { + queueLock.RLock() var item *domain.BuildQueueItem queue.Iter(func(k string, v domain.BuildQueueItem) bool { if v.Status == domain.Queued { @@ -76,10 +109,13 @@ func ProcessNext() error { if item == nil { return errors.New("no packages in queue") } + queueLock.RUnlock() err := UpdateBuildFile(*item) if err != nil { return err } + queueLock.Lock() + defer queueLock.Unlock() item.Status = domain.Building err = Update(*item) return err diff --git a/buildqueue/worker.go b/buildqueue/worker.go index c30351b..4533187 100644 --- a/buildqueue/worker.go +++ b/buildqueue/worker.go @@ -92,31 +92,18 @@ func processStatus(ctx context.Context) { case <-ctx.Done(): return default: - q := GetQueue() + q := GetBuildingQueue() itemsToRemove := make([]string, 0) q.Iter(func(k string, item domain.BuildQueueItem) bool { - if item.Status == domain.Building { - complete, err := CheckIfBuildComplete(ctx, item) - if err != nil && !complete { - slog.Error("unable to check if build is complete: " + err.Error()) - } - if complete { - if err != nil { - item.Source.Packages.Iter(func(k string, v domain.PackageInfo) bool { - v.Status = domain.Error - v.LastBuildStatus = domain.Error - item.Source.Packages.Set(k, v) - return true - }) - packages.UpdateSourcePackage(item.Source) - itemsToRemove = append(itemsToRemove, k) - return true - } + complete, err := CheckIfBuildComplete(ctx, item) + if err != nil && !complete { + slog.Error("unable to check if build is complete: " + err.Error()) + } + if complete { + if err != nil { item.Source.Packages.Iter(func(k string, v domain.PackageInfo) bool { - v.Status = domain.Built - v.LastBuildStatus = domain.Built - v.Version = item.BuildVersion - v.NewVersion = "" + v.Status = domain.Error + v.LastBuildStatus = domain.Error item.Source.Packages.Set(k, v) return true }) @@ -124,13 +111,24 @@ func processStatus(ctx context.Context) { itemsToRemove = append(itemsToRemove, k) return true } + item.Source.Packages.Iter(func(k string, v domain.PackageInfo) bool { + v.Status = domain.Built + v.LastBuildStatus = domain.Built + v.Version = item.BuildVersion + v.NewVersion = "" + item.Source.Packages.Set(k, v) + return true + }) + packages.UpdateSourcePackage(item.Source) + itemsToRemove = append(itemsToRemove, k) + return true } return true }) for _, item := range itemsToRemove { Remove(item) } - time.Sleep(5 * time.Second) + time.Sleep(10 * time.Second) } } } diff --git a/db/repository.go b/db/repository.go index b99f20a..160c9cb 100644 --- a/db/repository.go +++ b/db/repository.go @@ -7,6 +7,7 @@ import ( "github.com/jinzhu/now" "gorm.io/gorm" + "gorm.io/gorm/clause" ) const MaxBatchSize = 1999 @@ -65,16 +66,36 @@ func (r *Repository) UpdatePackage(pkg domain.SourcePackage) error { } func (r *Repository) GetPackages() ([]domain.SourcePackage, error) { - var packages []domain.SourcePackageDTO - tx := r.db.Find(&packages) - if tx.Error != nil { - return nil, tx.Error + var allPackages []domain.SourcePackage + offset := 0 + + for { + var batchPackages []domain.SourcePackageDTO + + tx := r.db.Preload(clause.Associations). + Offset(offset). + Limit(MaxBatchSize). + Find(&batchPackages) + + if tx.Error != nil { + return nil, tx.Error + } + + // Convert DTOs to domain objects + for _, v := range batchPackages { + allPackages = append(allPackages, sourcePackageDtoToDomain(v)) + } + + // If we've retrieved fewer records than MaxBatchSize, we're done + if len(batchPackages) < MaxBatchSize { + break + } + + // Move to the next batch + offset += MaxBatchSize } - var output []domain.SourcePackage - for _, v := range packages { - output = append(output, sourcePackageDtoToDomain(v)) - } - return output, nil + + return allPackages, nil } func (r *Repository) GetPackage(name string) (domain.SourcePackage, error) { diff --git a/fastmap/fastmap.go b/fastmap/fastmap.go index 4eb6750..0d24c95 100644 --- a/fastmap/fastmap.go +++ b/fastmap/fastmap.go @@ -3,7 +3,6 @@ package fastmap import ( "fmt" "strings" - "sync" "slices" @@ -11,7 +10,6 @@ import ( ) type Fastmap[K comparable, V any] struct { - mu sync.RWMutex idx map[K]int store []fastmapValue[K, V] } @@ -29,8 +27,6 @@ func New[K comparable, V any]() *Fastmap[K, V] { } func (m *Fastmap[K, V]) Set(key K, value V) { - m.mu.Lock() - defer m.mu.Unlock() if _, ok := m.idx[key]; ok { m.store[m.idx[key]].Value = value return @@ -40,8 +36,6 @@ func (m *Fastmap[K, V]) Set(key K, value V) { } func (m *Fastmap[K, V]) Get(key K) (value V, ok bool) { - m.mu.RLock() - defer m.mu.RUnlock() idx, ok := m.idx[key] if !ok { return @@ -50,8 +44,6 @@ func (m *Fastmap[K, V]) Get(key K) (value V, ok bool) { } func (m *Fastmap[K, V]) Delete(key K) { - m.mu.Lock() - defer m.mu.Unlock() idx, ok := m.idx[key] if !ok { return @@ -62,21 +54,15 @@ func (m *Fastmap[K, V]) Delete(key K) { } func (m *Fastmap[K, V]) Has(key K) bool { - m.mu.RLock() - defer m.mu.RUnlock() _, ok := m.idx[key] return ok } func (m *Fastmap[K, V]) Len() int { - m.mu.RLock() - defer m.mu.RUnlock() return len(m.idx) } func (m *Fastmap[K, V]) GetPage(pageNum int, pageSize int) *Fastmap[K, V] { - m.mu.RLock() - defer m.mu.RUnlock() start := pageSize * pageNum end := start + pageSize if end > len(m.store) { @@ -91,15 +77,11 @@ func (m *Fastmap[K, V]) GetPage(pageNum int, pageSize int) *Fastmap[K, V] { } func (m *Fastmap[K, V]) Clear() { - m.mu.Lock() - defer m.mu.Unlock() m.idx = make(map[K]int) m.store = make([]fastmapValue[K, V], 0) } func (m *Fastmap[K, V]) Iter(fn func(key K, value V) bool) { - m.mu.RLock() - defer m.mu.RUnlock() for _, v := range m.store { if !fn(v.Key, v.Value) { break @@ -108,8 +90,6 @@ func (m *Fastmap[K, V]) Iter(fn func(key K, value V) bool) { } func (m *Fastmap[K, V]) StableSortByKey() { - m.mu.Lock() - defer m.mu.Unlock() slices.SortStableFunc(m.store, func(a, b fastmapValue[K, V]) int { aKey := fmt.Sprint(a.Key) bKey := fmt.Sprint(b.Key) @@ -123,8 +103,6 @@ func (m *Fastmap[K, V]) StableSortByKey() { } func (m *Fastmap[K, V]) MarshalText() ([]byte, error) { - m.mu.RLock() - defer m.mu.RUnlock() var builder strings.Builder for _, v := range m.store { builder.WriteString(fmt.Sprintf("%v:%v\n", v.Key, v.Value)) @@ -133,8 +111,6 @@ func (m *Fastmap[K, V]) MarshalText() ([]byte, error) { } func (m *Fastmap[K, V]) UnmarshalText(text []byte) error { - m.mu.Lock() - defer m.mu.Unlock() m.Clear() lines := strings.Split(string(text), "\n") for _, line := range lines { @@ -159,8 +135,6 @@ func (m *Fastmap[K, V]) UnmarshalText(text []byte) error { } func (m *Fastmap[K, V]) MarshalJSON() ([]byte, error) { - m.mu.RLock() - defer m.mu.RUnlock() temp := make(map[K]V) for _, v := range m.store { temp[v.Key] = v.Value @@ -169,8 +143,6 @@ func (m *Fastmap[K, V]) MarshalJSON() ([]byte, error) { } func (m *Fastmap[K, V]) UnmarshalJSON(data []byte) error { - m.mu.Lock() - defer m.mu.Unlock() temp := make(map[K]V) if err := json.Unmarshal(data, &temp); err != nil { return err diff --git a/handlers/build/queue.go b/handlers/build/queue.go index fd23026..82138fb 100644 --- a/handlers/build/queue.go +++ b/handlers/build/queue.go @@ -26,7 +26,7 @@ func Queue(c *fiber.Ctx) error { adjustedPageNum = 0 } - packs := buildqueue.GetQueue() + packs := buildqueue.GetQueueCopy() packs.StableSortByKey() finalReturn := fastmap.New[string, domain.BuildQueueItem]() diff --git a/server.go b/server.go index 076e06e..9f83141 100644 --- a/server.go +++ b/server.go @@ -56,11 +56,13 @@ func runServer(ctx context.Context) error { } }() + start := time.Now() err = packages.LoadFromDb() if err != nil { slog.Error("unable to load packages from db: " + err.Error()) return err } + slog.Info("packages loaded in " + time.Since(start).String()) buildqueue.StartPackageQueueWorker(ctx) buildqueue.StartQueueWorker(ctx) @@ -91,7 +93,7 @@ func runServer(ctx context.Context) error { server.Get("/api/counts", handlers_packages.Counts) server.Get("/api/packages", handlers_packages.Packages) - server.Get("/api/buildqueue", handlers_build.Queue) + server.Get("/api/queue", handlers_build.Queue) server.Post("/api/login", handlers_auth.Login) adminRoutes.Post("/register", handlers_auth.Register)