2025-05-20 09:40:15 +00:00
|
|
|
// Copyright 2025 Cloudbase Solutions SRL
|
|
|
|
|
//
|
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
|
|
|
// not use this file except in compliance with the License. You may obtain
|
|
|
|
|
// a copy of the License at
|
|
|
|
|
//
|
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
//
|
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
|
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
|
// License for the specific language governing permissions and limitations
|
|
|
|
|
// under the License.
|
|
|
|
|
|
2022-10-21 02:49:53 +03:00
|
|
|
package websocket
|
|
|
|
|
|
|
|
|
|
import (
|
2024-07-02 22:26:12 +00:00
|
|
|
"context"
|
2025-08-16 19:31:58 +00:00
|
|
|
"errors"
|
2024-07-02 22:26:12 +00:00
|
|
|
"fmt"
|
2024-01-05 23:32:16 +00:00
|
|
|
"log/slog"
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containing the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple JSON document sent over the websocket connection. The filters allow the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
}
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
"net"
|
2024-07-02 22:26:12 +00:00
|
|
|
"sync"
|
2022-10-21 02:49:53 +03:00
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
|
|
"github.com/gorilla/websocket"
|
2024-07-02 22:26:12 +00:00
|
|
|
|
|
|
|
|
"github.com/cloudbase/garm/auth"
|
|
|
|
|
"github.com/cloudbase/garm/database/common"
|
|
|
|
|
"github.com/cloudbase/garm/database/watcher"
|
|
|
|
|
"github.com/cloudbase/garm/params"
|
2022-10-21 02:49:53 +03:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Timing and size limits applied to every websocket client connection.
const (
	// writeWait bounds how long a single write to the peer may take.
	writeWait = 10 * time.Second

	// pongWait is how long we wait for the next pong message from the peer
	// before the read deadline expires.
	pongWait = 60 * time.Second

	// pingPeriod is the interval at which pings are sent to the peer. It is
	// derived as 90% of pongWait, so it is always shorter than pongWait.
	pingPeriod = pongWait * 9 / 10

	// maxMessageSize is the largest message, in bytes, accepted from the
	// peer (16 KB).
	maxMessageSize = 16 * 1024
)
|
|
|
|
|
|
2024-07-02 22:26:12 +00:00
|
|
|
// HandleWebsocketMessage is the callback signature used to process the raw
// payload of a message read from the websocket peer. A non-nil return value
// reports that the message could not be handled.
type HandleWebsocketMessage func(data []byte) error
|
|
|
|
|
|
|
|
|
|
func NewClient(ctx context.Context, conn *websocket.Conn) (*Client, error) {
|
2022-10-21 02:49:53 +03:00
|
|
|
clientID := uuid.New()
|
2024-07-02 22:26:12 +00:00
|
|
|
consumerID := fmt.Sprintf("ws-client-watcher-%s", clientID.String())
|
|
|
|
|
|
|
|
|
|
user := auth.UserID(ctx)
|
|
|
|
|
if user == "" {
|
|
|
|
|
return nil, fmt.Errorf("user not found in context")
|
|
|
|
|
}
|
|
|
|
|
generation := auth.PasswordGeneration(ctx)
|
|
|
|
|
|
|
|
|
|
consumer, err := watcher.RegisterConsumer(
|
|
|
|
|
ctx, consumerID,
|
|
|
|
|
watcher.WithUserIDFilter(user),
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
2025-08-16 19:31:58 +00:00
|
|
|
return nil, fmt.Errorf("error registering consumer: %w", err)
|
2024-07-02 22:26:12 +00:00
|
|
|
}
|
2022-10-21 02:49:53 +03:00
|
|
|
return &Client{
|
2024-07-02 22:26:12 +00:00
|
|
|
id: clientID.String(),
|
|
|
|
|
conn: conn,
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
userID: user,
|
|
|
|
|
passwordGeneration: generation,
|
|
|
|
|
consumer: consumer,
|
2022-10-21 02:49:53 +03:00
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Client struct {
|
|
|
|
|
id string
|
|
|
|
|
conn *websocket.Conn
|
|
|
|
|
// Buffered channel of outbound messages.
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
send chan []byte
|
|
|
|
|
mux sync.Mutex
|
|
|
|
|
writeMux sync.Mutex
|
|
|
|
|
ctx context.Context
|
2024-07-02 22:26:12 +00:00
|
|
|
|
|
|
|
|
userID string
|
|
|
|
|
passwordGeneration uint
|
|
|
|
|
consumer common.Consumer
|
|
|
|
|
|
|
|
|
|
messageHandler HandleWebsocketMessage
|
|
|
|
|
|
|
|
|
|
running bool
|
|
|
|
|
done chan struct{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Client) ID() string {
|
|
|
|
|
return c.id
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Client) Stop() {
|
|
|
|
|
c.mux.Lock()
|
|
|
|
|
defer c.mux.Unlock()
|
|
|
|
|
|
|
|
|
|
if !c.running {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.running = false
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
c.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
2024-07-02 22:26:12 +00:00
|
|
|
c.conn.Close()
|
|
|
|
|
close(c.send)
|
|
|
|
|
close(c.done)
|
|
|
|
|
}
|
2022-10-21 02:49:53 +03:00
|
|
|
|
2024-07-02 22:26:12 +00:00
|
|
|
func (c *Client) Done() <-chan struct{} {
|
|
|
|
|
return c.done
|
2022-10-21 02:49:53 +03:00
|
|
|
}
|
|
|
|
|
|
2024-07-02 22:26:12 +00:00
|
|
|
func (c *Client) SetMessageHandler(handler HandleWebsocketMessage) {
|
|
|
|
|
c.mux.Lock()
|
|
|
|
|
defer c.mux.Unlock()
|
|
|
|
|
c.messageHandler = handler
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Client) Start() error {
|
|
|
|
|
c.mux.Lock()
|
|
|
|
|
defer c.mux.Unlock()
|
|
|
|
|
|
|
|
|
|
c.running = true
|
2025-05-14 15:22:27 +00:00
|
|
|
c.send = make(chan []byte, 100)
|
|
|
|
|
c.done = make(chan struct{})
|
2024-07-02 22:26:12 +00:00
|
|
|
|
|
|
|
|
go c.runWatcher()
|
2022-10-21 02:49:53 +03:00
|
|
|
go c.clientReader()
|
|
|
|
|
go c.clientWriter()
|
2024-07-02 22:26:12 +00:00
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Client) Write(msg []byte) (int, error) {
|
|
|
|
|
c.mux.Lock()
|
|
|
|
|
defer c.mux.Unlock()
|
|
|
|
|
|
|
|
|
|
if !c.running {
|
2025-08-27 00:25:17 +00:00
|
|
|
return 0, fmt.Errorf("websocket client is stopped")
|
2024-07-02 22:26:12 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tmp := make([]byte, len(msg))
|
|
|
|
|
copy(tmp, msg)
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case c.send <- tmp:
|
2025-08-27 00:25:17 +00:00
|
|
|
return len(tmp), nil
|
|
|
|
|
default:
|
|
|
|
|
return 0, fmt.Errorf("timed out sending message to websocket client")
|
2024-07-02 22:26:12 +00:00
|
|
|
}
|
2022-10-21 02:49:53 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// clientReader waits for options changes from the client. The client can at any time
|
|
|
|
|
// change the log level and binary name it watches.
|
|
|
|
|
func (c *Client) clientReader() {
|
|
|
|
|
defer func() {
|
2024-07-02 22:26:12 +00:00
|
|
|
c.Stop()
|
2022-10-21 02:49:53 +03:00
|
|
|
}()
|
|
|
|
|
c.conn.SetReadLimit(maxMessageSize)
|
2023-01-20 22:19:54 +02:00
|
|
|
if err := c.conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
|
2024-01-05 23:32:16 +00:00
|
|
|
slog.With(slog.Any("error", err)).Error("failed to set read deadline")
|
2023-01-20 22:19:54 +02:00
|
|
|
}
|
|
|
|
|
c.conn.SetPongHandler(func(string) error {
|
|
|
|
|
if err := c.conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
})
|
2022-10-21 02:49:53 +03:00
|
|
|
for {
|
2024-07-02 22:26:12 +00:00
|
|
|
mt, data, err := c.conn.ReadMessage()
|
2022-10-21 02:49:53 +03:00
|
|
|
if err != nil {
|
2024-07-02 22:26:12 +00:00
|
|
|
if IsErrorOfInterest(err) {
|
|
|
|
|
slog.ErrorContext(c.ctx, "error reading websocket message", slog.Any("error", err))
|
|
|
|
|
}
|
2022-10-21 02:49:53 +03:00
|
|
|
break
|
|
|
|
|
}
|
2024-07-02 22:26:12 +00:00
|
|
|
|
|
|
|
|
if c.messageHandler != nil {
|
|
|
|
|
if err := c.messageHandler(data); err != nil {
|
|
|
|
|
slog.ErrorContext(c.ctx, "error handling message", slog.Any("error", err))
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-10-21 02:49:53 +03:00
|
|
|
if mt == websocket.CloseMessage {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
func (c *Client) writeMessage(messageType int, message []byte) error {
|
|
|
|
|
c.writeMux.Lock()
|
|
|
|
|
defer c.writeMux.Unlock()
|
|
|
|
|
if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to set write deadline: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if err := c.conn.WriteMessage(messageType, message); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to write message: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2022-10-21 02:49:53 +03:00
|
|
|
// clientWriter
|
|
|
|
|
func (c *Client) clientWriter() {
|
2024-07-04 13:40:59 +00:00
|
|
|
// Set up expiration timer.
|
|
|
|
|
// NOTE: if a token is created without an expiration date
|
|
|
|
|
// this will be set to nil, which will close the loop bellow
|
|
|
|
|
// and terminate the connection immediately.
|
|
|
|
|
// We can't have a token without an expiration date.
|
2024-07-02 22:26:12 +00:00
|
|
|
var authExpires time.Time
|
|
|
|
|
expires := auth.Expires(c.ctx)
|
|
|
|
|
if expires != nil {
|
|
|
|
|
authExpires = *expires
|
|
|
|
|
}
|
2024-07-05 10:48:27 +00:00
|
|
|
authTimer := time.NewTimer(time.Until(authExpires))
|
|
|
|
|
ticker := time.NewTicker(pingPeriod)
|
|
|
|
|
defer func() {
|
|
|
|
|
c.Stop()
|
|
|
|
|
ticker.Stop()
|
|
|
|
|
authTimer.Stop()
|
|
|
|
|
}()
|
2022-10-21 02:49:53 +03:00
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case message, ok := <-c.send:
|
|
|
|
|
if !ok {
|
|
|
|
|
// The hub closed the channel.
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
if err := c.writeMessage(websocket.CloseMessage, []byte{}); err != nil {
|
2024-07-02 22:26:12 +00:00
|
|
|
if IsErrorOfInterest(err) {
|
|
|
|
|
slog.With(slog.Any("error", err)).Error("failed to write message")
|
|
|
|
|
}
|
2023-01-20 22:19:54 +02:00
|
|
|
}
|
2022-10-21 02:49:53 +03:00
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
if err := c.writeMessage(websocket.TextMessage, message); err != nil {
|
2024-07-02 22:26:12 +00:00
|
|
|
if IsErrorOfInterest(err) {
|
|
|
|
|
slog.With(slog.Any("error", err)).Error("error sending message")
|
|
|
|
|
}
|
2022-10-21 02:49:53 +03:00
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
case <-ticker.C:
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
if err := c.writeMessage(websocket.PingMessage, nil); err != nil {
|
2024-07-02 22:26:12 +00:00
|
|
|
if IsErrorOfInterest(err) {
|
|
|
|
|
slog.With(slog.Any("error", err)).Error("failed to write ping message")
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
case <-c.ctx.Done():
|
|
|
|
|
return
|
2024-07-05 10:48:27 +00:00
|
|
|
case <-authTimer.C:
|
2024-07-02 22:26:12 +00:00
|
|
|
// Auth has expired
|
|
|
|
|
slog.DebugContext(c.ctx, "auth expired, closing connection")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Client) runWatcher() {
|
|
|
|
|
defer func() {
|
|
|
|
|
c.Stop()
|
|
|
|
|
}()
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-c.Done():
|
|
|
|
|
return
|
|
|
|
|
case <-c.ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case event, ok := <-c.consumer.Watch():
|
|
|
|
|
if !ok {
|
|
|
|
|
slog.InfoContext(c.ctx, "watcher closed")
|
2022-10-21 02:49:53 +03:00
|
|
|
return
|
|
|
|
|
}
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
if event.EntityType != common.UserEntityType {
|
|
|
|
|
continue
|
|
|
|
|
}
|
2024-07-02 22:26:12 +00:00
|
|
|
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
user, ok := event.Payload.(params.User)
|
|
|
|
|
if !ok {
|
|
|
|
|
slog.ErrorContext(c.ctx, "failed to cast payload to user")
|
|
|
|
|
continue
|
|
|
|
|
}
|
2024-07-02 22:26:12 +00:00
|
|
|
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
if user.ID != c.userID {
|
|
|
|
|
continue
|
|
|
|
|
}
|
2024-07-02 22:26:12 +00:00
|
|
|
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
if event.Operation == common.DeleteOperation {
|
|
|
|
|
slog.InfoContext(c.ctx, "user deleted; closing connection")
|
|
|
|
|
c.Stop()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !user.Enabled {
|
|
|
|
|
slog.InfoContext(c.ctx, "user disabled; closing connection")
|
|
|
|
|
c.Stop()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if user.Generation != c.passwordGeneration {
|
|
|
|
|
slog.InfoContext(c.ctx, "password generation mismatch; closing connection")
|
|
|
|
|
c.Stop()
|
|
|
|
|
}
|
2024-07-02 22:26:12 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func IsErrorOfInterest(err error) bool {
|
|
|
|
|
if err == nil {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if errors.Is(err, websocket.ErrCloseSent) {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if errors.Is(err, websocket.ErrBadHandshake) {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
Add events websocket endpoint
This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2024-07-03 22:30:41 +00:00
|
|
|
if errors.Is(err, net.ErrClosed) {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-02 22:26:12 +00:00
|
|
|
asCloseErr, ok := err.(*websocket.CloseError)
|
|
|
|
|
if ok {
|
|
|
|
|
switch asCloseErr.Code {
|
|
|
|
|
case websocket.CloseNormalClosure, websocket.CloseGoingAway,
|
|
|
|
|
websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure:
|
|
|
|
|
return false
|
2022-10-21 02:49:53 +03:00
|
|
|
}
|
|
|
|
|
}
|
2024-07-02 22:26:12 +00:00
|
|
|
|
|
|
|
|
return true
|
2022-10-21 02:49:53 +03:00
|
|
|
}
|