Add some message handling

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-04-17 10:53:22 +00:00
parent 12f40a5352
commit c177c31147
4 changed files with 98 additions and 3 deletions

View file

@ -46,6 +46,7 @@ type (
GithubAuthType string
PoolBalancerType string
ScaleSetState string
ScaleSetMessageType string
)
const (
@ -137,6 +138,17 @@ const (
ScaleSetPendingForceDelete ScaleSetState = "pending_force_delete"
)
const (
MessageTypeRunnerScaleSetJobMessages ScaleSetMessageType = "RunnerScaleSetJobMessages"
)
const (
MessageTypeJobAssigned = "JobAssigned"
MessageTypeJobCompleted = "JobCompleted"
MessageTypeJobStarted = "JobStarted"
MessageTypeJobAvailable = "JobAvailable"
)
type StatusMessage struct {
CreatedAt time.Time `json:"created_at,omitempty"`
Message string `json:"message,omitempty"`

View file

@ -10,4 +10,6 @@ type scaleSetHelper interface {
GetScaleSet() params.ScaleSet
SetLastMessageID(id int64) error
Owner() string
HandleJobsCompleted(jobs []params.ScaleSetJobMessage) error
HandleJobsStarted(jobs []params.ScaleSetJobMessage) error
}

View file

@ -25,3 +25,16 @@ func (w *Worker) SetLastMessageID(id int64) error {
}
return nil
}
// HandleJobCompleted handles a job completed message. If a job had a runner
// assigned and was not canceled before it had a chance to run, then we mark
// that runner as pending_delete.
func (w *Worker) HandleJobsCompleted(jobs []params.ScaleSetJobMessage) error {
return nil
}
// HandleJobStarted updates the runners from idle to active in the DB and
// assigns the job to them.
func (w *Worker) HandleJobsStarted(jobs []params.ScaleSetJobMessage) error {
return nil
}

View file

@ -94,6 +94,12 @@ func (l *scaleSetListener) Stop() error {
func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage) {
l.mux.Lock()
defer l.mux.Unlock()
if params.ScaleSetMessageType(msg.MessageType) != params.MessageTypeRunnerScaleSetJobMessages {
slog.DebugContext(l.ctx, "message is not a job message, ignoring")
return
}
body, err := msg.GetJobsFromBody()
if err != nil {
slog.ErrorContext(l.ctx, "getting jobs from body", "error", err)
@ -101,11 +107,68 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
slog.InfoContext(l.ctx, "handling message", "message", msg, "body", body)
if msg.MessageID < l.lastMessageID {
slog.DebugContext(l.ctx, "message is older than last message, ignoring")
return
}
var completedJobs []params.ScaleSetJobMessage
var availableJobs []params.ScaleSetJobMessage
var startedJobs []params.ScaleSetJobMessage
for _, job := range body {
switch job.MessageType {
case params.MessageTypeJobAssigned:
slog.InfoContext(l.ctx, "new job assigned", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName)
case params.MessageTypeJobStarted:
slog.InfoContext(l.ctx, "job started", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName, "runner_name", job.RunnerName)
startedJobs = append(startedJobs, job)
case params.MessageTypeJobCompleted:
slog.InfoContext(l.ctx, "job completed", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName, "runner_name", job.RunnerName)
completedJobs = append(completedJobs, job)
case params.MessageTypeJobAvailable:
slog.InfoContext(l.ctx, "job available", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName)
availableJobs = append(availableJobs, job)
default:
slog.DebugContext(l.ctx, "unknown message type", "message_type", job.MessageType)
}
}
if len(availableJobs) > 0 {
jobIds := make([]int64, len(availableJobs))
for idx, job := range availableJobs {
jobIds[idx] = job.RunnerRequestId
}
idsAcquired, err := l.scaleSetHelper.ScaleSetCLI().AcquireJobs(
l.listenerCtx, l.scaleSetHelper.GetScaleSet().ScaleSetID,
l.messageSession.MessageQueueAccessToken(), jobIds)
if err != nil {
// don't mark message as processed. It will be requeued.
slog.ErrorContext(l.ctx, "acquiring jobs", "error", err)
return
}
slog.DebugContext(l.ctx, "acquired jobs", "job_ids", idsAcquired)
}
if len(completedJobs) > 0 {
if err := l.scaleSetHelper.HandleJobsCompleted(completedJobs); err != nil {
slog.ErrorContext(l.ctx, "error handling completed jobs", "error", err)
return
}
}
if len(startedJobs) > 0 {
if err := l.scaleSetHelper.HandleJobsStarted(startedJobs); err != nil {
slog.ErrorContext(l.ctx, "error handling started jobs", "error", err)
return
}
}
if err := l.scaleSetHelper.SetLastMessageID(msg.MessageID); err != nil {
slog.ErrorContext(l.ctx, "setting last message ID", "error", err)
} else {
l.lastMessageID = msg.MessageID
if err := l.scaleSetHelper.SetLastMessageID(msg.MessageID); err != nil {
slog.ErrorContext(l.ctx, "setting last message ID", "error", err)
}
}
if err := l.messageSession.DeleteMessage(l.listenerCtx, msg.MessageID); err != nil {
slog.ErrorContext(l.ctx, "deleting message", "error", err)
}
}
@ -127,6 +190,9 @@ func (l *scaleSetListener) loop() {
return
default:
slog.DebugContext(l.ctx, "getting message", "last_message_id", l.lastMessageID, "max_runners", l.scaleSetHelper.GetScaleSet().MaxRunners)
// TODO: consume initial message on startup and consolidate.
// The scale set may have undergone several messages while GARM was
// down.
msg, err := l.messageSession.GetMessage(
l.listenerCtx, l.lastMessageID, l.scaleSetHelper.GetScaleSet().MaxRunners)
if err != nil {
@ -153,6 +219,8 @@ func (l *scaleSetListener) loop() {
}
retryAfterUnauthorized = false
if !msg.IsNil() {
// Longpoll returns after 50 seconds. If no message arrives during that interval
// we get a nil message. We can simply ignore it and continue.
l.handleSessionMessage(msg)
}
}