Add log streamer

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2022-10-21 02:49:53 +03:00
parent 0699973abe
commit a7f151e2d2
No known key found for this signature in database
GPG key ID: 7D073DCC2C074CB5
9 changed files with 445 additions and 78 deletions

99
websocket/client.go Normal file
View file

@ -0,0 +1,99 @@
package websocket
import (
"log"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 1024
)
func NewClient(conn *websocket.Conn, hub *Hub) (*Client, error) {
clientID := uuid.New()
return &Client{
id: clientID.String(),
conn: conn,
hub: hub,
send: make(chan []byte, 100),
}, nil
}
type Client struct {
id string
conn *websocket.Conn
// Buffered channel of outbound messages.
send chan []byte
hub *Hub
}
func (c *Client) Go() {
go c.clientReader()
go c.clientWriter()
}
// clientReader waits for options changes from the client. The client can at any time
// change the log level and binary name it watches.
func (c *Client) clientReader() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
mt, _, err := c.conn.ReadMessage()
if err != nil {
break
}
if mt == websocket.CloseMessage {
break
}
}
}
// clientWriter
func (c *Client) clientWriter() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
log.Printf("error sending message: %v", err)
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}

110
websocket/websocket.go Normal file
View file

@ -0,0 +1,110 @@
package websocket
import (
"context"
"fmt"
"time"
)
func NewHub(ctx context.Context) *Hub {
return &Hub{
clients: map[string]*Client{},
broadcast: make(chan []byte, 100),
register: make(chan *Client, 100),
unregister: make(chan *Client, 100),
ctx: ctx,
closed: make(chan struct{}),
quit: make(chan struct{}),
}
}
type Hub struct {
ctx context.Context
closed chan struct{}
quit chan struct{}
// Registered clients.
clients map[string]*Client
// Inbound messages from the clients.
broadcast chan []byte
// Register requests from the clients.
register chan *Client
// Unregister requests from clients.
unregister chan *Client
}
func (h *Hub) run() {
for {
select {
case <-h.quit:
close(h.closed)
return
case <-h.ctx.Done():
close(h.closed)
return
case client := <-h.register:
if client != nil {
h.clients[client.id] = client
}
case client := <-h.unregister:
if client != nil {
if _, ok := h.clients[client.id]; ok {
delete(h.clients, client.id)
close(client.send)
}
}
case message := <-h.broadcast:
for id, client := range h.clients {
if client == nil {
continue
}
select {
case client.send <- message:
case <-time.After(5 * time.Second):
close(client.send)
delete(h.clients, id)
}
}
}
}
}
func (h *Hub) Register(client *Client) error {
h.register <- client
return nil
}
func (h *Hub) Write(msg []byte) (int, error) {
select {
case <-time.After(5 * time.Second):
return 0, fmt.Errorf("timed out sending message to client")
case h.broadcast <- msg:
}
return len(msg), nil
}
func (h *Hub) Start() error {
go h.run()
return nil
}
func (h *Hub) Stop() error {
close(h.quit)
select {
case <-h.closed:
return nil
case <-time.After(60 * time.Second):
return fmt.Errorf("timed out waiting for hub stop")
}
}
func (h *Hub) Wait() {
select {
case <-h.closed:
case <-time.After(60 * time.Second):
}
}