diff --git a/cmd/garm-cli/cmd/jobs.go b/cmd/garm-cli/cmd/jobs.go index 30612d19..847131d6 100644 --- a/cmd/garm-cli/cmd/jobs.go +++ b/cmd/garm-cli/cmd/jobs.go @@ -63,7 +63,7 @@ func formatJobs(jobs []params.Job) { return } t := table.NewWriter() - header := table.Row{"Workflow Job ID", "Scale Set Job ID", "Name", "Status", "Conclusion", "Runner Name", "Repository", "Requested Labels", "Locked by"} + header := table.Row{"Workflow Job ID", "Name", "Status", "Conclusion", "Runner Name", "Repository", "Requested Labels", "Locked by", "Workflow job run URL"} t.AppendHeader(header) for _, job := range jobs { @@ -72,7 +72,7 @@ func formatJobs(jobs []params.Job) { if job.LockedBy != uuid.Nil { lockedBy = job.LockedBy.String() } - t.AppendRow(table.Row{job.WorkflowJobID, job.ScaleSetJobID, job.Name, job.Status, job.Conclusion, job.RunnerName, repo, strings.Join(job.Labels, " "), lockedBy}) + t.AppendRow(table.Row{job.WorkflowJobID, job.Name, job.Status, job.Conclusion, job.RunnerName, repo, strings.Join(job.Labels, " "), lockedBy, job.WorkflowRunURL}) t.AppendSeparator() } fmt.Println(t.Render()) diff --git a/database/sql/jobs.go b/database/sql/jobs.go index 0845188d..8e307582 100644 --- a/database/sql/jobs.go +++ b/database/sql/jobs.go @@ -63,11 +63,13 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) { CreatedAt: job.CreatedAt, UpdatedAt: job.UpdatedAt, LockedBy: job.LockedBy, + WorkflowRunURL: job.WorkflowRunURL, } if job.InstanceID != nil { jobParam.RunnerName = job.Instance.Name } + return jobParam, nil } @@ -273,6 +275,7 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa workflowJob.GithubRunnerID = job.GithubRunnerID workflowJob.RunnerGroupID = job.RunnerGroupID workflowJob.RunnerGroupName = job.RunnerGroupName + workflowJob.WorkflowRunURL = job.WorkflowRunURL if job.RunID != 0 && workflowJob.RunID == 0 { workflowJob.RunID = job.RunID } @@ -312,6 +315,7 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa if err != nil { return params.Job{}, fmt.Errorf("error converting job: %w", err) } + workflowJob.WorkflowRunURL = job.WorkflowRunURL if err := s.conn.Create(&workflowJob).Error; err != nil { return params.Job{}, fmt.Errorf("error creating job: %w", err) } @@ -391,7 +395,9 @@ func (s *sqlDatabase) ListAllJobs(_ context.Context) ([]params.Job, error) { var jobs []WorkflowJob query := s.conn.Model(&WorkflowJob{}) - if err := query.Preload("Instance").Find(&jobs); err.Error != nil { + if err := query. + Preload("Instance"). + Find(&jobs); err.Error != nil { if errors.Is(err.Error, gorm.ErrRecordNotFound) { return []params.Job{}, nil } diff --git a/database/sql/models.go b/database/sql/models.go index 585b6ed7..53507c0a 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -419,7 +419,8 @@ type WorkflowJob struct { RepositoryName string RepositoryOwner string - Labels datatypes.JSON + Labels datatypes.JSON + WorkflowRunURL string // The entity that received the hook. // diff --git a/params/params.go b/params/params.go index 427fbb36..30df70c0 100644 --- a/params/params.go +++ b/params/params.go @@ -1276,6 +1276,7 @@ type Job struct { // repository in which the job was triggered. RepositoryName string `json:"repository_name,omitempty"` RepositoryOwner string `json:"repository_owner,omitempty"` + WorkflowRunURL string `json:"workflow_run_url,omitempty"` Labels []string `json:"labels,omitempty"` diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 52f5133b..1dbacc47 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -1125,6 +1125,10 @@ func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) ( Labels: job.WorkflowJob.Labels, } + if job.Repository.HTMLURL != "" && job.WorkflowJob.RunID != 0 { + jobParams.WorkflowRunURL = fmt.Sprintf("%s/actions/runs/%d", strings.TrimRight(job.Repository.HTMLURL, "/"), job.WorkflowJob.RunID) + } + switch r.entity.EntityType { case params.ForgeEntityTypeEnterprise: jobParams.EnterpriseID = &asUUID diff --git a/util/github/scalesets/message_sessions.go b/util/github/scalesets/message_sessions.go index 8fafc2c4..96ad2b83 100644 --- a/util/github/scalesets/message_sessions.go +++ b/util/github/scalesets/message_sessions.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "log/slog" "math/big" "net/http" @@ -195,7 +196,11 @@ func (m *MessageSession) GetMessage(ctx context.Context, lastMessageID int64, ma } var message params.RunnerScaleSetMessage - if err := json.NewDecoder(resp.Body).Decode(&message); err != nil { + data, err := io.ReadAll(resp.Body) + if err != nil { + return params.RunnerScaleSetMessage{}, fmt.Errorf("failed to read response body: %w", err) + } + if err := json.Unmarshal(data, &message); err != nil { return params.RunnerScaleSetMessage{}, fmt.Errorf("failed to decode response: %w", err) } return message, nil diff --git a/util/logging.go b/util/logging.go index 99c69da7..771d9934 100644 --- a/util/logging.go +++ b/util/logging.go @@ -28,7 +28,34 @@ const ( var _ slog.Handler = &SlogMultiHandler{} func WithSlogContext(ctx context.Context, attrs ...slog.Attr) context.Context { - return context.WithValue(ctx, slogCtxFields, attrs) + attrMap := map[string]struct{}{} + for _, val := range attrs { + attrMap[val.Key] = struct{}{} + } + + newAttrs := attrs + existingAttrs, ok := ctx.Value(slogCtxFields).([]slog.Attr) + if ok { + for _, val := range existingAttrs { + if _, ok := attrMap[val.Key]; !ok { + newAttrs = append(newAttrs, val) + } + } + } + return context.WithValue(ctx, slogCtxFields, newAttrs) +} + +func GetSlogValuesFromContext(ctx context.Context) []slog.Attr { + vals, ok := ctx.Value(slogCtxFields).([]slog.Attr) + if ok { + return vals + } + return []slog.Attr{} +} + +func CopySlogValuesToNewCtx(sourceCtx, destCtx context.Context) context.Context { + vals := GetSlogValuesFromContext(sourceCtx) + return WithSlogContext(destCtx, vals...) } type SlogMultiHandler struct { diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go index 683470b1..019623ff 100644 --- a/workers/scaleset/scaleset.go +++ b/workers/scaleset/scaleset.go @@ -31,6 +31,7 @@ import ( "github.com/cloudbase/garm/locking" "github.com/cloudbase/garm/params" "github.com/cloudbase/garm/runner/common" + garmUtil "github.com/cloudbase/garm/util" ) func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider) (*Worker, error) { @@ -39,6 +40,18 @@ func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleS if err != nil { return nil, fmt.Errorf("getting controller info: %w", err) } + ctx = garmUtil.WithSlogContext( + ctx, + slog.Any("worker", consumerID), + ) + scalesetEntity, err := scaleSet.GetEntity() + if err != nil { + return nil, fmt.Errorf("failed to get scaleset entity: %w", err) + } + entity, err := store.GetForgeEntity(ctx, scalesetEntity.EntityType, scalesetEntity.ID) + if err != nil { + return nil, fmt.Errorf("failed to get entity from the db: %w", err) + } return &Worker{ ctx: ctx, controllerInfo: controllerInfo, @@ -46,6 +59,7 @@ func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleS store: store, provider: provider, scaleSet: scaleSet, + entity: entity, runners: make(map[string]params.Instance), }, nil } @@ -58,6 +72,7 @@ type Worker struct { provider common.Provider store dbCommon.Store scaleSet params.ScaleSet + entity params.ForgeEntity runners map[string]params.Instance consumer dbCommon.Consumer diff --git a/workers/scaleset/scaleset_helper.go b/workers/scaleset/scaleset_helper.go index c04c92a2..e0871d19 100644 --- a/workers/scaleset/scaleset_helper.go +++ b/workers/scaleset/scaleset_helper.go @@ -17,6 +17,7 @@ import ( "errors" "fmt" "log/slog" + "strings" runnerErrors "github.com/cloudbase/garm-provider-common/errors" commonParams "github.com/cloudbase/garm-provider-common/params" @@ -69,6 +70,7 @@ func (w *Worker) recordOrUpdateJob(job params.ScaleSetJobMessage) error { return fmt.Errorf("getting entity ID as UUID: %w", err) } + baseURL := strings.TrimRight(w.entity.Credentials.BaseURL, "/") jobParams := job.ToJob() jobParams.RunnerGroupName = w.scaleSet.GitHubRunnerGroup @@ -83,6 +85,10 @@ func (w *Worker) recordOrUpdateJob(job params.ScaleSetJobMessage) error { return fmt.Errorf("unknown entity type: %s --> %s", entity.EntityType, entity) } + if baseURL != "" { + jobParams.WorkflowRunURL = fmt.Sprintf("%s/%s/%s/actions/runs/%d", baseURL, jobParams.RepositoryOwner, jobParams.RepositoryName, jobParams.RunID) + } + if _, jobErr := w.store.CreateOrUpdateJob(w.ctx, jobParams); jobErr != nil { slog.With(slog.Any("error", jobErr)).ErrorContext( w.ctx, "failed to update job", "job_id", jobParams.ID) diff --git a/workers/scaleset/scaleset_listener.go b/workers/scaleset/scaleset_listener.go index 3cdcae62..d89d3450 100644 --- a/workers/scaleset/scaleset_listener.go +++ b/workers/scaleset/scaleset_listener.go @@ -22,6 +22,7 @@ import ( runnerErrors "github.com/cloudbase/garm-provider-common/errors" "github.com/cloudbase/garm/params" + garmUtil "github.com/cloudbase/garm/util" "github.com/cloudbase/garm/util/github/scalesets" ) @@ -30,6 +31,10 @@ var closed = make(chan struct{}) func init() { close(closed) } func newListener(ctx context.Context, scaleSetHelper scaleSetHelper) *scaleSetListener { + ctx = garmUtil.WithSlogContext( + ctx, + slog.Any("sub_worker", "scaleset-listener"), + ) return &scaleSetListener{ ctx: ctx, scaleSetHelper: scaleSetHelper, @@ -59,7 +64,7 @@ type scaleSetListener struct { } func (l *scaleSetListener) Start() error { - slog.DebugContext(l.ctx, "starting scale set listener", "scale_set", l.scaleSetHelper.GetScaleSet().ScaleSetID) + slog.DebugContext(l.ctx, "starting scale set listener", "id", l.scaleSetHelper.GetScaleSet().ID, "scale_set_id", l.scaleSetHelper.GetScaleSet().ScaleSetID) l.mux.Lock() defer l.mux.Unlock() @@ -67,7 +72,15 @@ func (l *scaleSetListener) Start() error { return nil } - l.listenerCtx, l.cancelFunc = context.WithCancel(context.Background()) + listenCtx, listenCancelFunc := context.WithCancel(context.Background()) + listenCtx = garmUtil.CopySlogValuesToNewCtx(l.ctx, listenCtx) + listenCtx = garmUtil.WithSlogContext( + listenCtx, + slog.Any("session_listener", l.scaleSetHelper.GetScaleSet().ID), + ) + l.listenerCtx = listenCtx + l.cancelFunc = listenCancelFunc + scaleSet := l.scaleSetHelper.GetScaleSet() scaleSetClient, err := l.scaleSetHelper.GetScaleSetClient() if err != nil {