Freeze feature + refactor of package logic

This commit is contained in:
ferreo 2024-10-02 19:46:32 +01:00
parent 20228de56a
commit 480fb614f4
6 changed files with 474 additions and 68 deletions

View File

@ -108,24 +108,26 @@ func (r *Repository) GetPackage(name string) (domain.SourcePackage, error) {
} }
func (r *Repository) SavePackages(pkgs *haxmap.Map[string, domain.SourcePackage]) error { func (r *Repository) SavePackages(pkgs *haxmap.Map[string, domain.SourcePackage]) error {
packs := make([]domain.SourcePackageDTO, 0) var packs []domain.SourcePackageDTO
pkgs.ForEach(func(k string, v domain.SourcePackage) bool { pkgs.ForEach(func(k string, v domain.SourcePackage) bool {
packs = append(packs, sourcePackageToDto(v)) packs = append(packs, sourcePackageToDto(v))
return true return true
}) })
for i := 0; i < len(packs); i += MaxBatchSize { return r.saveInBatches(packs)
end := i + MaxBatchSize
length := len(packs)
if end > length {
end = length
}
tx := r.db.Save(packs[i:end])
if tx.Error != nil {
return tx.Error
}
} }
func (r *Repository) saveInBatches(packs []domain.SourcePackageDTO) error {
for i := 0; i < len(packs); i += MaxBatchSize {
end := i + MaxBatchSize
if end > len(packs) {
end = len(packs)
}
batch := packs[i:end]
if err := r.db.Clauses(clause.OnConflict{UpdateAll: true}).Create(&batch).Error; err != nil {
return err
}
}
return nil return nil
} }

View File

@ -0,0 +1,47 @@
package handlers_build
import (
handlers_packages "brunel/handlers/packages"
"fmt"
"os/exec"
"github.com/gofiber/fiber/v2"
)
type FreezeRequest struct {
Name string `json:"name"`
Version string `json:"version"`
}
func TriggerFreeze(c *fiber.Ctx) error {
var req FreezeRequest
if err := c.BodyParser(&req); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Invalid request body",
})
}
if req.Name == "" || req.Version == "" {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Name and version are required",
})
}
cmd := exec.Command("/bin/sh", "-c", fmt.Sprintf("aptly repo copy pika-canary pika-nest '%s (%s)'", req.Name, req.Version))
output, err := cmd.CombinedOutput()
if err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": fmt.Sprintf("Command execution failed: %v", err),
"output": string(output),
})
}
// Remove the item from the freeze cache
handlers_packages.RemoveFromFreezeCache(req.Name, req.Version)
return c.Status(fiber.StatusOK).JSON(fiber.Map{
"message": "Freeze triggered successfully",
"output": string(output),
})
}

View File

