Fix queue issues

This commit is contained in:
ferreo 2024-07-30 01:24:18 +01:00
parent 8782af6428
commit 8f7657cf38
6 changed files with 149 additions and 67 deletions

View File

@ -8,7 +8,7 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"strings" "sort"
"time" "time"
"golang.org/x/net/html" "golang.org/x/net/html"
@ -26,12 +26,12 @@ func StartPackageQueueWorker(ctx context.Context) {
slog.Error("unable to process packages: " + err.Error()) slog.Error("unable to process packages: " + err.Error())
} }
packs := packages.GetPackages() packs := packages.GetPackages()
packs.Iter(func(k string, v domain.SourcePackage) bool { packs.ForEach(func(k string, v domain.SourcePackage) bool {
needsBuild := false needsBuild := false
buildVersion := "" buildVersion := ""
buildAttempt := 0 buildAttempt := 0
errPreviously := false errPreviously := false
v.Packages.Iter(func(k string, v domain.PackageInfo) bool { v.Packages.ForEach(func(k string, v domain.PackageInfo) bool {
if v.Status == domain.Current { if v.Status == domain.Current {
return true return true
} }
@ -108,7 +108,7 @@ func processStatus(ctx context.Context) {
} }
if complete { if complete {
if err != nil { if err != nil {
item.Source.Packages.Iter(func(k string, v domain.PackageInfo) bool { item.Source.Packages.ForEach(func(k string, v domain.PackageInfo) bool {
v.Status = domain.Error v.Status = domain.Error
v.LastBuildStatus = domain.Error v.LastBuildStatus = domain.Error
item.Source.Packages.Set(k, v) item.Source.Packages.Set(k, v)
@ -118,7 +118,7 @@ func processStatus(ctx context.Context) {
itemsToRemove = append(itemsToRemove, k) itemsToRemove = append(itemsToRemove, k)
return true return true
} }
item.Source.Packages.Iter(func(k string, v domain.PackageInfo) bool { item.Source.Packages.ForEach(func(k string, v domain.PackageInfo) bool {
v.Status = domain.Current v.Status = domain.Current
v.LastBuildStatus = domain.Built v.LastBuildStatus = domain.Built
v.Version = item.BuildVersion v.Version = item.BuildVersion
@ -181,41 +181,71 @@ func CheckIfBuildComplete(ctx context.Context, item domain.BuildQueueItem) (bool
} }
buildName := item.Source.Name + "=" + item.BuildVersion buildName := item.Source.Name + "=" + item.BuildVersion
var f func(*html.Node) (bool, error) var builds []struct {
f = func(n *html.Node) (bool, error) { isMatch bool
status string
time time.Time
}
var f func(*html.Node) error
f = func(n *html.Node) error {
if n.Type == html.ElementNode && n.Data == "div" { if n.Type == html.ElementNode && n.Data == "div" {
for _, a := range n.Attr { for _, a := range n.Attr {
if a.Key == "class" && strings.Contains(a.Val, "flex-item") { if a.Key == "class" && a.Val == "flex-item tw-items-center" {
isMatch, status := checkBuildBlock(n, buildName) isMatch, status, buildTime := checkBuildBlock(n, buildName)
if isMatch { if isMatch {
switch status { builds = append(builds, struct {
case "Success": isMatch bool
return true, nil status string
case "Failure": time time.Time
return true, fmt.Errorf("build failed") }{isMatch, status, buildTime})
default:
// Build found but status unknown, keep searching
return false, nil
}
} }
} }
} }
} }
for c := n.FirstChild; c != nil; c = c.NextSibling { for c := n.FirstChild; c != nil; c = c.NextSibling {
isComplete, err := f(c) if err := f(c); err != nil {
if isComplete || err != nil { return err
return isComplete, err
} }
} }
return nil
}
if err := f(doc); err != nil {
return false, err
}
// Sort builds by time, most recent first
sort.Slice(builds, func(i, j int) bool {
return builds[i].time.After(builds[j].time)
})
if len(builds) == 0 {
slog.Info("No matching builds found", "buildName", buildName)
return false, nil return false, nil
} }
return f(doc) mostRecentBuild := builds[0]
switch mostRecentBuild.status {
case "Success":
return true, nil
case "Failure":
return true, fmt.Errorf("build failed")
case "Running":
return false, nil // Build is still in progress
case "Queued":
return false, nil // Build is still in progress
default:
slog.Warn("Unknown build status", "status", mostRecentBuild.status)
return false, fmt.Errorf("unknown build status: %s", mostRecentBuild.status)
}
} }
func checkBuildBlock(n *html.Node, buildName string) (bool, string) { func checkBuildBlock(n *html.Node, buildName string) (bool, string, time.Time) {
var title string var title string
var status string var status string
var buildTime time.Time
var f func(*html.Node) var f func(*html.Node)
f = func(n *html.Node) { f = func(n *html.Node) {
@ -231,16 +261,28 @@ func checkBuildBlock(n *html.Node, buildName string) (bool, string) {
} }
} }
} }
case "span": case "div":
for _, a := range n.Attr { for _, a := range n.Attr {
if a.Key == "data-tooltip-content" { if a.Key == "class" && a.Val == "flex-item-leading" {
if a.Val == "Success" || a.Val == "Failure" { for c := n.FirstChild; c != nil; c = c.NextSibling {
status = a.Val if c.Type == html.ElementNode && c.Data == "span" {
for _, attr := range c.Attr {
if attr.Key == "data-tooltip-content" {
status = attr.Val
} }
} }
} }
} }
} }
}
case "relative-time":
for _, a := range n.Attr {
if a.Key == "datetime" {
buildTime, _ = time.Parse(time.RFC3339, a.Val)
}
}
}
}
for c := n.FirstChild; c != nil; c = c.NextSibling { for c := n.FirstChild; c != nil; c = c.NextSibling {
f(c) f(c)
} }
@ -248,5 +290,5 @@ func checkBuildBlock(n *html.Node, buildName string) (bool, string) {
f(n) f(n)
return title == buildName, status return title == buildName, status, buildTime
} }

View File

@ -2,9 +2,9 @@ package db
import ( import (
"brunel/domain" "brunel/domain"
"brunel/fastmap"
"time" "time"
"github.com/alphadose/haxmap"
"github.com/jinzhu/now" "github.com/jinzhu/now"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
@ -107,9 +107,9 @@ func (r *Repository) GetPackage(name string) (domain.SourcePackage, error) {
return sourcePackageDtoToDomain(pkg), nil return sourcePackageDtoToDomain(pkg), nil
} }
func (r *Repository) SavePackages(pkgs *fastmap.Fastmap[string, domain.SourcePackage]) error { func (r *Repository) SavePackages(pkgs *haxmap.Map[string, domain.SourcePackage]) error {
packs := make([]domain.SourcePackageDTO, 0) packs := make([]domain.SourcePackageDTO, 0)
pkgs.Iter(func(k string, v domain.SourcePackage) bool { pkgs.ForEach(func(k string, v domain.SourcePackage) bool {
packs = append(packs, sourcePackageToDto(v)) packs = append(packs, sourcePackageToDto(v))
return true return true
}) })
@ -143,7 +143,7 @@ func sourcePackageToDto(pkg domain.SourcePackage) domain.SourcePackageDTO {
Name: pkg.Name, Name: pkg.Name,
Packages: make([]domain.PackageInfo, 0), Packages: make([]domain.PackageInfo, 0),
} }
pkg.Packages.Iter(func(k string, v domain.PackageInfo) bool { pkg.Packages.ForEach(func(k string, v domain.PackageInfo) bool {
dto.Packages = append(dto.Packages, v) dto.Packages = append(dto.Packages, v)
return true return true
}) })
@ -153,7 +153,7 @@ func sourcePackageToDto(pkg domain.SourcePackage) domain.SourcePackageDTO {
func sourcePackageDtoToDomain(dto domain.SourcePackageDTO) domain.SourcePackage { func sourcePackageDtoToDomain(dto domain.SourcePackageDTO) domain.SourcePackage {
pkg := domain.SourcePackage{ pkg := domain.SourcePackage{
Name: dto.Name, Name: dto.Name,
Packages: fastmap.New[string, domain.PackageInfo](), Packages: haxmap.New[string, domain.PackageInfo](),
} }
for _, v := range dto.Packages { for _, v := range dto.Packages {
pkg.Packages.Set(v.PackageName, v) pkg.Packages.Set(v.PackageName, v)

View File

@ -1,8 +1,9 @@
package domain package domain
import ( import (
"brunel/fastmap"
"time" "time"
"github.com/alphadose/haxmap"
) )
type PackagesCount struct { type PackagesCount struct {
@ -16,7 +17,7 @@ type PackagesCount struct {
type SourcePackage struct { type SourcePackage struct {
Name string `gorm:"primarykey"` Name string `gorm:"primarykey"`
Packages *fastmap.Fastmap[string, PackageInfo] `gorm:"foreignKey:PackageInfo;references:PackageName"` Packages *haxmap.Map[string, PackageInfo] `gorm:"foreignKey:PackageInfo;references:PackageName"`
} }
type SourcePackageDTO struct { type SourcePackageDTO struct {

View File

@ -32,7 +32,7 @@ func Queue(c *fiber.Ctx) error {
packs.ForEach(func(k string, source domain.BuildQueueItem) bool { packs.ForEach(func(k string, source domain.BuildQueueItem) bool {
matchesSearch := search == "" || strings.Contains(strings.ToLower(k), search) matchesSearch := search == "" || strings.Contains(strings.ToLower(k), search)
matchesFilter := filter == "" || source.Status == domain.BuildStatus(filter) matchesFilter := filter == "" || source.Status == domain.BuildStatus(filter)
source.Source.Packages.Iter(func(key string, value domain.PackageInfo) bool { source.Source.Packages.ForEach(func(key string, value domain.PackageInfo) bool {
if !matchesSearch && strings.Contains(strings.ToLower(key), search) { if !matchesSearch && strings.Contains(strings.ToLower(key), search) {
matchesSearch = true matchesSearch = true
} }

View File

@ -30,11 +30,11 @@ func Packages(c *fiber.Ctx) error {
finalReturn := fastmap.New[string, domain.SourcePackage]() finalReturn := fastmap.New[string, domain.SourcePackage]()
packs.Iter(func(k string, source domain.SourcePackage) bool { packs.ForEach(func(k string, source domain.SourcePackage) bool {
matchesFilter := filter == "" matchesFilter := filter == ""
matchesSearch := search == "" || strings.Contains(strings.ToLower(k), search) matchesSearch := search == "" || strings.Contains(strings.ToLower(k), search)
source.Packages.Iter(func(key string, value domain.PackageInfo) bool { source.Packages.ForEach(func(key string, value domain.PackageInfo) bool {
if !matchesFilter && value.Status == domain.PackageStatus(filter) { if !matchesFilter && value.Status == domain.PackageStatus(filter) {
matchesFilter = true matchesFilter = true
} }

View File

@ -4,7 +4,6 @@ import (
"brunel/config" "brunel/config"
"brunel/deb" "brunel/deb"
"brunel/domain" "brunel/domain"
"brunel/fastmap"
"brunel/helpers" "brunel/helpers"
"compress/bzip2" "compress/bzip2"
"fmt" "fmt"
@ -17,16 +16,17 @@ import (
"pault.ag/go/debian/version" "pault.ag/go/debian/version"
"github.com/alphadose/haxmap"
"github.com/klauspost/compress/gzip" "github.com/klauspost/compress/gzip"
"github.com/ulikunitz/xz" "github.com/ulikunitz/xz"
) )
var LastUpdateTime time.Time var LastUpdateTime time.Time
var currentPackagesFastMap = fastmap.New[string, domain.SourcePackage]() var currentPackagesFastMap = haxmap.New[string, domain.SourcePackage]()
func ProcessPackages() error { func ProcessPackages() error {
var internalPackages = fastmap.New[string, domain.SourcePackage]() var internalPackages = haxmap.New[string, domain.SourcePackage]()
var externalPackages = fastmap.New[string, domain.SourcePackage]() var externalPackages = haxmap.New[string, domain.SourcePackage]()
err := LoadInternalPackages(internalPackages) err := LoadInternalPackages(internalPackages)
if err != nil { if err != nil {
return err return err
@ -35,15 +35,19 @@ func ProcessPackages() error {
if err != nil { if err != nil {
return err return err
} }
// Combine packages before processing
combinePackages(internalPackages)
combinePackages(externalPackages)
ProcessStalePackages(internalPackages, externalPackages) ProcessStalePackages(internalPackages, externalPackages)
ProcessMissingPackages(internalPackages, externalPackages) ProcessMissingPackages(internalPackages, externalPackages)
currentPackagesFastMap.Clear() currentPackagesFastMap.Clear()
internalPackages.Iter(func(k string, v domain.SourcePackage) bool { internalPackages.ForEach(func(k string, v domain.SourcePackage) bool {
currentPackagesFastMap.Set(k, v) currentPackagesFastMap.Set(k, v)
return true return true
}) })
currentPackagesFastMap.StableSortByKey()
LastUpdateTime = time.Now() LastUpdateTime = time.Now()
helpers.ReloadCache() helpers.ReloadCache()
@ -55,7 +59,7 @@ func ProcessPackages() error {
return nil return nil
} }
func GetPackages() *fastmap.Fastmap[string, domain.SourcePackage] { func GetPackages() *haxmap.Map[string, domain.SourcePackage] {
return currentPackagesFastMap return currentPackagesFastMap
} }
@ -130,11 +134,10 @@ func LoadFromDb() error {
for _, pkg := range packages { for _, pkg := range packages {
currentPackagesFastMap.Set(pkg.Name, pkg) currentPackagesFastMap.Set(pkg.Name, pkg)
} }
currentPackagesFastMap.StableSortByKey()
return nil return nil
} }
func LoadInternalPackages(internalPackages *fastmap.Fastmap[string, domain.SourcePackage]) error { func LoadInternalPackages(internalPackages *haxmap.Map[string, domain.SourcePackage]) error {
localPackageFile := config.Configs.LocalPackageFiles localPackageFile := config.Configs.LocalPackageFiles
slices.SortStableFunc(localPackageFile, func(a, b config.PackageFile) int { slices.SortStableFunc(localPackageFile, func(a, b config.PackageFile) int {
if a.Priority == b.Priority { if a.Priority == b.Priority {
@ -152,10 +155,10 @@ func LoadInternalPackages(internalPackages *fastmap.Fastmap[string, domain.Sourc
if err != nil { if err != nil {
return err return err
} }
packages.Iter(func(newKey string, newPkg domain.PackageInfo) bool { packages.ForEach(func(newKey string, newPkg domain.PackageInfo) bool {
pk, ok := internalPackages.Get(newPkg.Source) pk, ok := internalPackages.Get(newPkg.Source)
if !ok { if !ok {
newMap := fastmap.New[string, domain.PackageInfo]() newMap := haxmap.New[string, domain.PackageInfo]()
newMap.Set(newKey, newPkg) newMap.Set(newKey, newPkg)
internalPackages.Set(newPkg.Source, domain.SourcePackage{ internalPackages.Set(newPkg.Source, domain.SourcePackage{
Name: newPkg.Source, Name: newPkg.Source,
@ -183,7 +186,7 @@ func LoadInternalPackages(internalPackages *fastmap.Fastmap[string, domain.Sourc
return nil return nil
} }
func LoadExternalPackages(externalPackages *fastmap.Fastmap[string, domain.SourcePackage]) error { func LoadExternalPackages(externalPackages *haxmap.Map[string, domain.SourcePackage]) error {
externalPackageFile := config.Configs.ExternalPackageFiles externalPackageFile := config.Configs.ExternalPackageFiles
slices.SortStableFunc(externalPackageFile, func(a, b config.PackageFile) int { slices.SortStableFunc(externalPackageFile, func(a, b config.PackageFile) int {
if a.Priority == b.Priority { if a.Priority == b.Priority {
@ -201,10 +204,10 @@ func LoadExternalPackages(externalPackages *fastmap.Fastmap[string, domain.Sourc
if err != nil { if err != nil {
return err return err
} }
packages.Iter(func(k string, v domain.PackageInfo) bool { packages.ForEach(func(k string, v domain.PackageInfo) bool {
pk, ok := externalPackages.Get(v.Source) pk, ok := externalPackages.Get(v.Source)
if !ok { if !ok {
newMap := fastmap.New[string, domain.PackageInfo]() newMap := haxmap.New[string, domain.PackageInfo]()
newMap.Set(k, v) newMap.Set(k, v)
externalPackages.Set(v.Source, domain.SourcePackage{ externalPackages.Set(v.Source, domain.SourcePackage{
Name: v.Source, Name: v.Source,
@ -232,12 +235,12 @@ func LoadExternalPackages(externalPackages *fastmap.Fastmap[string, domain.Sourc
return nil return nil
} }
func ProcessMissingPackages(internalPackages *fastmap.Fastmap[string, domain.SourcePackage], externalPackages *fastmap.Fastmap[string, domain.SourcePackage]) { func ProcessMissingPackages(internalPackages *haxmap.Map[string, domain.SourcePackage], externalPackages *haxmap.Map[string, domain.SourcePackage]) {
externalPackages.Iter(func(k string, src domain.SourcePackage) bool { externalPackages.ForEach(func(k string, src domain.SourcePackage) bool {
_, ok := internalPackages.Get(k) _, ok := internalPackages.Get(k)
if !ok && src.Packages.Len() > 0 { if !ok && src.Packages.Len() > 0 {
newStatus := domain.Missing newStatus := domain.Missing
src.Packages.Iter(func(k string, v domain.PackageInfo) bool { src.Packages.ForEach(func(k string, v domain.PackageInfo) bool {
v.Status = newStatus v.Status = newStatus
v.Version = strings.Split(v.Version, "+b")[0] v.Version = strings.Split(v.Version, "+b")[0]
src.Packages.Set(k, v) src.Packages.Set(k, v)
@ -249,17 +252,17 @@ func ProcessMissingPackages(internalPackages *fastmap.Fastmap[string, domain.Sou
}) })
} }
func ProcessStalePackages(internalPackages *fastmap.Fastmap[string, domain.SourcePackage], externalPackages *fastmap.Fastmap[string, domain.SourcePackage]) { func ProcessStalePackages(internalPackages *haxmap.Map[string, domain.SourcePackage], externalPackages *haxmap.Map[string, domain.SourcePackage]) {
externalPackages.Iter(func(newPackage string, newSource domain.SourcePackage) bool { externalPackages.ForEach(func(newPackage string, newSource domain.SourcePackage) bool {
matchedPackage, ok := internalPackages.Get(newPackage) matchedPackage, ok := internalPackages.Get(newPackage)
if !ok || matchedPackage.Packages.Len() == 0 { if !ok || matchedPackage.Packages.Len() == 0 {
return true return true
} }
matchedPackage.Packages.Iter(func(currentKey string, currentPackage domain.PackageInfo) bool { matchedPackage.Packages.ForEach(func(currentKey string, currentPackage domain.PackageInfo) bool {
if currentPackage.Status == domain.Missing { if currentPackage.Status == domain.Missing {
return true return true
} }
newSource.Packages.Iter(func(newKey string, newPackage domain.PackageInfo) bool { newSource.Packages.ForEach(func(newKey string, newPackage domain.PackageInfo) bool {
if currentKey != newKey { if currentKey != newKey {
return true return true
} }
@ -277,9 +280,9 @@ func ProcessStalePackages(internalPackages *fastmap.Fastmap[string, domain.Sourc
return true return true
}) })
wasMissing := false wasMissing := false
newSource.Packages.Iter(func(newKey string, newPackage domain.PackageInfo) bool { newSource.Packages.ForEach(func(newKey string, newPackage domain.PackageInfo) bool {
found := false found := false
matchedPackage.Packages.Iter(func(currentKey string, currentPackage domain.PackageInfo) bool { matchedPackage.Packages.ForEach(func(currentKey string, currentPackage domain.PackageInfo) bool {
if currentKey != newKey { if currentKey != newKey {
return true return true
} }
@ -295,7 +298,7 @@ func ProcessStalePackages(internalPackages *fastmap.Fastmap[string, domain.Sourc
return true return true
}) })
if wasMissing { if wasMissing {
matchedPackage.Packages.Iter(func(k string, v domain.PackageInfo) bool { matchedPackage.Packages.ForEach(func(k string, v domain.PackageInfo) bool {
if v.Status == domain.Missing { if v.Status == domain.Missing {
return true return true
} }
@ -307,7 +310,7 @@ func ProcessStalePackages(internalPackages *fastmap.Fastmap[string, domain.Sourc
}) })
} }
func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*fastmap.Fastmap[string, domain.PackageInfo], error) { func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[string, domain.PackageInfo], error) {
resp, err := http.Get(pkg.Url + selectedRepo + "/" + pkg.Packagepath + "." + pkg.Compression) resp, err := http.Get(pkg.Url + selectedRepo + "/" + pkg.Packagepath + "." + pkg.Compression)
if err != nil { if err != nil {
return nil, err return nil, err
@ -333,7 +336,7 @@ func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*fastmap.Fas
rdr = r rdr = r
} }
packages := fastmap.New[string, domain.PackageInfo]() packages := haxmap.New[string, domain.PackageInfo]()
sreader := deb.NewControlFileReader(rdr, false, false) sreader := deb.NewControlFileReader(rdr, false, false)
for { for {
stanza, err := sreader.ReadStanza() stanza, err := sreader.ReadStanza()
@ -411,8 +414,8 @@ func GetPackagesCount() domain.PackagesCount {
Queued: 0, Queued: 0,
Building: 0, Building: 0,
} }
currentPackagesFastMap.Iter(func(k string, v domain.SourcePackage) bool { currentPackagesFastMap.ForEach(func(k string, v domain.SourcePackage) bool {
v.Packages.Iter(func(k string, pkg domain.PackageInfo) bool { v.Packages.ForEach(func(k string, pkg domain.PackageInfo) bool {
switch pkg.Status { switch pkg.Status {
case domain.Stale: case domain.Stale:
count.Stale++ count.Stale++
@ -432,3 +435,39 @@ func GetPackagesCount() domain.PackagesCount {
return count return count
} }
func combinePackages(packages *haxmap.Map[string, domain.SourcePackage]) {
dmoPackages := haxmap.New[string, domain.SourcePackage]()
// First pass: identify and collect -dmo packages
packages.ForEach(func(k string, v domain.SourcePackage) bool {
if strings.HasSuffix(k, "-dmo") {
dmoPackages.Set(k, v)
packages.Del(k)
}
return true
})
// Second pass: combine -dmo packages with base packages or add them back
dmoPackages.ForEach(func(dmoName string, dmoPkg domain.SourcePackage) bool {
baseName := strings.TrimSuffix(dmoName, "-dmo")
basePkg, hasBase := packages.Get(baseName)
if hasBase {
// Combine packages, prioritizing -dmo
combinedPkg := dmoPkg // Start with the -dmo package
basePkg.Packages.ForEach(func(pkgName string, pkgInfo domain.PackageInfo) bool {
if _, exists := combinedPkg.Packages.Get(pkgName); !exists {
combinedPkg.Packages.Set(pkgName, pkgInfo)
}
return true
})
packages.Set(dmoName, combinedPkg) // Store under the -dmo name
packages.Del(baseName) // Remove the base package
} else {
// If there's no base package, just add the dmo package back
packages.Set(dmoName, dmoPkg)
}
return true
})
}