From 2554f70b893661cfa9dc73bb488def9247825936 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Fri, 5 Jul 2024 10:48:27 +0000 Subject: [PATCH] Replace time.After with time.NewTimer Improper use of time.After can lead to memory leaks if the timer never gets a chance to fire. Signed-off-by: Gabriel Adrian Samfira --- database/watcher/consumer.go | 4 +++- database/watcher/producer.go | 4 +++- runner/pool/pool.go | 8 ++++++-- runner/runner.go | 5 +++-- websocket/client.go | 18 +++++++++++------- websocket/websocket.go | 9 ++++++--- 6 files changed, 32 insertions(+), 16 deletions(-) diff --git a/database/watcher/consumer.go b/database/watcher/consumer.go index fb36c694..9282ece8 100644 --- a/database/watcher/consumer.go +++ b/database/watcher/consumer.go @@ -69,13 +69,15 @@ func (w *consumer) Send(payload common.ChangePayload) { } } + timer := time.NewTimer(1 * time.Second) + defer timer.Stop() slog.DebugContext(w.ctx, "sending payload") select { case <-w.quit: slog.DebugContext(w.ctx, "consumer is closed") case <-w.ctx.Done(): slog.DebugContext(w.ctx, "consumer is closed") - case <-time.After(1 * time.Second): + case <-timer.C: slog.DebugContext(w.ctx, "timeout trying to send payload", "payload", payload) case w.messages <- payload: } diff --git a/database/watcher/producer.go b/database/watcher/producer.go index fd61aa16..159ad843 100644 --- a/database/watcher/producer.go +++ b/database/watcher/producer.go @@ -26,12 +26,14 @@ func (w *producer) Notify(payload common.ChangePayload) error { return common.ErrProducerClosed } + timer := time.NewTimer(1 * time.Second) + defer timer.Stop() select { case <-w.quit: return common.ErrProducerClosed case <-w.ctx.Done(): return common.ErrProducerClosed - case <-time.After(1 * time.Second): + case <-timer.C: return common.ErrProducerTimeoutErr case w.messages <- payload: } diff --git a/runner/pool/pool.go b/runner/pool/pool.go index c78fffb1..d3dbd96b 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -793,8 +793,10 @@ func (r *basePoolManager) Status() params.PoolManagerStatus { func (r *basePoolManager) waitForTimeoutOrCancelled(timeout time.Duration) { slog.DebugContext( r.ctx, fmt.Sprintf("sleeping for %.2f minutes", timeout.Minutes())) + timer := time.NewTimer(timeout) + defer timer.Stop() select { - case <-time.After(timeout): + case <-timer.C: case <-r.ctx.Done(): case <-r.quit: } @@ -1471,13 +1473,15 @@ func (r *basePoolManager) addPendingInstances() error { func (r *basePoolManager) Wait() error { done := make(chan struct{}) + timer := time.NewTimer(60 * time.Second) go func() { r.wg.Wait() + timer.Stop() close(done) }() select { case <-done: - case <-time.After(60 * time.Second): + case <-timer.C: return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop") } return nil diff --git a/runner/runner.go b/runner/runner.go index c7ff4534..532412dd 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -431,11 +431,12 @@ func (r *Runner) waitForErrorGroupOrTimeout(g *errgroup.Group) error { go func() { done <- g.Wait() }() - + timer := time.NewTimer(60 * time.Second) + defer timer.Stop() select { case err := <-done: return err - case <-time.After(60 * time.Second): + case <-timer.C: return fmt.Errorf("timed out waiting for pool manager start") } } diff --git a/websocket/client.go b/websocket/client.go index c82f244d..5b80ba81 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -134,9 +134,11 @@ func (c *Client) Write(msg []byte) (int, error) { tmp := make([]byte, len(msg)) copy(tmp, msg) + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() select { - case <-time.After(5 * time.Second): + case <-timer.C: return 0, fmt.Errorf("timed out sending message to client") case c.send <- tmp: } @@ -193,11 +195,6 @@ func (c *Client) writeMessage(messageType int, message []byte) error { // clientWriter func (c *Client) clientWriter() { - ticker := time.NewTicker(pingPeriod) - defer func() { - c.Stop() - ticker.Stop() - }() // Set up expiration timer. // NOTE: if a token is created without an expiration date // this will be set to nil, which will close the loop bellow @@ -208,6 +205,13 @@ func (c *Client) clientWriter() { if expires != nil { authExpires = *expires } + authTimer := time.NewTimer(time.Until(authExpires)) + ticker := time.NewTicker(pingPeriod) + defer func() { + c.Stop() + ticker.Stop() + authTimer.Stop() + }() for { select { case message, ok := <-c.send: @@ -236,7 +240,7 @@ func (c *Client) clientWriter() { } case <-c.ctx.Done(): return - case <-time.After(time.Until(authExpires)): + case <-authTimer.C: // Auth has expired slog.DebugContext(c.ctx, "auth expired, closing connection") return diff --git a/websocket/websocket.go b/websocket/websocket.go index 18b56585..57820449 100644 --- a/websocket/websocket.go +++ b/websocket/websocket.go @@ -107,9 +107,10 @@ func (h *Hub) Unregister(client *Client) error { func (h *Hub) Write(msg []byte) (int, error) { tmp := make([]byte, len(msg)) copy(tmp, msg) - + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() select { - case <-time.After(5 * time.Second): + case <-timer.C: return 0, fmt.Errorf("timed out sending message to client") case h.broadcast <- tmp: } @@ -134,9 +135,11 @@ func (h *Hub) Stop() error { } func (h *Hub) Wait() error { + timer := time.NewTimer(60 * time.Second) + defer timer.Stop() select { case <-h.closed: - case <-time.After(60 * time.Second): + case <-timer.C: return fmt.Errorf("timed out waiting for hub stop") } return nil