@ -0,0 +1,147 @@
package handlers_packages
import (
"brunel/packages"
"log/slog"
"sort"
"strings"
"sync"
"time"
"github.com/gofiber/fiber/v2"
)
type FreezeItem struct {
Name string `json:"name"`
Ver string `json:"version"`
Arch string `json:"architecture"`
}
type FreezesResponse struct {
Total int `json:"total"`
Freezes []FreezeItem `json:"freezes"`
}
var (
cachedFreezes []FreezeItem
lastCacheTime time.Time
cacheDuration = 1 * time.Hour
freezeCacheMutex sync.RWMutex
)
func UpdateFreezeCache() error {
freeze64, freeze32, err := packages.GetFreezeItems()
if err != nil {
return err
}
combinedFreezes := make([]FreezeItem, 0, len(freeze64)+len(freeze32))
for _, item := range freeze64 {
combinedFreezes = append(combinedFreezes, FreezeItem{
Name: item.Name,
Ver: item.Ver,
Arch: "amd64",
})
}
for _, item := range freeze32 {
combinedFreezes = append(combinedFreezes, FreezeItem{
Name: item.Name,
Ver: item.Ver,
Arch: "i386",
})
}
sort.Slice(combinedFreezes, func(i, j int) bool {
return combinedFreezes[i].Name < combinedFreezes[j].Name
})
freezeCacheMutex.Lock()
defer freezeCacheMutex.Unlock()
cachedFreezes = combinedFreezes
lastCacheTime = time.Now()
return nil
}
func getFreezes() ([]FreezeItem, error) {
freezeCacheMutex.RLock()
if time.Since(lastCacheTime) < cacheDuration && len(cachedFreezes) > 0 {
defer freezeCacheMutex.RUnlock()
return cachedFreezes, nil
}
freezeCacheMutex.RUnlock()
if err := UpdateFreezeCache(); err != nil {
return nil, err
}
freezeCacheMutex.RLock()
defer freezeCacheMutex.RUnlock()
return cachedFreezes, nil
}
func RemoveFromFreezeCache(name, version string) {
freezeCacheMutex.Lock()
defer freezeCacheMutex.Unlock()
for i, item := range cachedFreezes {
if item.Name == name && item.Ver == version {
cachedFreezes = append(cachedFreezes[:i], cachedFreezes[i+1:]...)
break
}
}
}
func Freezes(c *fiber.Ctx) error {
pageNum := c.QueryInt("page", 1)
pageSize := c.QueryInt("pageSize", 250)
search := strings.ToLower(c.Query("search"))
filter := c.Query("filter")
// Adjust pageNum to be 0-based for slice indexing
adjustedPageNum := pageNum - 1
if adjustedPageNum < 0 {
adjustedPageNum = 0
}
freezes, err := getFreezes()
if err != nil {
slog.Error("Failed to retrieve freeze items", "error", err)
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": "Failed to retrieve freeze items",
})
}
filteredFreezes := make([]FreezeItem, 0)
for _, freeze := range freezes {
matchesFilter := filter == "" || filter == "All" || strings.EqualFold(freeze.Arch, filter)
matchesSearch := search == "" || strings.Contains(strings.ToLower(freeze.Name), search)
if matchesFilter && matchesSearch {
filteredFreezes = append(filteredFreezes, freeze)
}
}
total := len(filteredFreezes)
startIndex := adjustedPageNum * pageSize
endIndex := (adjustedPageNum + 1) * pageSize
if startIndex >= total {
filteredFreezes = []FreezeItem{}
} else {
if endIndex > total {
endIndex = total
}
filteredFreezes = filteredFreezes[startIndex:endIndex]
}
response := FreezesResponse{
Total: total,
Freezes: filteredFreezes,
}
return c.Status(fiber.StatusOK).JSON(response)
}

99
packages/freezer.go Normal file
View File

@ -0,0 +1,99 @@
package packages
import (
"brunel/deb"
"compress/bzip2"
"fmt"
"net/http"
"strconv"
"strings"
"time"
)
const nest64Url = "https://ppa.pika-os.com/dists/pika/nest/binary-amd64/Packages.bz2"
const canary64Url = "https://ppa.pika-os.com/dists/pika/canary/binary-amd64/Packages.bz2"
const nest32Url = "https://ppa.pika-os.com/dists/pika/nest/binary-i386/Packages.bz2"
const canary32Url = "https://ppa.pika-os.com/dists/pika/canary/binary-i386/Packages.bz2"
type freezeItem struct {
Name string
Ver string
}
func GetFreezeItems() ([]freezeItem, []freezeItem, error) {
freeze64, err := compareRepositories(nest64Url, canary64Url)
if err != nil {
return nil, nil, fmt.Errorf("error comparing 64-bit repositories: %w", err)
}
freeze32, err := compareRepositories(nest32Url, canary32Url)
if err != nil {
return nil, nil, fmt.Errorf("error comparing 32-bit repositories: %w", err)
}
return freeze64, freeze32, nil
}
func compareRepositories(nestUrl, canaryUrl string) ([]freezeItem, error) {
nestPackages, err := getPackages(nestUrl)
if err != nil {
return nil, fmt.Errorf("error getting nest packages: %w", err)
}
canaryPackages, err := getPackages(canaryUrl)
if err != nil {
return nil, fmt.Errorf("error getting canary packages: %w", err)
}
nestSet := make(map[string]bool)
for _, pkg := range nestPackages {
nestSet[pkg] = true
}
var freezeItems []freezeItem
for _, pkg := range canaryPackages {
if !nestSet[pkg] {
parts := strings.SplitN(pkg, ":", 2)
if len(parts) == 2 {
freezeItems = append(freezeItems, freezeItem{Name: parts[0], Ver: parts[1]})
}
}
}
return freezeItems, nil
}
func getPackages(url string) ([]string, error) {
resp, err := http.Get(url + "?nocache=" + strconv.Itoa(int(time.Now().Unix())))
if err != nil {
return nil, fmt.Errorf("fetching package file: %w", err)
}
defer resp.Body.Close()
rdr := bzip2.NewReader(resp.Body)
sreader := deb.NewControlFileReader(rdr, false, false)
var packages []string
for {
stanza, err := sreader.ReadStanza()
if err != nil || stanza == nil {
break
}
name := stanza["Package"]
versionStr := stanza["Version"]
if name == "" || versionStr == "" {
continue
}
if strings.Contains(name, "-dbgsym") {
continue
}
packages = append(packages, name+":"+versionStr)
}
return packages, nil
}

