Use r.log()
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
parent
9101cdc0a2
commit
0ab8f73bb4
1 changed files with 27 additions and 27 deletions
|
|
@ -125,23 +125,23 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
|
|||
_, err := r.store.GetJobByID(r.ctx, jobParams.ID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
log.Printf("[Pool mgr %s] failed to get job %d: %s", r.helper.String(), jobParams.ID, err)
|
||||
r.log("failed to get job %d: %s", jobParams.ID, err)
|
||||
return
|
||||
}
|
||||
// This job is new to us. Check if we have a pool that can handle it.
|
||||
potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), jobParams.Labels)
|
||||
if err != nil {
|
||||
log.Printf("[Pool mgr %s] failed to find pools matching tags %s: %s; not recording job", r.helper.String(), strings.Join(jobParams.Labels, ", "), err)
|
||||
r.log("failed to find pools matching tags %s: %s; not recording job", strings.Join(jobParams.Labels, ", "), err)
|
||||
return
|
||||
}
|
||||
if len(potentialPools) == 0 {
|
||||
log.Printf("[Pool mgr %s] no pools matching tags %s; not recording job", r.helper.String(), strings.Join(jobParams.Labels, ", "))
|
||||
r.log("no pools matching tags %s; not recording job", strings.Join(jobParams.Labels, ", "))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil {
|
||||
log.Printf("[Pool mgr %s] failed to update job %d: %s", r.helper.String(), jobParams.ID, jobErr)
|
||||
r.log("failed to update job %d: %s", jobParams.ID, jobErr)
|
||||
}
|
||||
|
||||
if triggeredBy != 0 && jobParams.ID != triggeredBy {
|
||||
|
|
@ -150,7 +150,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
|
|||
// still queued and we don't remove the lock, it will linger until the lock timeout is reached.
|
||||
// That may take a long time, so we break the lock here and allow it to be scheduled again.
|
||||
if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil {
|
||||
log.Printf("failed to break lock for job %d: %s", triggeredBy, err)
|
||||
r.log("failed to break lock for job %d: %s", triggeredBy, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
@ -224,7 +224,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
|
|||
return errors.Wrap(err, "getting pool")
|
||||
}
|
||||
if err := r.ensureIdleRunnersForOnePool(pool); err != nil {
|
||||
log.Printf("error ensuring idle runners for pool %s: %s", pool.ID, err)
|
||||
r.log("error ensuring idle runners for pool %s: %s", pool.ID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
@ -926,9 +926,9 @@ func (r *basePoolManager) updateArgsFromProviderInstance(providerInstance params
|
|||
}
|
||||
}
|
||||
func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool) error {
|
||||
log.Printf("scaling down pool %s", pool.ID)
|
||||
r.log("scaling down pool %s", pool.ID)
|
||||
if !pool.Enabled {
|
||||
log.Printf("pool %s is disabled, skipping scale down", pool.ID)
|
||||
r.log("pool %s is disabled, skipping scale down", pool.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -1467,17 +1467,17 @@ func (r *basePoolManager) consumeQueuedJobs() error {
|
|||
|
||||
poolsCache := poolsForTags{}
|
||||
|
||||
log.Printf("[Pool mgr %s] found %d queued jobs for %s", r.helper.String(), len(queued), r.helper.String())
|
||||
r.log("found %d queued jobs for %s", len(queued), r.helper.String())
|
||||
for _, job := range queued {
|
||||
if job.LockedBy != uuid.Nil && job.LockedBy.String() != r.ID() {
|
||||
// Job was handled by us or another entity.
|
||||
log.Printf("[Pool mgr %s] job %d is locked by %s", r.helper.String(), job.ID, job.LockedBy.String())
|
||||
r.log("job %d is locked by %s", job.ID, job.LockedBy.String())
|
||||
continue
|
||||
}
|
||||
|
||||
if time.Since(job.UpdatedAt) < time.Second*30 {
|
||||
// give the idle runners a chance to pick up the job.
|
||||
log.Printf("[Pool mgr %s] job %d was updated less than 30 seconds ago. Skipping", r.helper.String(), job.ID)
|
||||
r.log("job %d was updated less than 30 seconds ago. Skipping", job.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -1493,15 +1493,15 @@ func (r *basePoolManager) consumeQueuedJobs() error {
|
|||
return errors.Wrap(err, "deleting job")
|
||||
}
|
||||
default:
|
||||
log.Printf("[Pool mgr %s] failed to fetch job information from github: %q (status code: %d)", r.helper.String(), err, ghResp.StatusCode)
|
||||
r.log("failed to fetch job information from github: %q (status code: %d)", err, ghResp.StatusCode)
|
||||
}
|
||||
}
|
||||
log.Printf("[Pool mgr %s] error fetching workflow info: %q", r.helper.String(), err)
|
||||
r.log("error fetching workflow info: %q", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if workflow.GetStatus() != "queued" {
|
||||
log.Printf("[Pool mgr %s] job is no longer in queued state on github. New status is: %s", r.helper.String(), workflow.GetStatus())
|
||||
r.log("job is no longer in queued state on github. New status is: %s", workflow.GetStatus())
|
||||
job.Action = workflow.GetStatus()
|
||||
job.Status = workflow.GetStatus()
|
||||
job.Conclusion = workflow.GetConclusion()
|
||||
|
|
@ -1518,7 +1518,7 @@ func (r *basePoolManager) consumeQueuedJobs() error {
|
|||
job.RunnerGroupID = *workflow.RunnerGroupID
|
||||
}
|
||||
if _, err := r.store.CreateOrUpdateJob(r.ctx, job); err != nil {
|
||||
log.Printf("[Pool mgr %s] failed to update job status: %q", r.helper.String(), err)
|
||||
r.log("failed to update job status: %q", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
|
@ -1526,7 +1526,7 @@ func (r *basePoolManager) consumeQueuedJobs() error {
|
|||
// Job is still queued in our db and in github. Unlock it and try again.
|
||||
if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
|
||||
// TODO: Implament a cache? Should we return here?
|
||||
log.Printf("[Pool mgr %s] failed to unlock job %d: %q", r.helper.String(), job.ID, err)
|
||||
r.log("failed to unlock job %d: %q", job.ID, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
@ -1536,7 +1536,7 @@ func (r *basePoolManager) consumeQueuedJobs() error {
|
|||
// TODO(gabriel-samfira): create an in-memory state of existing runners that we can easily
|
||||
// check for existing pending or idle runners. If we can't find any, attempt to allocate another
|
||||
// runner.
|
||||
log.Printf("[Pool mgr %s] job %d is locked by us", r.helper.String(), job.ID)
|
||||
r.log("job %d is locked by us", job.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -1544,20 +1544,20 @@ func (r *basePoolManager) consumeQueuedJobs() error {
|
|||
if !ok {
|
||||
potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), job.Labels)
|
||||
if err != nil {
|
||||
log.Printf("[Pool mgr %s] error finding pools matching labels: %s", r.helper.String(), err)
|
||||
r.log("error finding pools matching labels: %s", err)
|
||||
continue
|
||||
}
|
||||
poolRR = poolsCache.Add(job.Labels, potentialPools)
|
||||
}
|
||||
|
||||
if poolRR.Len() == 0 {
|
||||
log.Printf("[Pool mgr %s] could not find pools with labels %s", r.helper.String(), strings.Join(job.Labels, ","))
|
||||
r.log("could not find pools with labels %s", strings.Join(job.Labels, ","))
|
||||
continue
|
||||
}
|
||||
|
||||
runnerCreated := false
|
||||
if err := r.store.LockJob(r.ctx, job.ID, r.ID()); err != nil {
|
||||
log.Printf("[Pool mgr %s] could not lock job %d: %s", r.helper.String(), job.ID, err)
|
||||
r.log("could not lock job %d: %s", job.ID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -1567,31 +1567,31 @@ func (r *basePoolManager) consumeQueuedJobs() error {
|
|||
for i := 0; i < poolRR.Len(); i++ {
|
||||
pool, err := poolRR.Next()
|
||||
if err != nil {
|
||||
log.Printf("[PoolRR %s] could not find a pool to create a runner for job %d: %s", r.helper.String(), job.ID, err)
|
||||
r.log("could not find a pool to create a runner for job %d: %s", job.ID, err)
|
||||
break
|
||||
}
|
||||
|
||||
log.Printf("[PoolRR %s] attempting to create a runner in pool %s for job %d", r.helper.String(), pool.ID, job.ID)
|
||||
r.log("attempting to create a runner in pool %s for job %d", pool.ID, job.ID)
|
||||
if err := r.addRunnerToPool(pool, jobLabels); err != nil {
|
||||
log.Printf("[PoolRR] could not add runner to pool %s: %s", pool.ID, err)
|
||||
r.log("[PoolRR] could not add runner to pool %s: %s", pool.ID, err)
|
||||
continue
|
||||
}
|
||||
log.Printf("[PoolRR %s] a new runner was added to pool %s as a response to queued job %d", r.helper.String(), pool.ID, job.ID)
|
||||
r.log("a new runner was added to pool %s as a response to queued job %d", pool.ID, job.ID)
|
||||
runnerCreated = true
|
||||
break
|
||||
}
|
||||
|
||||
if !runnerCreated {
|
||||
log.Printf("[Pool mgr %s] could not create a runner for job %d; unlocking", r.helper.String(), job.ID)
|
||||
r.log("could not create a runner for job %d; unlocking", job.ID)
|
||||
if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
|
||||
log.Printf("[Pool mgr %s] failed to unlock job: %d", r.helper.String(), job.ID)
|
||||
r.log("failed to unlock job: %d", job.ID)
|
||||
return errors.Wrap(err, "unlocking job")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := r.store.DeleteCompletedJobs(r.ctx); err != nil {
|
||||
log.Printf("[Pool mgr %s] failed to delete completed jobs: %q", r.helper.String(), err)
|
||||
r.log("failed to delete completed jobs: %q", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue