From 70bfff96e08dc4e8260068571559f8b953ea6fa7 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sat, 6 Jan 2024 14:05:38 +0000 Subject: [PATCH 1/2] Fix log streamer and cleanup code I accidentally disabled the log streamer when I moved the config options to their own section. This change fixes that. This change also adds some safety checks and locking when cleaning up stale clients. The websocket hub Write() function now copies the message before sending it on the channel to the clients. Signed-off-by: Gabriel Adrian Samfira --- cmd/garm-cli/cmd/log.go | 2 +- cmd/garm/main.go | 9 ++++----- websocket/websocket.go | 36 ++++++++++++++++++++++++++++++------ 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/cmd/garm-cli/cmd/log.go b/cmd/garm-cli/cmd/log.go index c8e61fa0..19708afc 100644 --- a/cmd/garm-cli/cmd/log.go +++ b/cmd/garm-cli/cmd/log.go @@ -63,7 +63,7 @@ var logCmd = &cobra.Command{ slog.With(slog.Any("error", err)).Error("reading log message") return } - fmt.Print(util.SanitizeLogEntry(string(message))) + fmt.Println(util.SanitizeLogEntry(string(message))) } }() diff --git a/cmd/garm/main.go b/cmd/garm/main.go index 217ba994..4b10fbaa 100644 --- a/cmd/garm/main.go +++ b/cmd/garm/main.go @@ -71,8 +71,7 @@ func maybeInitController(db common.Store) error { return nil } -func setupLogging(ctx context.Context, cfg *config.Config, hub *websocket.Hub) { - logCfg := cfg.GetLoggingConfig() +func setupLogging(ctx context.Context, logCfg config.Logging, hub *websocket.Hub) { logWriter, err := util.GetLoggingWriter(logCfg.LogFile) if err != nil { log.Fatalf("fetching log writer: %+v", err) @@ -157,16 +156,16 @@ func main() { log.Fatalf("Fetching config: %+v", err) } + logCfg := cfg.GetLoggingConfig() var hub *websocket.Hub - if cfg.Default.EnableLogStreamer != nil && *cfg.Default.EnableLogStreamer { + if logCfg.EnableLogStreamer != nil && *logCfg.EnableLogStreamer { hub = websocket.NewHub(ctx) if err := hub.Start(); err != nil { log.Fatal(err) } defer hub.Stop() //nolint } - - setupLogging(ctx, cfg, hub) + setupLogging(ctx, logCfg, hub) db, err := database.NewDatabase(ctx, cfg.Database) if err != nil { diff --git a/websocket/websocket.go b/websocket/websocket.go index a482fdcd..a1791c09 100644 --- a/websocket/websocket.go +++ b/websocket/websocket.go @@ -3,6 +3,7 @@ package websocket import ( "context" "fmt" + "sync" "time" ) @@ -33,6 +34,8 @@ type Hub struct { // Unregister requests from clients. unregister chan *Client + + mux sync.Mutex } func (h *Hub) run() { @@ -46,28 +49,47 @@ func (h *Hub) run() { return case client := <-h.register: if client != nil { + h.mux.Lock() h.clients[client.id] = client + h.mux.Unlock() } case client := <-h.unregister: if client != nil { + h.mux.Lock() if _, ok := h.clients[client.id]; ok { - delete(h.clients, client.id) + client.conn.Close() close(client.send) + delete(h.clients, client.id) } + h.mux.Unlock() } case message := <-h.broadcast: + staleClients := []string{} for id, client := range h.clients { if client == nil { + staleClients = append(staleClients, id) continue } select { case client.send <- message: case <-time.After(5 * time.Second): - close(client.send) - delete(h.clients, id) + staleClients = append(staleClients, id) } } + if len(staleClients) > 0 { + h.mux.Lock() + for _, id := range staleClients { + if client, ok := h.clients[id]; ok { + if client != nil { + client.conn.Close() + close(client.send) + } + delete(h.clients, id) + } + } + h.mux.Unlock() + } } } } @@ -78,13 +100,15 @@ func (h *Hub) Register(client *Client) error { } func (h *Hub) Write(msg []byte) (int, error) { + tmp := make([]byte, len(msg)) + copy(tmp, msg) + select { case <-time.After(5 * time.Second): return 0, fmt.Errorf("timed out sending message to client") - case h.broadcast <- msg: - + case h.broadcast <- tmp: } - return len(msg), nil + return len(tmp), nil } func (h *Hub) Start() error { From 4d7fcbe23a1f4b84d484d01852740ff207bda507 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sat, 6 Jan 2024 14:15:52 +0000 Subject: [PATCH 2/2] Safely close the quit channel Prevent accidental closure of an already closed channel. Signed-off-by: Gabriel Adrian Samfira --- websocket/websocket.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/websocket/websocket.go b/websocket/websocket.go index a1791c09..a650088d 100644 --- a/websocket/websocket.go +++ b/websocket/websocket.go @@ -35,17 +35,19 @@ type Hub struct { // Unregister requests from clients. unregister chan *Client - mux sync.Mutex + mux sync.Mutex + once sync.Once } func (h *Hub) run() { + defer func() { + close(h.closed) + }() for { select { case <-h.quit: - close(h.closed) return case <-h.ctx.Done(): - close(h.closed) return case client := <-h.register: if client != nil { @@ -116,8 +118,15 @@ func (h *Hub) Start() error { return nil } +func (h *Hub) Close() error { + h.once.Do(func() { + close(h.quit) + }) + return nil +} + func (h *Hub) Stop() error { - close(h.quit) + h.Close() select { case <-h.closed: return nil