simplify worker pool implementation
Jes Olson j3s@c3f.net
Sun, 21 Apr 2024 00:21:14 -0400
1 files changed,
21 insertions(+),
15 deletions(-)
jump to
M
reaper/reaper.go
→
reaper/reaper.go
@@ -3,6 +3,7 @@
import ( "log" "sort" + "sync" "time" "git.j3s.sh/vore/rss"@@ -59,26 +60,31 @@
// UpdateAll fetches every feed & attempts updating them // asynchronously, then prints the duration of the sync func (r *Reaper) refreshAllFeeds() { - start := time.Now() - semaphore := make(chan struct{}, 20) + ch := make(chan *rss.Feed) + var wg sync.WaitGroup + for i := 20; i > 0; i-- { + wg.Add(1) + + go func() { + defer wg.Done() + + for f := range ch { + start := time.Now() + log.Printf("refreshing %s\n", f.UpdateURL) + r.refreshFeed(f) + log.Printf("%s refreshed in %s\n", f.UpdateURL, time.Since(start)) + } + }() + } for i := range r.feeds { if r.feeds[i].Stale() { - semaphore <- struct{}{} - - go func(f *rss.Feed) { - // ensure we always free the channel - defer func() { - <-semaphore - }() - t := time.Now() - log.Printf("reaper feed %s started\n", r.feeds[i].UpdateURL) - r.refreshFeed(f) - log.Printf("reaper end: feed %s refreshed, took %s\n", r.feeds[i].UpdateURL, time.Since(t)) - }(r.feeds[i]) + ch <- r.feeds[i] } } - log.Printf("reaper: refresh complete in %s\n", time.Since(start)) + + close(ch) + wg.Wait() } // refreshFeed triggers a fetch on the given feed,