This change adds a new websocket endpoint for database events. The events
endpoint allows clients to stream events as they happen in GARM. Events
are defined as a structure containning the event type (create, update, delete),
the database entity involved (instances, pools, repos, etc) and the payload
consisting of the object involved in the event. The payload translates
to the types normally returned by the API and can be deserialized as one
of the types present in the params package.
The events endpoint is a websocket endpoint and it accepts filters as
a simple json send over the websocket connection. The filters allows the
user to specify which entities are of interest, and which operations should
be returned. For example, you may be interested in changes made to pools
or runners, in which case you could create a filter that only returns
update operations for pools. Or update and delete operations.
The filters can be defined as:
{
"filters": [
{
"entity_type": "instance",
"operations": ["update", "delete"]
},
{
"entity_type": "pool"
},
],
"send_everything": false
}
This would return only update and delete events for instances and all events
for pools. Alternatively you can ask GARM to send you everything:
{
"send_everything": true
}
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
182 lines
3.8 KiB
Go
182 lines
3.8 KiB
Go
package watcher
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/cloudbase/garm/database/common"
|
|
garmUtil "github.com/cloudbase/garm/util"
|
|
)
|
|
|
|
var databaseWatcher common.Watcher
|
|
|
|
func InitWatcher(ctx context.Context) {
|
|
if databaseWatcher != nil {
|
|
return
|
|
}
|
|
ctx = garmUtil.WithContext(ctx, slog.Any("watcher", "database"))
|
|
w := &watcher{
|
|
producers: make(map[string]*producer),
|
|
consumers: make(map[string]*consumer),
|
|
quit: make(chan struct{}),
|
|
ctx: ctx,
|
|
}
|
|
|
|
go w.loop()
|
|
databaseWatcher = w
|
|
}
|
|
|
|
func RegisterProducer(ctx context.Context, id string) (common.Producer, error) {
|
|
if databaseWatcher == nil {
|
|
return nil, common.ErrWatcherNotInitialized
|
|
}
|
|
ctx = garmUtil.WithContext(ctx, slog.Any("producer_id", id))
|
|
return databaseWatcher.RegisterProducer(ctx, id)
|
|
}
|
|
|
|
func RegisterConsumer(ctx context.Context, id string, filters ...common.PayloadFilterFunc) (common.Consumer, error) {
|
|
if databaseWatcher == nil {
|
|
return nil, common.ErrWatcherNotInitialized
|
|
}
|
|
ctx = garmUtil.WithContext(ctx, slog.Any("consumer_id", id))
|
|
return databaseWatcher.RegisterConsumer(ctx, id, filters...)
|
|
}
|
|
|
|
type watcher struct {
|
|
producers map[string]*producer
|
|
consumers map[string]*consumer
|
|
|
|
mux sync.Mutex
|
|
closed bool
|
|
quit chan struct{}
|
|
ctx context.Context
|
|
}
|
|
|
|
func (w *watcher) RegisterProducer(ctx context.Context, id string) (common.Producer, error) {
|
|
w.mux.Lock()
|
|
defer w.mux.Unlock()
|
|
|
|
if _, ok := w.producers[id]; ok {
|
|
return nil, errors.Wrapf(common.ErrProducerAlreadyRegistered, "producer_id: %s", id)
|
|
}
|
|
p := &producer{
|
|
id: id,
|
|
messages: make(chan common.ChangePayload, 1),
|
|
quit: make(chan struct{}),
|
|
ctx: ctx,
|
|
}
|
|
w.producers[id] = p
|
|
go w.serviceProducer(p)
|
|
return p, nil
|
|
}
|
|
|
|
func (w *watcher) serviceProducer(prod *producer) {
|
|
defer func() {
|
|
w.mux.Lock()
|
|
defer w.mux.Unlock()
|
|
prod.Close()
|
|
slog.InfoContext(w.ctx, "removing producer from watcher", "consumer_id", prod.id)
|
|
delete(w.producers, prod.id)
|
|
}()
|
|
for {
|
|
select {
|
|
case <-w.quit:
|
|
slog.InfoContext(w.ctx, "shutting down watcher")
|
|
return
|
|
case <-w.ctx.Done():
|
|
slog.InfoContext(w.ctx, "shutting down watcher")
|
|
return
|
|
case <-prod.quit:
|
|
slog.InfoContext(w.ctx, "closing producer")
|
|
return
|
|
case <-prod.ctx.Done():
|
|
slog.InfoContext(w.ctx, "closing producer")
|
|
return
|
|
case payload := <-prod.messages:
|
|
w.mux.Lock()
|
|
for _, c := range w.consumers {
|
|
go c.Send(payload)
|
|
}
|
|
w.mux.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *watcher) RegisterConsumer(ctx context.Context, id string, filters ...common.PayloadFilterFunc) (common.Consumer, error) {
|
|
w.mux.Lock()
|
|
defer w.mux.Unlock()
|
|
if _, ok := w.consumers[id]; ok {
|
|
return nil, common.ErrConsumerAlreadyRegistered
|
|
}
|
|
c := &consumer{
|
|
messages: make(chan common.ChangePayload, 1),
|
|
filters: filters,
|
|
quit: make(chan struct{}),
|
|
id: id,
|
|
ctx: ctx,
|
|
}
|
|
w.consumers[id] = c
|
|
go w.serviceConsumer(c)
|
|
return c, nil
|
|
}
|
|
|
|
func (w *watcher) serviceConsumer(consumer *consumer) {
|
|
defer func() {
|
|
w.mux.Lock()
|
|
defer w.mux.Unlock()
|
|
consumer.Close()
|
|
slog.InfoContext(w.ctx, "removing consumer from watcher", "consumer_id", consumer.id)
|
|
delete(w.consumers, consumer.id)
|
|
}()
|
|
slog.InfoContext(w.ctx, "starting consumer", "consumer_id", consumer.id)
|
|
for {
|
|
select {
|
|
case <-consumer.quit:
|
|
return
|
|
case <-consumer.ctx.Done():
|
|
return
|
|
case <-w.quit:
|
|
return
|
|
case <-w.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *watcher) Close() {
|
|
w.mux.Lock()
|
|
defer w.mux.Unlock()
|
|
if w.closed {
|
|
return
|
|
}
|
|
|
|
close(w.quit)
|
|
w.closed = true
|
|
|
|
for _, p := range w.producers {
|
|
p.Close()
|
|
}
|
|
|
|
for _, c := range w.consumers {
|
|
c.Close()
|
|
}
|
|
|
|
databaseWatcher = nil
|
|
}
|
|
|
|
func (w *watcher) loop() {
|
|
defer func() {
|
|
w.Close()
|
|
}()
|
|
for {
|
|
select {
|
|
case <-w.quit:
|
|
return
|
|
case <-w.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|