|
- package polling
-
- import (
- "context"
- "log/slog"
- "sync"
- "time"
-
- "qctextbuilder/internal/buildsvc"
- "qctextbuilder/internal/store"
- )
-
- type Service struct {
- buildSvc buildsvc.Service
- store store.BuildStore
- interval time.Duration
- maxPolls int
- logger *slog.Logger
- }
-
- func New(buildSvc buildsvc.Service, buildStore store.BuildStore, interval time.Duration, maxPolls int, logger *slog.Logger) *Service {
- if interval <= 0 {
- interval = 5 * time.Second
- }
- if maxPolls <= 0 {
- maxPolls = 4
- }
- return &Service{
- buildSvc: buildSvc,
- store: buildStore,
- interval: interval,
- maxPolls: maxPolls,
- logger: logger,
- }
- }
-
- func (s *Service) Start(ctx context.Context) error {
- ticker := time.NewTicker(s.interval)
- defer ticker.Stop()
-
- for {
- select {
- case <-ctx.Done():
- return nil
- case <-ticker.C:
- s.PollPendingOnce(ctx, 20)
- }
- }
- }
-
- func (s *Service) PollPendingOnce(ctx context.Context, limit int) {
- builds, err := s.store.ListBuildsByStatuses(ctx, []string{"queued", "processing"}, limit)
- if err != nil {
- s.logger.Error("list pending builds failed", "error", err)
- return
- }
-
- semSize := s.maxPolls
- if semSize <= 0 {
- semSize = 1
- }
- sem := make(chan struct{}, semSize)
- var wg sync.WaitGroup
-
- for _, build := range builds {
- select {
- case <-ctx.Done():
- return
- default:
- }
-
- wg.Add(1)
- sem <- struct{}{}
- go func(buildID string) {
- defer wg.Done()
- defer func() { <-sem }()
- if err := s.buildSvc.PollOnce(ctx, buildID); err != nil {
- s.logger.Error("poll build failed", "buildId", buildID, "error", err)
- }
- }(build.ID)
- }
-
- wg.Wait()
- }
|