From 1fe09548bca0f770e9d8f09a4ae4ead38bbd1331 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 20 May 2025 13:50:56 +0000 Subject: [PATCH] Add more tests Signed-off-by: Gabriel Adrian Samfira --- database/watcher/filters.go | 13 +- database/watcher/util_test.go | 16 + database/watcher/watcher_test.go | 1274 +++++++++++++++++++++++++++++- 3 files changed, 1288 insertions(+), 15 deletions(-) create mode 100644 database/watcher/util_test.go diff --git a/database/watcher/filters.go b/database/watcher/filters.go index b09c422d..acf79ba8 100644 --- a/database/watcher/filters.go +++ b/database/watcher/filters.go @@ -205,21 +205,26 @@ func WithEntityJobFilter(ghEntity params.ForgeEntity) dbCommon.PayloadFilterFunc } } -// WithForgeCredentialsFilter returns a filter function that filters payloads by Github credentials. +// WithForgeCredentialsFilter returns a filter function that filters payloads by Github or Gitea credentials. func WithForgeCredentialsFilter(creds params.ForgeCredentials) dbCommon.PayloadFilterFunc { return func(payload dbCommon.ChangePayload) bool { - var idGetter params.IDGetter + var forgeCreds params.ForgeCredentials var ok bool switch payload.EntityType { case dbCommon.GithubCredentialsEntityType, dbCommon.GiteaCredentialsEntityType: - idGetter, ok = payload.Payload.(params.ForgeCredentials) + forgeCreds, ok = payload.Payload.(params.ForgeCredentials) default: return false } if !ok { return false } - return idGetter.GetID() == creds.GetID() + // Gite and Github creds have different models. The ID is uint, so we + // need to explicitly check their type, or risk a clash. + if forgeCreds.ForgeType != creds.ForgeType { + return false + } + return forgeCreds.GetID() == creds.GetID() } } diff --git a/database/watcher/util_test.go b/database/watcher/util_test.go new file mode 100644 index 00000000..82b94491 --- /dev/null +++ b/database/watcher/util_test.go @@ -0,0 +1,16 @@ +package watcher_test + +import ( + "time" + + "github.com/cloudbase/garm/database/common" +) + +func waitForPayload(ch <-chan common.ChangePayload, timeout time.Duration) *common.ChangePayload { + select { + case payload := <-ch: + return &payload + case <-time.After(timeout): + return nil + } +} diff --git a/database/watcher/watcher_test.go b/database/watcher/watcher_test.go index b272bda7..ab4653a9 100644 --- a/database/watcher/watcher_test.go +++ b/database/watcher/watcher_test.go @@ -20,13 +20,16 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/pkg/errors" "github.com/stretchr/testify/suite" + commonParams "github.com/cloudbase/garm-provider-common/params" "github.com/cloudbase/garm/database" "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/database/watcher" garmTesting "github.com/cloudbase/garm/internal/testing" + "github.com/cloudbase/garm/params" ) type WatcherTestSuite struct { @@ -50,6 +53,7 @@ func (s *WatcherTestSuite) TearDownTest() { currentWatcher := watcher.GetWatcher() if currentWatcher != nil { currentWatcher.Close() + watcher.SetWatcher(nil) } } @@ -126,7 +130,7 @@ func (s *WatcherTestSuite) TestProducerAndConsumer() { s.Require().Equal(payload, receivedPayload) } -func (s *WatcherTestSuite) TestConsumetWithFilter() { +func (s *WatcherTestSuite) TestConsumeWithFilter() { producer, err := watcher.RegisterProducer(s.ctx, "test-producer") s.Require().NoError(err) s.Require().NotNil(producer) @@ -146,12 +150,9 @@ func (s *WatcherTestSuite) TestConsumetWithFilter() { err = producer.Notify(payload) s.Require().NoError(err) - select { - case receivedPayload := <-consumer.Watch(): - s.Require().Equal(payload, receivedPayload) - case <-time.After(1 * time.Second): - s.T().Fatal("expected payload not received") - } + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) payload = common.ChangePayload{ EntityType: common.ControllerEntityType, @@ -161,11 +162,141 @@ func (s *WatcherTestSuite) TestConsumetWithFilter() { err = producer.Notify(payload) s.Require().NoError(err) - select { - case <-consumer.Watch(): - s.T().Fatal("unexpected payload received") - case <-time.After(1 * time.Second): + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithAnyFilter() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithAny( + watcher.WithEntityTypeFilter(common.ControllerEntityType), + watcher.WithEntityFilter(params.ForgeEntity{ + EntityType: params.ForgeEntityTypeRepository, + Owner: "test", + Name: "test", + ID: "test", + }), + )) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.ControllerEntityType, + Operation: common.UpdateOperation, + Payload: "test", } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.RepositoryEntityType, + Operation: common.UpdateOperation, + Payload: params.Repository{ + Owner: "test", + Name: "test", + ID: "test", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + // We're not watching for this repo + payload = common.ChangePayload{ + EntityType: common.RepositoryEntityType, + Operation: common.UpdateOperation, + Payload: params.Repository{ + Owner: "test", + Name: "test", + ID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) + + // We're not watching for orgs + payload = common.ChangePayload{ + EntityType: common.OrganizationEntityType, + Operation: common.UpdateOperation, + Payload: params.Repository{ + Owner: "test", + Name: "test", + ID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithAllFilter() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithAll( + watcher.WithEntityFilter(params.ForgeEntity{ + EntityType: params.ForgeEntityTypeRepository, + Owner: "test", + Name: "test", + ID: "test", + }), + watcher.WithOperationTypeFilter(common.CreateOperation), + )) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.RepositoryEntityType, + Operation: common.CreateOperation, + Payload: params.Repository{ + Owner: "test", + Name: "test", + ID: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.RepositoryEntityType, + Operation: common.UpdateOperation, + Payload: params.Repository{ + Owner: "test", + Name: "test", + ID: "test", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) } func maybeInitController(db common.Store) error { @@ -180,6 +311,1127 @@ func maybeInitController(db common.Store) error { return nil } +func (s *WatcherTestSuite) TestWithEntityPoolFilterRepository() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeRepository, + Owner: "test", + Name: "test", + ID: "test", + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityPoolFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.PoolEntityType, + Operation: common.UpdateOperation, + Payload: params.Pool{ + ID: "test", + RepoID: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.PoolEntityType, + Operation: common.UpdateOperation, + Payload: params.Pool{ + ID: "test", + RepoID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityPoolFilterOrg() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeOrganization, + Name: "test", + ID: "test", + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityPoolFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.PoolEntityType, + Operation: common.UpdateOperation, + Payload: params.Pool{ + ID: "test", + OrgID: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.PoolEntityType, + Operation: common.UpdateOperation, + Payload: params.Pool{ + ID: "test", + OrgID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityPoolFilterEnterprise() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeEnterprise, + Name: "test", + ID: "test", + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityPoolFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.PoolEntityType, + Operation: common.UpdateOperation, + Payload: params.Pool{ + ID: "test", + EnterpriseID: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.PoolEntityType, + Operation: common.UpdateOperation, + Payload: params.Pool{ + ID: "test", + EnterpriseID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) + + // Invalid payload for declared entity type + payload = common.ChangePayload{ + EntityType: common.PoolEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + EnterpriseID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityPoolFilterBogusEntityType() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + // This should trigger the default branch in the filter and + // return false + EntityType: params.ForgeEntityType("bogus"), + Name: "test", + ID: "test", + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityPoolFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.PoolEntityType, + Operation: common.UpdateOperation, + Payload: params.Pool{ + ID: "test", + EnterpriseID: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.PoolEntityType, + Operation: common.UpdateOperation, + Payload: params.Pool{ + ID: "test", + EnterpriseID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityScaleSetFilterRepository() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeRepository, + Owner: "test", + Name: "test", + ID: "test", + Credentials: params.ForgeCredentials{ + ForgeType: params.GithubEndpointType, + }, + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityScaleSetFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + RepoID: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + RepoID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityScaleSetFilterOrg() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeOrganization, + Name: "test", + ID: "test", + Credentials: params.ForgeCredentials{ + ForgeType: params.GithubEndpointType, + }, + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityScaleSetFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + OrgID: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + OrgID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityScaleSetFilterEnterprise() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeEnterprise, + Name: "test", + ID: "test", + Credentials: params.ForgeCredentials{ + ForgeType: params.GithubEndpointType, + }, + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityScaleSetFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + EnterpriseID: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + EnterpriseID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityScaleSetFilterBogusEntityType() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + // This should trigger the default branch in the filter and + // return false + EntityType: params.ForgeEntityType("bogus"), + Name: "test", + ID: "test", + Credentials: params.ForgeCredentials{ + ForgeType: params.GithubEndpointType, + }, + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityScaleSetFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + EnterpriseID: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + EnterpriseID: "test2", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityScaleSetFilterReturnsFalseForGiteaEndpoints() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeRepository, + Owner: "test", + Name: "test", + ID: "test", + Credentials: params.ForgeCredentials{ + ForgeType: params.GiteaEndpointType, + }, + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityScaleSetFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + RepoID: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityFilterRepository() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeRepository, + Owner: "test", + Name: "test", + ID: "test", + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.RepositoryEntityType, + Operation: common.UpdateOperation, + Payload: params.Repository{ + ID: "test", + Name: "test", + Owner: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.RepositoryEntityType, + Operation: common.UpdateOperation, + Payload: params.Repository{ + ID: "test2", + Name: "test", + Owner: "test", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityFilterOrg() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeOrganization, + Name: "test", + ID: "test", + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.OrganizationEntityType, + Operation: common.UpdateOperation, + Payload: params.Organization{ + ID: "test", + Name: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.OrganizationEntityType, + Operation: common.UpdateOperation, + Payload: params.Organization{ + ID: "test2", + Name: "test", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityFilterEnterprise() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeEnterprise, + Name: "test", + ID: "test", + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.EnterpriseEntityType, + Operation: common.UpdateOperation, + Payload: params.Enterprise{ + ID: "test", + Name: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.EnterpriseEntityType, + Operation: common.UpdateOperation, + Payload: params.Enterprise{ + ID: "test2", + Name: "test", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityJobFilterRepository() { + repoUUID, err := uuid.NewUUID() + s.Require().NoError(err) + + repoUUID2, err := uuid.NewUUID() + s.Require().NoError(err) + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeRepository, + Owner: "test", + Name: "test", + ID: repoUUID.String(), + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityJobFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.JobEntityType, + Operation: common.UpdateOperation, + Payload: params.Job{ + ID: 1, + Name: "test", + RepoID: &repoUUID, + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.JobEntityType, + Operation: common.UpdateOperation, + Payload: params.Job{ + ID: 1, + Name: "test", + RepoID: &repoUUID2, + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityJobFilterOrg() { + orgUUID, err := uuid.NewUUID() + s.Require().NoError(err) + + orgUUID2, err := uuid.NewUUID() + s.Require().NoError(err) + + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeOrganization, + Name: "test", + ID: orgUUID.String(), + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityJobFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.JobEntityType, + Operation: common.UpdateOperation, + Payload: params.Job{ + ID: 1, + Name: "test", + OrgID: &orgUUID, + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.JobEntityType, + Operation: common.UpdateOperation, + Payload: params.Job{ + ID: 1, + Name: "test", + OrgID: &orgUUID2, + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityJobFilterEnterprise() { + entUUID, err := uuid.NewUUID() + s.Require().NoError(err) + + entUUID2, err := uuid.NewUUID() + s.Require().NoError(err) + + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + EntityType: params.ForgeEntityTypeEnterprise, + Name: "test", + ID: entUUID.String(), + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityJobFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.JobEntityType, + Operation: common.UpdateOperation, + Payload: params.Job{ + ID: 1, + Name: "test", + EnterpriseID: &entUUID, + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.JobEntityType, + Operation: common.UpdateOperation, + Payload: params.Job{ + ID: 1, + Name: "test", + EnterpriseID: &entUUID2, + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithEntityJobFilterBogusEntityType() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + entity := params.ForgeEntity{ + // This should trigger the default branch in the filter and + // return false + EntityType: params.ForgeEntityType("bogus"), + Name: "test", + ID: "test", + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithEntityJobFilter(entity), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.JobEntityType, + Operation: common.UpdateOperation, + Payload: params.Job{ + ID: 1, + Name: "test", + EnterpriseID: nil, + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.JobEntityType, + Operation: common.UpdateOperation, + Payload: params.Job{ + ID: 1, + Name: "test", + EnterpriseID: nil, + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithNone() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithNone(), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.RepositoryEntityType, + Operation: common.UpdateOperation, + Payload: params.Repository{ + ID: "test", + Name: "test", + Owner: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithUserIDFilter() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + userID, err := uuid.NewUUID() + s.Require().NoError(err) + + userID2, err := uuid.NewUUID() + s.Require().NoError(err) + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithUserIDFilter(userID.String()), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.UserEntityType, + Operation: common.UpdateOperation, + Payload: params.User{ + ID: userID.String(), + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.UserEntityType, + Operation: common.UpdateOperation, + Payload: params.User{ + ID: userID2.String(), + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.UserEntityType, + Operation: common.UpdateOperation, + // Declare as user, but payload is a pool. Filter should return false. + Payload: params.Pool{}, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithForgeCredentialsGithub() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + creds := params.ForgeCredentials{ + ForgeType: params.GithubEndpointType, + ID: 1, + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithForgeCredentialsFilter(creds), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.GithubCredentialsEntityType, + Operation: common.UpdateOperation, + Payload: params.ForgeCredentials{ + ForgeType: params.GithubEndpointType, + ID: 1, + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.GiteaCredentialsEntityType, + Operation: common.UpdateOperation, + Payload: params.ForgeCredentials{ + ForgeType: params.GiteaEndpointType, + ID: 1, + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.GiteaCredentialsEntityType, + Operation: common.UpdateOperation, + Payload: params.Pool{}, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithcaleSetFilter() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + scaleSet := params.ScaleSet{ + ID: 1, + } + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithScaleSetFilter(scaleSet), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 1, + Name: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.ScaleSet{ + ID: 2, + Name: "test", + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.ScaleSetEntityType, + Operation: common.UpdateOperation, + Payload: params.Pool{}, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) +} + +func (s *WatcherTestSuite) TestWithExcludeEntityTypeFilter() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithExcludeEntityTypeFilter(common.RepositoryEntityType), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.RepositoryEntityType, + Operation: common.UpdateOperation, + Payload: params.Repository{ + ID: "test", + Name: "test", + Owner: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.OrganizationEntityType, + Operation: common.UpdateOperation, + Payload: params.Repository{ + ID: "test", + Name: "test", + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) +} + +func (s *WatcherTestSuite) TestWithInstanceStatusFilter() { + producer, err := watcher.RegisterProducer(s.ctx, "test-producer") + s.Require().NoError(err) + s.Require().NotNil(producer) + + consumer, err := watcher.RegisterConsumer( + s.ctx, "test-consumer", + watcher.WithInstanceStatusFilter( + commonParams.InstanceCreating, + commonParams.InstanceDeleting), + ) + s.Require().NoError(err) + s.Require().NotNil(consumer) + + payload := common.ChangePayload{ + EntityType: common.InstanceEntityType, + Operation: common.UpdateOperation, + Payload: params.Instance{ + ID: "test-instance", + Status: commonParams.InstanceCreating, + }, + } + err = producer.Notify(payload) + s.Require().NoError(err) + + receivedPayload := waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.InstanceEntityType, + Operation: common.UpdateOperation, + Payload: params.Instance{ + ID: "test-instance", + Status: commonParams.InstanceDeleted, + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().Nil(receivedPayload) + + payload = common.ChangePayload{ + EntityType: common.InstanceEntityType, + Operation: common.UpdateOperation, + Payload: params.Instance{ + ID: "test-instance", + Status: commonParams.InstanceDeleting, + }, + } + + err = producer.Notify(payload) + s.Require().NoError(err) + receivedPayload = waitForPayload(consumer.Watch(), 100*time.Millisecond) + s.Require().NotNil(receivedPayload) + s.Require().Equal(payload, *receivedPayload) +} + func TestWatcherTestSuite(t *testing.T) { // Watcher tests watcherSuite := &WatcherTestSuite{