Fix log streamer and cleanup code

I accidentally disabled the log streamer when I moved the config options
to their own section. This change fixes that.

This change also adds some safety checks and locking when cleaning up stale
clients. The websocket hub Write() function now copies the message before
sending it on the channel to the clients.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2024-01-06 14:05:38 +00:00
parent 0aaeebde28
commit 70bfff96e0
3 changed files with 35 additions and 12 deletions

View file

@ -63,7 +63,7 @@ var logCmd = &cobra.Command{
slog.With(slog.Any("error", err)).Error("reading log message")
return
}
fmt.Print(util.SanitizeLogEntry(string(message)))
fmt.Println(util.SanitizeLogEntry(string(message)))
}
}()

View file

@ -71,8 +71,7 @@ func maybeInitController(db common.Store) error {
return nil
}
func setupLogging(ctx context.Context, cfg *config.Config, hub *websocket.Hub) {
logCfg := cfg.GetLoggingConfig()
func setupLogging(ctx context.Context, logCfg config.Logging, hub *websocket.Hub) {
logWriter, err := util.GetLoggingWriter(logCfg.LogFile)
if err != nil {
log.Fatalf("fetching log writer: %+v", err)
@ -157,16 +156,16 @@ func main() {
log.Fatalf("Fetching config: %+v", err)
}
logCfg := cfg.GetLoggingConfig()
var hub *websocket.Hub
if cfg.Default.EnableLogStreamer != nil && *cfg.Default.EnableLogStreamer {
if logCfg.EnableLogStreamer != nil && *logCfg.EnableLogStreamer {
hub = websocket.NewHub(ctx)
if err := hub.Start(); err != nil {
log.Fatal(err)
}
defer hub.Stop() //nolint
}
setupLogging(ctx, cfg, hub)
setupLogging(ctx, logCfg, hub)
db, err := database.NewDatabase(ctx, cfg.Database)
if err != nil {

View file

@ -3,6 +3,7 @@ package websocket
import (
"context"
"fmt"
"sync"
"time"
)
@ -33,6 +34,8 @@ type Hub struct {
// Unregister requests from clients.
unregister chan *Client
mux sync.Mutex
}
func (h *Hub) run() {
@ -46,28 +49,47 @@ func (h *Hub) run() {
return
case client := <-h.register:
if client != nil {
h.mux.Lock()
h.clients[client.id] = client
h.mux.Unlock()
}
case client := <-h.unregister:
if client != nil {
h.mux.Lock()
if _, ok := h.clients[client.id]; ok {
delete(h.clients, client.id)
client.conn.Close()
close(client.send)
delete(h.clients, client.id)
}
h.mux.Unlock()
}
case message := <-h.broadcast:
staleClients := []string{}
for id, client := range h.clients {
if client == nil {
staleClients = append(staleClients, id)
continue
}
select {
case client.send <- message:
case <-time.After(5 * time.Second):
close(client.send)
delete(h.clients, id)
staleClients = append(staleClients, id)
}
}
if len(staleClients) > 0 {
h.mux.Lock()
for _, id := range staleClients {
if client, ok := h.clients[id]; ok {
if client != nil {
client.conn.Close()
close(client.send)
}
delete(h.clients, id)
}
}
h.mux.Unlock()
}
}
}
}
@ -78,13 +100,15 @@ func (h *Hub) Register(client *Client) error {
}
func (h *Hub) Write(msg []byte) (int, error) {
tmp := make([]byte, len(msg))
copy(tmp, msg)
select {
case <-time.After(5 * time.Second):
return 0, fmt.Errorf("timed out sending message to client")
case h.broadcast <- msg:
case h.broadcast <- tmp:
}
return len(msg), nil
return len(tmp), nil
}
func (h *Hub) Start() error {