From 480fb614f48512e64247ddd1f3fed5b794a6114a Mon Sep 17 00:00:00 2001 From: ferreo Date: Wed, 2 Oct 2024 19:46:32 +0100 Subject: [PATCH] Freeze feature + refactor of package logic --- db/repository.go | 18 +-- handlers/build/triggerFreeze.go | 47 +++++++ handlers/packages/freezes.go | 147 +++++++++++++++++++++ packages/freezer.go | 99 ++++++++++++++ packages/packages.go | 223 +++++++++++++++++++++++--------- server.go | 8 ++ 6 files changed, 474 insertions(+), 68 deletions(-) create mode 100644 handlers/build/triggerFreeze.go create mode 100644 handlers/packages/freezes.go create mode 100644 packages/freezer.go diff --git a/db/repository.go b/db/repository.go index c42cf52..860a961 100644 --- a/db/repository.go +++ b/db/repository.go @@ -108,24 +108,26 @@ func (r *Repository) GetPackage(name string) (domain.SourcePackage, error) { } func (r *Repository) SavePackages(pkgs *haxmap.Map[string, domain.SourcePackage]) error { - packs := make([]domain.SourcePackageDTO, 0) + var packs []domain.SourcePackageDTO pkgs.ForEach(func(k string, v domain.SourcePackage) bool { packs = append(packs, sourcePackageToDto(v)) return true }) + return r.saveInBatches(packs) +} + +func (r *Repository) saveInBatches(packs []domain.SourcePackageDTO) error { for i := 0; i < len(packs); i += MaxBatchSize { end := i + MaxBatchSize - length := len(packs) - if end > length { - end = length + if end > len(packs) { + end = len(packs) } - tx := r.db.Save(packs[i:end]) - if tx.Error != nil { - return tx.Error + batch := packs[i:end] + if err := r.db.Clauses(clause.OnConflict{UpdateAll: true}).Create(&batch).Error; err != nil { + return err } } - return nil } diff --git a/handlers/build/triggerFreeze.go b/handlers/build/triggerFreeze.go new file mode 100644 index 0000000..d4703ab --- /dev/null +++ b/handlers/build/triggerFreeze.go @@ -0,0 +1,47 @@ +package handlers_build + +import ( + handlers_packages "brunel/handlers/packages" + "fmt" + "os/exec" + + "github.com/gofiber/fiber/v2" +) + +type FreezeRequest struct { + Name string `json:"name"` + Version string `json:"version"` +} + +func TriggerFreeze(c *fiber.Ctx) error { + var req FreezeRequest + if err := c.BodyParser(&req); err != nil { + return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{ + "error": "Invalid request body", + }) + } + + if req.Name == "" || req.Version == "" { + return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{ + "error": "Name and version are required", + }) + } + + cmd := exec.Command("/bin/sh", "-c", fmt.Sprintf("aptly repo copy pika-canary pika-nest '%s (%s)'", req.Name, req.Version)) + + output, err := cmd.CombinedOutput() + if err != nil { + return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{ + "error": fmt.Sprintf("Command execution failed: %v", err), + "output": string(output), + }) + } + + // Remove the item from the freeze cache + handlers_packages.RemoveFromFreezeCache(req.Name, req.Version) + + return c.Status(fiber.StatusOK).JSON(fiber.Map{ + "message": "Freeze triggered successfully", + "output": string(output), + }) +} diff --git a/handlers/packages/freezes.go b/handlers/packages/freezes.go new file mode 100644 index 0000000..e1098c7 --- /dev/null +++ b/handlers/packages/freezes.go @@ -0,0 +1,147 @@ +package handlers_packages + +import ( + "brunel/packages" + "log/slog" + "sort" + "strings" + "sync" + "time" + + "github.com/gofiber/fiber/v2" +) + +type FreezeItem struct { + Name string `json:"name"` + Ver string `json:"version"` + Arch string `json:"architecture"` +} + +type FreezesResponse struct { + Total int `json:"total"` + Freezes []FreezeItem `json:"freezes"` +} + +var ( + cachedFreezes []FreezeItem + lastCacheTime time.Time + cacheDuration = 1 * time.Hour + freezeCacheMutex sync.RWMutex +) + +func UpdateFreezeCache() error { + freeze64, freeze32, err := packages.GetFreezeItems() + if err != nil { + return err + } + + combinedFreezes := make([]FreezeItem, 0, len(freeze64)+len(freeze32)) + + for _, item := range freeze64 { + combinedFreezes = append(combinedFreezes, FreezeItem{ + Name: item.Name, + Ver: item.Ver, + Arch: "amd64", + }) + } + + for _, item := range freeze32 { + combinedFreezes = append(combinedFreezes, FreezeItem{ + Name: item.Name, + Ver: item.Ver, + Arch: "i386", + }) + } + + sort.Slice(combinedFreezes, func(i, j int) bool { + return combinedFreezes[i].Name < combinedFreezes[j].Name + }) + + freezeCacheMutex.Lock() + defer freezeCacheMutex.Unlock() + + cachedFreezes = combinedFreezes + lastCacheTime = time.Now() + + return nil +} + +func getFreezes() ([]FreezeItem, error) { + freezeCacheMutex.RLock() + if time.Since(lastCacheTime) < cacheDuration && len(cachedFreezes) > 0 { + defer freezeCacheMutex.RUnlock() + return cachedFreezes, nil + } + freezeCacheMutex.RUnlock() + + if err := UpdateFreezeCache(); err != nil { + return nil, err + } + + freezeCacheMutex.RLock() + defer freezeCacheMutex.RUnlock() + return cachedFreezes, nil +} + +func RemoveFromFreezeCache(name, version string) { + freezeCacheMutex.Lock() + defer freezeCacheMutex.Unlock() + + for i, item := range cachedFreezes { + if item.Name == name && item.Ver == version { + cachedFreezes = append(cachedFreezes[:i], cachedFreezes[i+1:]...) + break + } + } +} + +func Freezes(c *fiber.Ctx) error { + pageNum := c.QueryInt("page", 1) + pageSize := c.QueryInt("pageSize", 250) + search := strings.ToLower(c.Query("search")) + filter := c.Query("filter") + + // Adjust pageNum to be 0-based for slice indexing + adjustedPageNum := pageNum - 1 + if adjustedPageNum < 0 { + adjustedPageNum = 0 + } + + freezes, err := getFreezes() + if err != nil { + slog.Error("Failed to retrieve freeze items", "error", err) + return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{ + "error": "Failed to retrieve freeze items", + }) + } + + filteredFreezes := make([]FreezeItem, 0) + for _, freeze := range freezes { + matchesFilter := filter == "" || filter == "All" || strings.EqualFold(freeze.Arch, filter) + matchesSearch := search == "" || strings.Contains(strings.ToLower(freeze.Name), search) + + if matchesFilter && matchesSearch { + filteredFreezes = append(filteredFreezes, freeze) + } + } + + total := len(filteredFreezes) + + startIndex := adjustedPageNum * pageSize + endIndex := (adjustedPageNum + 1) * pageSize + if startIndex >= total { + filteredFreezes = []FreezeItem{} + } else { + if endIndex > total { + endIndex = total + } + filteredFreezes = filteredFreezes[startIndex:endIndex] + } + + response := FreezesResponse{ + Total: total, + Freezes: filteredFreezes, + } + + return c.Status(fiber.StatusOK).JSON(response) +} diff --git a/packages/freezer.go b/packages/freezer.go new file mode 100644 index 0000000..54d7d02 --- /dev/null +++ b/packages/freezer.go @@ -0,0 +1,99 @@ +package packages + +import ( + "brunel/deb" + "compress/bzip2" + "fmt" + "net/http" + "strconv" + "strings" + "time" +) + +const nest64Url = "https://ppa.pika-os.com/dists/pika/nest/binary-amd64/Packages.bz2" +const canary64Url = "https://ppa.pika-os.com/dists/pika/canary/binary-amd64/Packages.bz2" +const nest32Url = "https://ppa.pika-os.com/dists/pika/nest/binary-i386/Packages.bz2" +const canary32Url = "https://ppa.pika-os.com/dists/pika/canary/binary-i386/Packages.bz2" + +type freezeItem struct { + Name string + Ver string +} + +func GetFreezeItems() ([]freezeItem, []freezeItem, error) { + freeze64, err := compareRepositories(nest64Url, canary64Url) + if err != nil { + return nil, nil, fmt.Errorf("error comparing 64-bit repositories: %w", err) + } + + freeze32, err := compareRepositories(nest32Url, canary32Url) + if err != nil { + return nil, nil, fmt.Errorf("error comparing 32-bit repositories: %w", err) + } + + return freeze64, freeze32, nil +} + +func compareRepositories(nestUrl, canaryUrl string) ([]freezeItem, error) { + nestPackages, err := getPackages(nestUrl) + if err != nil { + return nil, fmt.Errorf("error getting nest packages: %w", err) + } + + canaryPackages, err := getPackages(canaryUrl) + if err != nil { + return nil, fmt.Errorf("error getting canary packages: %w", err) + } + + nestSet := make(map[string]bool) + for _, pkg := range nestPackages { + nestSet[pkg] = true + } + + var freezeItems []freezeItem + for _, pkg := range canaryPackages { + if !nestSet[pkg] { + parts := strings.SplitN(pkg, ":", 2) + if len(parts) == 2 { + freezeItems = append(freezeItems, freezeItem{Name: parts[0], Ver: parts[1]}) + } + } + } + + return freezeItems, nil +} + +func getPackages(url string) ([]string, error) { + resp, err := http.Get(url + "?nocache=" + strconv.Itoa(int(time.Now().Unix()))) + if err != nil { + return nil, fmt.Errorf("fetching package file: %w", err) + } + defer resp.Body.Close() + + rdr := bzip2.NewReader(resp.Body) + sreader := deb.NewControlFileReader(rdr, false, false) + + var packages []string + + for { + stanza, err := sreader.ReadStanza() + if err != nil || stanza == nil { + break + } + + name := stanza["Package"] + versionStr := stanza["Version"] + + if name == "" || versionStr == "" { + continue + } + + if strings.Contains(name, "-dbgsym") { + continue + } + + packages = append(packages, name+":"+versionStr) + } + + return packages, nil +} diff --git a/packages/packages.go b/packages/packages.go index ab0912a..8f71c2c 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -6,12 +6,12 @@ import ( "brunel/domain" "brunel/helpers" "compress/bzip2" + "errors" "fmt" "io" "log/slog" "net/http" "slices" - "strconv" "strings" "sync" "time" @@ -56,50 +56,13 @@ func ProcessPackages() error { if err := eg.Wait(); err != nil { return err } - slog.Info("packages processed in " + time.Since(start).String()) + slog.Info("packages loaded in " + time.Since(start).String()) - combinePackages(internalPackages) - combinePackages(externalPackages) + mergePackages(internalPackages, externalPackages) - ProcessMissingPackages(internalPackages, externalPackages) - ProcessStalePackages(internalPackages, externalPackages) + updatedPackages := determineUpdatedPackages(internalPackages, externalPackages) - updatedPackages := haxmap.New[string, domain.SourcePackage]() - - internalPackages.ForEach(func(k string, v domain.SourcePackage) bool { - curr, exists := currentPackages.Get(k) - if !exists { - updatedPackages.Set(k, v) - return true - } - - mergedPackage := domain.SourcePackage{ - Name: curr.Name, - Has32bit: curr.Has32bit, - Version: v.Version, - NewVersion: v.NewVersion, - Status: v.Status, - BuildAttempts: curr.BuildAttempts, - LastBuildStatus: curr.LastBuildStatus, - Packages: haxmap.New[string, domain.PackageInfo](), - } - - updatedPackages.Set(k, mergedPackage) - return true - }) - - updatedPackages.ForEach(func(k string, v domain.SourcePackage) bool { - for _, pkg := range config.Configs.I386List { - if v.Name == pkg || v.Name == pkg+DMOSuffix { - v.Has32bit = true - updatedPackages.Set(k, v) - return true - } - } - v.Has32bit = false - updatedPackages.Set(k, v) - return true - }) + updateBinaryPackageFlags(updatedPackages) currentPackages = updatedPackages @@ -111,6 +74,7 @@ func ProcessPackages() error { return fmt.Errorf("saving to database: %w", err) } + slog.Info("packages processed and saved in " + time.Since(start).String()) return nil } @@ -176,21 +140,13 @@ func LoadFromDb() error { } func LoadPackages(packages *haxmap.Map[string, domain.SourcePackage], packageFiles []config.PackageFile, isInternal bool) error { - slices.SortStableFunc(packageFiles, func(a, b config.PackageFile) int { - if a.Priority == b.Priority { - return 0 - } - if (isInternal && a.Priority < b.Priority) || (!isInternal && a.Priority > b.Priority) { - return 1 - } - return -1 - }) - + sortedFiles := sortPackageFiles(packageFiles, isInternal) var eg errgroup.Group var mu sync.Mutex - packageResults := make([][]fetchResult, len(packageFiles)) + packageResults := make([][]fetchResult, len(sortedFiles)) - for i, pkg := range packageFiles { + for i, pkg := range sortedFiles { + i, pkg := i, pkg // capture range variables eg.Go(func() error { results, err := fetchPackagesForFile(pkg) if err != nil { @@ -216,6 +172,21 @@ func LoadPackages(packages *haxmap.Map[string, domain.SourcePackage], packageFil return nil } +func sortPackageFiles(packageFiles []config.PackageFile, isInternal bool) []config.PackageFile { + sorted := make([]config.PackageFile, len(packageFiles)) + copy(sorted, packageFiles) + slices.SortStableFunc(sorted, func(a, b config.PackageFile) int { + if a.Priority == b.Priority { + return 0 + } + if (isInternal && a.Priority < b.Priority) || (!isInternal && a.Priority > b.Priority) { + return 1 + } + return -1 + }) + return sorted +} + type fetchResult struct { repo string packages *haxmap.Map[string, domain.PackageInfo] @@ -295,7 +266,8 @@ func ProcessStalePackages(internalPackages *haxmap.Map[string, domain.SourcePack } func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[string, domain.PackageInfo], error) { - resp, err := http.Get(pkg.Url + selectedRepo + "/" + pkg.Packagepath + "." + pkg.Compression + "?nocache=" + strconv.Itoa(int(time.Now().Unix()))) + url := fmt.Sprintf("%s%s/%s.%s?nocache=%d", pkg.Url, selectedRepo, pkg.Packagepath, pkg.Compression, time.Now().Unix()) + resp, err := http.Get(url) if err != nil { return nil, fmt.Errorf("fetching package file: %w", err) } @@ -311,7 +283,14 @@ func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[ for { stanza, err := sreader.ReadStanza() - if err != nil || stanza == nil { + if err != nil { + if errors.Is(err, deb.ErrMalformedStanza) { + slog.Warn("Malformed stanza encountered", "error", err) + continue + } + return nil, err + } + if stanza == nil { break } @@ -320,7 +299,6 @@ func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[ } name := stanza[PackageKey] - if shouldSkipPackage(name, pkg) { continue } @@ -331,13 +309,15 @@ func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[ } versionStr := chooseVersion(sourceVersion, stanza[VersionKey]) - ver, err := version.Parse(versionStr) if err != nil { - return nil, fmt.Errorf("parsing version %s: %w", versionStr, err) + slog.Warn("Invalid version format", "version", versionStr, "error", err) + continue } - if shouldUpdatePackage(packages, name, ver) { + existingPkg, exists := packages.Get(name) + if !exists { + // If the package doesn't exist, add it packages.Set(name, domain.PackageInfo{ PackageName: name, Version: ver.String(), @@ -346,6 +326,24 @@ func fetchPackageFile(pkg config.PackageFile, selectedRepo string) (*haxmap.Map[ Description: stanza[DescriptionKey], Status: domain.Current, }) + } else { + // Compare existing version with the new version + cmp, err := compareVersions(ver.String(), existingPkg.Version) + if err != nil { + slog.Warn("Version comparison failed", "package", name, "error", err) + continue + } + if cmp >= 0 { + // Update the package if the new version is higher or equal + packages.Set(name, domain.PackageInfo{ + PackageName: name, + Version: ver.String(), + Source: source, + Architecture: stanza[ArchitectureKey], + Description: stanza[DescriptionKey], + Status: existingPkg.Status, + }) + } } } @@ -434,6 +432,111 @@ func GetPackagesCount() domain.PackagesCount { return count } +func mergePackages(internal, external *haxmap.Map[string, domain.SourcePackage]) { + external.ForEach(func(name string, extPkg domain.SourcePackage) bool { + intPkg, exists := internal.Get(name) + if exists { + // Compare internal package version with external package version + cmp, err := compareVersions(intPkg.Version, extPkg.Version) + if err != nil { + slog.Warn("Version comparison failed in mergePackages", + "package", name, + "internal_version", intPkg.Version, + "external_version", extPkg.Version, + "error", err) + // Decide whether to continue or handle the error differently + return true // Continue with next package + } + + if cmp < 0 { + // External package has a newer version + intPkg.NewVersion = extPkg.Version + intPkg.Status = domain.Stale + // Merge binary packages from external to internal + extPkg.Packages.ForEach(func(binName string, binPkg domain.PackageInfo) bool { + intPkg.Packages.Set(binName, binPkg) + return true + }) + internal.Set(name, intPkg) + } else { + // Internal package has the same or newer version + // Optionally, you might still want to merge binary packages + extPkg.Packages.ForEach(func(binName string, binPkg domain.PackageInfo) bool { + // Only add binary package if it doesn't exist or has a newer version + existingBinPkg, binExists := intPkg.Packages.Get(binName) + if !binExists || binPkg.Version > existingBinPkg.Version { + intPkg.Packages.Set(binName, binPkg) + } + return true + }) + internal.Set(name, intPkg) + } + } else { + // Package exists only externally; mark as missing + extPkg.Status = domain.Missing + internal.Set(name, extPkg) + } + return true + }) +} + +func determineUpdatedPackages(internal, external *haxmap.Map[string, domain.SourcePackage]) *haxmap.Map[string, domain.SourcePackage] { + updated := haxmap.New[string, domain.SourcePackage]() + internal.ForEach(func(name string, intPkg domain.SourcePackage) bool { + extPkg, exists := external.Get(name) + if !exists { + // Package does not exist externally; consider it as potentially updated + updated.Set(name, intPkg) + return true + } + + // Compare internal package version with external package version + cmp, err := compareVersions(intPkg.Version, extPkg.Version) + if err != nil { + slog.Warn("Version comparison failed", "package", name, "v1", intPkg.Version, "v2", extPkg.Version, "error", err) + // Depending on requirements, decide whether to skip or consider as updated + return true + } + + if cmp < 0 { + // External package has a newer version + intPkg.NewVersion = extPkg.Version + intPkg.Status = domain.Stale + updated.Set(name, intPkg) + } + + return true + }) + return updated +} + +func compareVersions(v1, v2 string) (int, error) { + parsedV1, err1 := version.Parse(v1) + if err1 != nil { + return 0, fmt.Errorf("invalid version '%s': %w", v1, err1) + } + parsedV2, err2 := version.Parse(v2) + if err2 != nil { + return 0, fmt.Errorf("invalid version '%s': %w", v2, err2) + } + return version.Compare(parsedV1, parsedV2), nil +} + +func updateBinaryPackageFlags(pkgs *haxmap.Map[string, domain.SourcePackage]) { + pkgs.ForEach(func(name string, pkg domain.SourcePackage) bool { + for _, p := range config.Configs.I386List { + if pkg.Name == p || pkg.Name == p+DMOSuffix { + pkg.Has32bit = true + pkgs.Set(name, pkg) + return true + } + } + pkg.Has32bit = false + pkgs.Set(name, pkg) + return true + }) +} + func combinePackages(packages *haxmap.Map[string, domain.SourcePackage]) { dmoPackages := haxmap.New[string, domain.SourcePackage]() diff --git a/server.go b/server.go index 5027530..c55015d 100644 --- a/server.go +++ b/server.go @@ -54,6 +54,12 @@ func runServer(ctx context.Context) error { } slog.Info("packages loaded in " + time.Since(start).String()) + err = handlers_packages.UpdateFreezeCache() + if err != nil { + slog.Error("unable to update freeze cache: " + err.Error()) + return err + } + buildqueue.StartPackageQueueWorker(ctx) buildqueue.StartQueueAndStatusWorker(ctx) @@ -87,11 +93,13 @@ func runServer(ctx context.Context) error { server.Get("/api/errored", handlers_build.Errored) server.Get("/api/isloggedin", handlers_auth.IsLoggedIn) server.Get("/api/i386", handlers_packages.I386) + server.Get("/api/freezes", handlers_packages.Freezes) server.Post("/api/login", handlers_auth.Login) adminRoutes.Post("/triggerbuild", handlers_build.TriggerBuild) adminRoutes.Post("/bulkRebuild", handlers_build.BulkRebuild) adminRoutes.Post("/register", handlers_auth.Register) + adminRoutes.Post("/trigger-freeze", handlers_build.TriggerFreeze) adminRoutes.Post("/updatePassword", handlers_auth.UpdatePassword) return server.Listen(fmt.Sprintf("localhost:%d", config.Configs.Port))