Fix concurency and db load

This commit is contained in:
ferreo 2024-07-29 23:38:23 +01:00
parent d02c6ae6ce
commit b9950ade44
6 changed files with 93 additions and 64 deletions

View File

@ -3,13 +3,17 @@ package buildqueue
import ( import (
"brunel/domain" "brunel/domain"
"brunel/fastmap" "brunel/fastmap"
"sync"
"errors" "errors"
) )
var queue = fastmap.New[string, domain.BuildQueueItem]() var queue = fastmap.New[string, domain.BuildQueueItem]()
var queueLock = sync.RWMutex{}
func Add(buildItem domain.BuildQueueItem) error { func Add(buildItem domain.BuildQueueItem) error {
queueLock.Lock()
defer queueLock.Unlock()
if _, ok := queue.Get(buildItem.Source.Name); ok { if _, ok := queue.Get(buildItem.Source.Name); ok {
return errors.New("package already in queue") return errors.New("package already in queue")
} }
@ -19,11 +23,15 @@ func Add(buildItem domain.BuildQueueItem) error {
} }
func Get(name string) (domain.BuildQueueItem, bool) { func Get(name string) (domain.BuildQueueItem, bool) {
queueLock.RLock()
defer queueLock.RUnlock()
item, ok := queue.Get(name) item, ok := queue.Get(name)
return item, ok return item, ok
} }
func Remove(name string) error { func Remove(name string) error {
queueLock.Lock()
defer queueLock.Unlock()
_, ok := queue.Get(name) _, ok := queue.Get(name)
if !ok { if !ok {
return errors.New("package not in queue") return errors.New("package not in queue")
@ -33,6 +41,8 @@ func Remove(name string) error {
} }
func Update(buildItem domain.BuildQueueItem) error { func Update(buildItem domain.BuildQueueItem) error {
queueLock.Lock()
defer queueLock.Unlock()
item, ok := queue.Get(buildItem.Source.Name) item, ok := queue.Get(buildItem.Source.Name)
if !ok { if !ok {
return errors.New("package not in queue") return errors.New("package not in queue")
@ -44,11 +54,33 @@ func Update(buildItem domain.BuildQueueItem) error {
return nil return nil
} }
func GetQueue() *fastmap.Fastmap[string, domain.BuildQueueItem] { func GetQueueCopy() *fastmap.Fastmap[string, domain.BuildQueueItem] {
return queue queueLock.RLock()
defer queueLock.RUnlock()
returnQueue := fastmap.New[string, domain.BuildQueueItem]()
queue.Iter(func(k string, v domain.BuildQueueItem) bool {
returnQueue.Set(k, v)
return true
})
return returnQueue
}
func GetBuildingQueue() *fastmap.Fastmap[string, domain.BuildQueueItem] {
queueLock.RLock()
defer queueLock.RUnlock()
returnQueue := fastmap.New[string, domain.BuildQueueItem]()
queue.Iter(func(k string, v domain.BuildQueueItem) bool {
if v.Status == domain.Building {
returnQueue.Set(k, v)
}
return true
})
return returnQueue
} }
func GetCounts() domain.BuildQueueCount { func GetCounts() domain.BuildQueueCount {
queueLock.RLock()
defer queueLock.RUnlock()
count := domain.BuildQueueCount{ count := domain.BuildQueueCount{
Queued: 0, Queued: 0,
Building: 0, Building: 0,
@ -65,6 +97,7 @@ func GetCounts() domain.BuildQueueCount {
} }
func ProcessNext() error { func ProcessNext() error {
queueLock.RLock()
var item *domain.BuildQueueItem var item *domain.BuildQueueItem
queue.Iter(func(k string, v domain.BuildQueueItem) bool { queue.Iter(func(k string, v domain.BuildQueueItem) bool {
if v.Status == domain.Queued { if v.Status == domain.Queued {
@ -76,10 +109,13 @@ func ProcessNext() error {
if item == nil { if item == nil {
return errors.New("no packages in queue") return errors.New("no packages in queue")
} }
queueLock.RUnlock()
err := UpdateBuildFile(*item) err := UpdateBuildFile(*item)
if err != nil { if err != nil {
return err return err
} }
queueLock.Lock()
defer queueLock.Unlock()
item.Status = domain.Building item.Status = domain.Building
err = Update(*item) err = Update(*item)
return err return err

View File

@ -92,31 +92,18 @@ func processStatus(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
q := GetQueue() q := GetBuildingQueue()
itemsToRemove := make([]string, 0) itemsToRemove := make([]string, 0)
q.Iter(func(k string, item domain.BuildQueueItem) bool { q.Iter(func(k string, item domain.BuildQueueItem) bool {
if item.Status == domain.Building { complete, err := CheckIfBuildComplete(ctx, item)
complete, err := CheckIfBuildComplete(ctx, item) if err != nil && !complete {
if err != nil && !complete { slog.Error("unable to check if build is complete: " + err.Error())
slog.Error("unable to check if build is complete: " + err.Error()) }
} if complete {
if complete { if err != nil {
if err != nil {
item.Source.Packages.Iter(func(k string, v domain.PackageInfo) bool {
v.Status = domain.Error
v.LastBuildStatus = domain.Error
item.Source.Packages.Set(k, v)
return true
})
packages.UpdateSourcePackage(item.Source)
itemsToRemove = append(itemsToRemove, k)
return true
}
item.Source.Packages.Iter(func(k string, v domain.PackageInfo) bool { item.Source.Packages.Iter(func(k string, v domain.PackageInfo) bool {
v.Status = domain.Built v.Status = domain.Error
v.LastBuildStatus = domain.Built v.LastBuildStatus = domain.Error
v.Version = item.BuildVersion
v.NewVersion = ""
item.Source.Packages.Set(k, v) item.Source.Packages.Set(k, v)
return true return true
}) })
@ -124,13 +111,24 @@ func processStatus(ctx context.Context) {
itemsToRemove = append(itemsToRemove, k) itemsToRemove = append(itemsToRemove, k)
return true return true
} }
item.Source.Packages.Iter(func(k string, v domain.PackageInfo) bool {
v.Status = domain.Built
v.LastBuildStatus = domain.Built
v.Version = item.BuildVersion
v.NewVersion = ""
item.Source.Packages.Set(k, v)
return true
})
packages.UpdateSourcePackage(item.Source)
itemsToRemove = append(itemsToRemove, k)
return true
} }
return true return true
}) })
for _, item := range itemsToRemove { for _, item := range itemsToRemove {
Remove(item) Remove(item)
} }
time.Sleep(5 * time.Second) time.Sleep(10 * time.Second)
} }
} }
} }

View File

@ -7,6 +7,7 @@ import (
"github.com/jinzhu/now" "github.com/jinzhu/now"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause"
) )
const MaxBatchSize = 1999 const MaxBatchSize = 1999
@ -65,16 +66,36 @@ func (r *Repository) UpdatePackage(pkg domain.SourcePackage) error {
} }
func (r *Repository) GetPackages() ([]domain.SourcePackage, error) { func (r *Repository) GetPackages() ([]domain.SourcePackage, error) {
var packages []domain.SourcePackageDTO var allPackages []domain.SourcePackage
tx := r.db.Find(&packages) offset := 0
if tx.Error != nil {
return nil, tx.Error for {
var batchPackages []domain.SourcePackageDTO
tx := r.db.Preload(clause.Associations).
Offset(offset).
Limit(MaxBatchSize).
Find(&batchPackages)
if tx.Error != nil {
return nil, tx.Error
}
// Convert DTOs to domain objects
for _, v := range batchPackages {
allPackages = append(allPackages, sourcePackageDtoToDomain(v))
}
// If we've retrieved fewer records than MaxBatchSize, we're done
if len(batchPackages) < MaxBatchSize {
break
}
// Move to the next batch
offset += MaxBatchSize
} }
var output []domain.SourcePackage
for _, v := range packages { return allPackages, nil
output = append(output, sourcePackageDtoToDomain(v))
}
return output, nil
} }
func (r *Repository) GetPackage(name string) (domain.SourcePackage, error) { func (r *Repository) GetPackage(name string) (domain.SourcePackage, error) {

View File

@ -3,7 +3,6 @@ package fastmap
import ( import (
"fmt" "fmt"
"strings" "strings"
"sync"
"slices" "slices"
@ -11,7 +10,6 @@ import (
) )
type Fastmap[K comparable, V any] struct { type Fastmap[K comparable, V any] struct {
mu sync.RWMutex
idx map[K]int idx map[K]int
store []fastmapValue[K, V] store []fastmapValue[K, V]
} }
@ -29,8 +27,6 @@ func New[K comparable, V any]() *Fastmap[K, V] {
} }
func (m *Fastmap[K, V]) Set(key K, value V) { func (m *Fastmap[K, V]) Set(key K, value V) {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.idx[key]; ok { if _, ok := m.idx[key]; ok {
m.store[m.idx[key]].Value = value m.store[m.idx[key]].Value = value
return return
@ -40,8 +36,6 @@ func (m *Fastmap[K, V]) Set(key K, value V) {
} }
func (m *Fastmap[K, V]) Get(key K) (value V, ok bool) { func (m *Fastmap[K, V]) Get(key K) (value V, ok bool) {
m.mu.RLock()
defer m.mu.RUnlock()
idx, ok := m.idx[key] idx, ok := m.idx[key]
if !ok { if !ok {
return return
@ -50,8 +44,6 @@ func (m *Fastmap[K, V]) Get(key K) (value V, ok bool) {
} }
func (m *Fastmap[K, V]) Delete(key K) { func (m *Fastmap[K, V]) Delete(key K) {
m.mu.Lock()
defer m.mu.Unlock()
idx, ok := m.idx[key] idx, ok := m.idx[key]
if !ok { if !ok {
return return
@ -62,21 +54,15 @@ func (m *Fastmap[K, V]) Delete(key K) {
} }
func (m *Fastmap[K, V]) Has(key K) bool { func (m *Fastmap[K, V]) Has(key K) bool {
m.mu.RLock()
defer m.mu.RUnlock()
_, ok := m.idx[key] _, ok := m.idx[key]
return ok return ok
} }
func (m *Fastmap[K, V]) Len() int { func (m *Fastmap[K, V]) Len() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.idx) return len(m.idx)
} }
func (m *Fastmap[K, V]) GetPage(pageNum int, pageSize int) *Fastmap[K, V] { func (m *Fastmap[K, V]) GetPage(pageNum int, pageSize int) *Fastmap[K, V] {
m.mu.RLock()
defer m.mu.RUnlock()
start := pageSize * pageNum start := pageSize * pageNum
end := start + pageSize end := start + pageSize
if end > len(m.store) { if end > len(m.store) {
@ -91,15 +77,11 @@ func (m *Fastmap[K, V]) GetPage(pageNum int, pageSize int) *Fastmap[K, V] {
} }
func (m *Fastmap[K, V]) Clear() { func (m *Fastmap[K, V]) Clear() {
m.mu.Lock()
defer m.mu.Unlock()
m.idx = make(map[K]int) m.idx = make(map[K]int)
m.store = make([]fastmapValue[K, V], 0) m.store = make([]fastmapValue[K, V], 0)
} }
func (m *Fastmap[K, V]) Iter(fn func(key K, value V) bool) { func (m *Fastmap[K, V]) Iter(fn func(key K, value V) bool) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, v := range m.store { for _, v := range m.store {
if !fn(v.Key, v.Value) { if !fn(v.Key, v.Value) {
break break
@ -108,8 +90,6 @@ func (m *Fastmap[K, V]) Iter(fn func(key K, value V) bool) {
} }
func (m *Fastmap[K, V]) StableSortByKey() { func (m *Fastmap[K, V]) StableSortByKey() {
m.mu.Lock()
defer m.mu.Unlock()
slices.SortStableFunc(m.store, func(a, b fastmapValue[K, V]) int { slices.SortStableFunc(m.store, func(a, b fastmapValue[K, V]) int {
aKey := fmt.Sprint(a.Key) aKey := fmt.Sprint(a.Key)
bKey := fmt.Sprint(b.Key) bKey := fmt.Sprint(b.Key)
@ -123,8 +103,6 @@ func (m *Fastmap[K, V]) StableSortByKey() {
} }
func (m *Fastmap[K, V]) MarshalText() ([]byte, error) { func (m *Fastmap[K, V]) MarshalText() ([]byte, error) {
m.mu.RLock()
defer m.mu.RUnlock()
var builder strings.Builder var builder strings.Builder
for _, v := range m.store { for _, v := range m.store {
builder.WriteString(fmt.Sprintf("%v:%v\n", v.Key, v.Value)) builder.WriteString(fmt.Sprintf("%v:%v\n", v.Key, v.Value))
@ -133,8 +111,6 @@ func (m *Fastmap[K, V]) MarshalText() ([]byte, error) {
} }
func (m *Fastmap[K, V]) UnmarshalText(text []byte) error { func (m *Fastmap[K, V]) UnmarshalText(text []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
m.Clear() m.Clear()
lines := strings.Split(string(text), "\n") lines := strings.Split(string(text), "\n")
for _, line := range lines { for _, line := range lines {
@ -159,8 +135,6 @@ func (m *Fastmap[K, V]) UnmarshalText(text []byte) error {
} }
func (m *Fastmap[K, V]) MarshalJSON() ([]byte, error) { func (m *Fastmap[K, V]) MarshalJSON() ([]byte, error) {
m.mu.RLock()
defer m.mu.RUnlock()
temp := make(map[K]V) temp := make(map[K]V)
for _, v := range m.store { for _, v := range m.store {
temp[v.Key] = v.Value temp[v.Key] = v.Value
@ -169,8 +143,6 @@ func (m *Fastmap[K, V]) MarshalJSON() ([]byte, error) {
} }
func (m *Fastmap[K, V]) UnmarshalJSON(data []byte) error { func (m *Fastmap[K, V]) UnmarshalJSON(data []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
temp := make(map[K]V) temp := make(map[K]V)
if err := json.Unmarshal(data, &temp); err != nil { if err := json.Unmarshal(data, &temp); err != nil {
return err return err

View File

@ -26,7 +26,7 @@ func Queue(c *fiber.Ctx) error {
adjustedPageNum = 0 adjustedPageNum = 0
} }
packs := buildqueue.GetQueue() packs := buildqueue.GetQueueCopy()
packs.StableSortByKey() packs.StableSortByKey()
finalReturn := fastmap.New[string, domain.BuildQueueItem]() finalReturn := fastmap.New[string, domain.BuildQueueItem]()

View File

@ -56,11 +56,13 @@ func runServer(ctx context.Context) error {
} }
}() }()
start := time.Now()
err = packages.LoadFromDb() err = packages.LoadFromDb()
if err != nil { if err != nil {
slog.Error("unable to load packages from db: " + err.Error()) slog.Error("unable to load packages from db: " + err.Error())
return err return err
} }
slog.Info("packages loaded in " + time.Since(start).String())
buildqueue.StartPackageQueueWorker(ctx) buildqueue.StartPackageQueueWorker(ctx)
buildqueue.StartQueueWorker(ctx) buildqueue.StartQueueWorker(ctx)
@ -91,7 +93,7 @@ func runServer(ctx context.Context) error {
server.Get("/api/counts", handlers_packages.Counts) server.Get("/api/counts", handlers_packages.Counts)
server.Get("/api/packages", handlers_packages.Packages) server.Get("/api/packages", handlers_packages.Packages)
server.Get("/api/buildqueue", handlers_build.Queue) server.Get("/api/queue", handlers_build.Queue)
server.Post("/api/login", handlers_auth.Login) server.Post("/api/login", handlers_auth.Login)
adminRoutes.Post("/register", handlers_auth.Register) adminRoutes.Post("/register", handlers_auth.Register)