View File

@ -6,12 +6,12 @@ import (
"brunel/domain" "brunel/domain"
"brunel/helpers" "brunel/helpers"
"compress/bzip2" "compress/bzip2"
"errors"
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
"net/http" "net/http"
"slices" "slices"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -56,50 +56,13 @@ func ProcessPackages() error {
if err := eg.Wait(); err != nil { if err := eg.Wait(); err != nil {
return err return err
} }
slog.Info("packages processed in " + time.Since(start).String()) slog.Info("packages loaded in " + time.Since(start).String())
combinePackages(internalPackages) mergePackages(internalPackages, externalPackages)
combinePackages(externalPackages)
ProcessMissingPackages(internalPackages, externalPackages) updatedPackages := determineUpdatedPackages(internalPackages, externalPackages)
ProcessStalePackages(internalPackages, externalPackages)
updatedPackages := haxmap.New[string, domain.SourcePackage]() updateBinaryPackageFlags(updatedPackages)
internalPackages.ForEach(func(k string, v domain.SourcePackage) bool {
curr, exists := currentPackages.Get(k)
if !exists {
updatedPackages.Set(k, v)
return true
}
mergedPackage := domain.SourcePackage{
Name: curr.Name,
Has32bit: curr.Has32bit,
Version: v.Version,
NewVersion: v.NewVersion,
Status: v.Status,
BuildAttempts: curr.BuildAttempts,
LastBuildStatus: curr.LastBuildStatus,
Packages: haxmap.New[string, domain.PackageInfo](),
}
updatedPackages.Set(k, mergedPackage)
return true
})
updatedPackages.ForEach(func(k string, v domain.SourcePackage) bool {
for _, pkg := range config.Configs.I386List {
if v.Name == pkg || v.Name == pkg+DMOSuffix {
v.Has32bit = true
updatedPackages.Set(k, v)
return true
}
}
v.Has32bit = false
updatedPackages.Set(k, v)
return true
})
currentPackages = updatedPackages currentPackages = updatedPackages
@ -111,6 +74,7 @@ func ProcessPackages() error {
return fmt.Errorf("saving to database: %w", err) return fmt.Errorf("saving to database: %w", err)
} }
slog.Info("packages processed and saved in " + time.Since(start).String())
return nil return nil
} }
@ -176,21 +140,13 @@ func LoadFromDb() error {
} }
func LoadPackages(packages *haxmap.Map[string, domain.SourcePackage], packageFiles []config.PackageFile, isInternal bool) error { func LoadPackages(packages *haxmap.Map[string, domain.SourcePackage], packageFiles []config.PackageFile, isInternal bool) error {
slices.SortStableFunc(packageFiles, func(a, b config.PackageFile) int { sortedFiles := sortPackageFiles(packageFiles, isInternal)
if a.Priority == b.Priority {
return 0
}
if (isInternal && a.Priority < b.Priority) || (!isInternal && a.Priority > b.Priority) {
return 1
}
return -1
})
var eg errgroup.Group var eg errgroup.Group
var mu sync.Mutex var mu sync.Mutex
packageResults := make([][]fetchResult, len(packageFiles)) packageResults := make([][]fetchResult, len(sortedFiles))
for i, pkg := range packageFiles { for i, pkg := range sortedFiles {
i, pkg := i, pkg // capture range variables
eg.Go(func() error { eg.Go(func() error {
results, err := fetchPackagesForFile(pkg) results, err := fetchPackagesForFile(pkg)
if err != nil { if err != nil {
@ -216,6 +172,21 @@ func LoadPackages(packages *haxmap.Map[string, domain.SourcePackage], packageFil
return nil return nil
} }
func sortPackageFiles(packageFiles []config.PackageFile, isInternal bool) []config.PackageFile {
sorted := make([]config.PackageFile, len(packageFiles))
copy(sorted, packageFiles)
slices.SortStableFunc(sorted, func(a, b config.PackageFile) int {
if a.Priority == b.Priority {
return 0
}
if (isInternal && a.Priority < b.Priority) || (!isInternal && a.Priority > b.Priority) {
return 1
}
return -1
})
return sorted
}
type fetchResult struct { type fetchResult struct {
repo string repo string
packages *haxmap.Map[string, domain.PackageInfo] packages *haxmap.Map[string, domain.PackageInfo]
@ -295,7 +266,8 @@ func ProcessStalePackages(internalPackages *haxmap.Map[string, domain.SourcePack
} }
func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[string, domain.PackageInfo], error) { func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[string, domain.PackageInfo], error) {
resp, err := http.Get(pkg.Url + selectedRepo + "/" + pkg.Packagepath + "." + pkg.Compression + "?nocache=" + strconv.Itoa(int(time.Now().Unix()))) url := fmt.Sprintf("%s%s/%s.%s?nocache=%d", pkg.Url, selectedRepo, pkg.Packagepath, pkg.Compression, time.Now().Unix())
resp, err := http.Get(url)
if err != nil { if err != nil {
return nil, fmt.Errorf("fetching package file: %w", err) return nil, fmt.Errorf("fetching package file: %w", err)
} }
@ -311,7 +283,14 @@ func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[
for { for {
stanza, err := sreader.ReadStanza() stanza, err := sreader.ReadStanza()
if err != nil || stanza == nil { if err != nil {
if errors.Is(err, deb.ErrMalformedStanza) {
slog.Warn("Malformed stanza encountered", "error", err)
continue
}
return nil, err
}
if stanza == nil {
break break
} }
@ -320,7 +299,6 @@ func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[
} }
name := stanza[PackageKey] name := stanza[PackageKey]
if shouldSkipPackage(name, pkg) { if shouldSkipPackage(name, pkg) {
continue continue
} }
@ -331,13 +309,15 @@ func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[
} }
versionStr := chooseVersion(sourceVersion, stanza[VersionKey]) versionStr := chooseVersion(sourceVersion, stanza[VersionKey])
ver, err := version.Parse(versionStr) ver, err := version.Parse(versionStr)
if err != nil { if err != nil {
return nil, fmt.Errorf("parsing version %s: %w", versionStr, err) slog.Warn("Invalid version format", "version", versionStr, "error", err)
continue
} }
if shouldUpdatePackage(packages, name, ver) { existingPkg, exists := packages.Get(name)
if !exists {
// If the package doesn't exist, add it
packages.Set(name, domain.PackageInfo{ packages.Set(name, domain.PackageInfo{
PackageName: name, PackageName: name,
Version: ver.String(), Version: ver.String(),
@ -346,6 +326,24 @@ func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[
Description: stanza[DescriptionKey], Description: stanza[DescriptionKey],
Status: domain.Current, Status: domain.Current,
}) })
} else {
// Compare existing version with the new version
cmp, err := compareVersions(ver.String(), existingPkg.Version)
if err != nil {
slog.Warn("Version comparison failed", "package", name, "error", err)
continue
}
if cmp >= 0 {
// Update the package if the new version is higher or equal
packages.Set(name, domain.PackageInfo{
PackageName: name,
Version: ver.String(),
Source: source,
Architecture: stanza[ArchitectureKey],
Description: stanza[DescriptionKey],
Status: existingPkg.Status,
})
}
} }
} }
@ -434,6 +432,111 @@ func GetPackagesCount() domain.PackagesCount {
return count return count
} }
func mergePackages(internal, external *haxmap.Map[string, domain.SourcePackage]) {
external.ForEach(func(name string, extPkg domain.SourcePackage) bool {
intPkg, exists := internal.Get(name)
if exists {
// Compare internal package version with external package version
cmp, err := compareVersions(intPkg.Version, extPkg.Version)
if err != nil {
slog.Warn("Version comparison failed in mergePackages",
"package", name,
"internal_version", intPkg.Version,
"external_version", extPkg.Version,
"error", err)
// Decide whether to continue or handle the error differently
return true // Continue with next package
}
if cmp < 0 {
// External package has a newer version
intPkg.NewVersion = extPkg.Version
intPkg.Status = domain.Stale
// Merge binary packages from external to internal
extPkg.Packages.ForEach(func(binName string, binPkg domain.PackageInfo) bool {
intPkg.Packages.Set(binName, binPkg)
return true
})
internal.Set(name, intPkg)
} else {
// Internal package has the same or newer version
// Optionally, you might still want to merge binary packages
extPkg.Packages.ForEach(func(binName string, binPkg domain.PackageInfo) bool {
// Only add binary package if it doesn't exist or has a newer version
existingBinPkg, binExists := intPkg.Packages.Get(binName)
if !binExists || binPkg.Version > existingBinPkg.Version {
intPkg.Packages.Set(binName, binPkg)
}
return true
})
internal.Set(name, intPkg)
}
} else {
// Package exists only externally; mark as missing
extPkg.Status = domain.Missing
internal.Set(name, extPkg)
}
return true
})
}
func determineUpdatedPackages(internal, external *haxmap.Map[string, domain.SourcePackage]) *haxmap.Map[string, domain.SourcePackage] {
updated := haxmap.New[string, domain.SourcePackage]()
internal.ForEach(func(name string, intPkg domain.SourcePackage) bool {
extPkg, exists := external.Get(name)
if !exists {
// Package does not exist externally; consider it as potentially updated
updated.Set(name, intPkg)
return true
}
// Compare internal package version with external package version
cmp, err := compareVersions(intPkg.Version, extPkg.Version)
if err != nil {
slog.Warn("Version comparison failed", "package", name, "v1", intPkg.Version, "v2", extPkg.Version, "error", err)
// Depending on requirements, decide whether to skip or consider as updated
return true
}
if cmp < 0 {
// External package has a newer version
intPkg.NewVersion = extPkg.Version
intPkg.Status = domain.Stale
updated.Set(name, intPkg)
}
return true
})
return updated
}
func compareVersions(v1, v2 string) (int, error) {
parsedV1, err1 := version.Parse(v1)
if err1 != nil {
return 0, fmt.Errorf("invalid version '%s': %w", v1, err1)
}
parsedV2, err2 := version.Parse(v2)
if err2 != nil {
return 0, fmt.Errorf("invalid version '%s': %w", v2, err2)
}
return version.Compare(parsedV1, parsedV2), nil
}
func updateBinaryPackageFlags(pkgs *haxmap.Map[string, domain.SourcePackage]) {
pkgs.ForEach(func(name string, pkg domain.SourcePackage) bool {
for _, p := range config.Configs.I386List {
if pkg.Name == p || pkg.Name == p+DMOSuffix {
pkg.Has32bit = true
pkgs.Set(name, pkg)
return true
}
}
pkg.Has32bit = false
pkgs.Set(name, pkg)
return true
})
}
func combinePackages(packages *haxmap.Map[string, domain.SourcePackage]) { func combinePackages(packages *haxmap.Map[string, domain.SourcePackage]) {
dmoPackages := haxmap.New[string, domain.SourcePackage]() dmoPackages := haxmap.New[string, domain.SourcePackage]()

View File

@ -54,6 +54,12 @@ func runServer(ctx context.Context) error {
} }
slog.Info("packages loaded in " + time.Since(start).String()) slog.Info("packages loaded in " + time.Since(start).String())
err = handlers_packages.UpdateFreezeCache()
if err != nil {
slog.Error("unable to update freeze cache: " + err.Error())
return err
}
buildqueue.StartPackageQueueWorker(ctx) buildqueue.StartPackageQueueWorker(ctx)
buildqueue.StartQueueAndStatusWorker(ctx) buildqueue.StartQueueAndStatusWorker(ctx)
@ -87,11 +93,13 @@ func runServer(ctx context.Context) error {
server.Get("/api/errored", handlers_build.Errored) server.Get("/api/errored", handlers_build.Errored)
server.Get("/api/isloggedin", handlers_auth.IsLoggedIn) server.Get("/api/isloggedin", handlers_auth.IsLoggedIn)
server.Get("/api/i386", handlers_packages.I386) server.Get("/api/i386", handlers_packages.I386)
server.Get("/api/freezes", handlers_packages.Freezes)
server.Post("/api/login", handlers_auth.Login) server.Post("/api/login", handlers_auth.Login)
adminRoutes.Post("/triggerbuild", handlers_build.TriggerBuild) adminRoutes.Post("/triggerbuild", handlers_build.TriggerBuild)
adminRoutes.Post("/bulkRebuild", handlers_build.BulkRebuild) adminRoutes.Post("/bulkRebuild", handlers_build.BulkRebuild)
adminRoutes.Post("/register", handlers_auth.Register) adminRoutes.Post("/register", handlers_auth.Register)
adminRoutes.Post("/trigger-freeze", handlers_build.TriggerFreeze)
adminRoutes.Post("/updatePassword", handlers_auth.UpdatePassword) adminRoutes.Post("/updatePassword", handlers_auth.UpdatePassword)
return server.Listen(fmt.Sprintf("localhost:%d", config.Configs.Port)) return server.Listen(fmt.Sprintf("localhost:%d", config.Configs.Port))