Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

85 рядки
1.6KB

  1. package polling
  2. import (
  3. "context"
  4. "log/slog"
  5. "sync"
  6. "time"
  7. "qctextbuilder/internal/buildsvc"
  8. "qctextbuilder/internal/store"
  9. )
  10. type Service struct {
  11. buildSvc buildsvc.Service
  12. store store.BuildStore
  13. interval time.Duration
  14. maxPolls int
  15. logger *slog.Logger
  16. }
  17. func New(buildSvc buildsvc.Service, buildStore store.BuildStore, interval time.Duration, maxPolls int, logger *slog.Logger) *Service {
  18. if interval <= 0 {
  19. interval = 5 * time.Second
  20. }
  21. if maxPolls <= 0 {
  22. maxPolls = 4
  23. }
  24. return &Service{
  25. buildSvc: buildSvc,
  26. store: buildStore,
  27. interval: interval,
  28. maxPolls: maxPolls,
  29. logger: logger,
  30. }
  31. }
  32. func (s *Service) Start(ctx context.Context) error {
  33. ticker := time.NewTicker(s.interval)
  34. defer ticker.Stop()
  35. for {
  36. select {
  37. case <-ctx.Done():
  38. return nil
  39. case <-ticker.C:
  40. s.PollPendingOnce(ctx, 20)
  41. }
  42. }
  43. }
  44. func (s *Service) PollPendingOnce(ctx context.Context, limit int) {
  45. builds, err := s.store.ListBuildsByStatuses(ctx, []string{"queued", "processing"}, limit)
  46. if err != nil {
  47. s.logger.Error("list pending builds failed", "error", err)
  48. return
  49. }
  50. semSize := s.maxPolls
  51. if semSize <= 0 {
  52. semSize = 1
  53. }
  54. sem := make(chan struct{}, semSize)
  55. var wg sync.WaitGroup
  56. for _, build := range builds {
  57. select {
  58. case <-ctx.Done():
  59. return
  60. default:
  61. }
  62. wg.Add(1)
  63. sem <- struct{}{}
  64. go func(buildID string) {
  65. defer wg.Done()
  66. defer func() { <-sem }()
  67. if err := s.buildSvc.PollOnce(ctx, buildID); err != nil {
  68. s.logger.Error("poll build failed", "buildId", buildID, "error", err)
  69. }
  70. }(build.ID)
  71. }
  72. wg.Wait()
  73. }