Several fixes

* Close response body in scaleset client
* Wait for message listener loop to exit before attempting restart
* Add LastMessageID field to scaleset model and function to update it

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-04-16 23:07:29 +00:00
parent a2aeac731c
commit 19ba210804
12 changed files with 118 additions and 19 deletions

View file

@ -31,6 +31,7 @@ import (
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/params"
garmUtil "github.com/cloudbase/garm/util"
)
const maxCapacityHeader = "X-ScaleSetMaxCapacity"
@ -63,16 +64,22 @@ func (m *MessageSession) LastError() error {
}
func (m *MessageSession) loop() {
timer := time.NewTimer(1 * time.Minute)
slog.DebugContext(m.ctx, "starting message session refresh loop", "session_id", m.session.SessionID.String())
timer := time.NewTicker(1 * time.Minute)
defer timer.Stop()
defer m.Close()
if m.closed {
slog.DebugContext(m.ctx, "message session refresh loop closed")
return
}
for {
select {
case <-m.ctx.Done():
slog.DebugContext(m.ctx, "message session refresh loop context done")
return
case <-m.done:
slog.DebugContext(m.ctx, "message session refresh loop done")
return
case <-timer.C:
if err := m.maybeRefreshToken(m.ctx); err != nil {
@ -99,6 +106,7 @@ func (m *MessageSession) SessionsRelativeURL() (string, error) {
}
func (m *MessageSession) Refresh(ctx context.Context) error {
slog.DebugContext(ctx, "refreshing message session token", "session_id", m.session.SessionID.String())
m.mux.Lock()
defer m.mux.Unlock()
@ -114,13 +122,15 @@ func (m *MessageSession) Refresh(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to delete message session: %w", err)
}
defer resp.Body.Close()
var refreshedSession params.RunnerScaleSetSession
if err := json.NewDecoder(resp.Body).Decode(&refreshedSession); err != nil {
return fmt.Errorf("failed to decode response: %w", err)
}
m.session = &refreshedSession
slog.DebugContext(ctx, "refreshed message session token", "session_id", refreshedSession.SessionID.String())
m.session.MessageQueueAccessToken = refreshedSession.MessageQueueAccessToken
m.session.Statistics = refreshedSession.Statistics
return nil
}
@ -129,16 +139,23 @@ func (m *MessageSession) maybeRefreshToken(ctx context.Context) error {
return fmt.Errorf("session is nil")
}
// add some jitter
randInt, err := rand.Int(rand.Reader, big.NewInt(1000))
randInt, err := rand.Int(rand.Reader, big.NewInt(5000))
if err != nil {
return fmt.Errorf("failed to get a random number")
}
jitter := time.Duration(randInt.Int64()) * time.Millisecond
if m.session.ExpiresIn(2*time.Minute + jitter) {
expiresAt, err := m.session.ExiresAt()
if err != nil {
return fmt.Errorf("failed to get expires at: %w", err)
}
expiresIn := time.Duration(randInt.Int64())*time.Millisecond + 10*time.Minute
slog.DebugContext(ctx, "checking if message session token needs refresh", "expires_at", expiresAt)
if m.session.ExpiresIn(expiresIn) {
slog.DebugContext(ctx, "refreshing message session token")
if err := m.Refresh(ctx); err != nil {
return fmt.Errorf("failed to refresh message queue token: %w", err)
}
}
return nil
}
@ -170,6 +187,7 @@ func (m *MessageSession) GetMessage(ctx context.Context, lastMessageID int64, ma
defer resp.Body.Close()
if resp.StatusCode == http.StatusAccepted {
slog.DebugContext(ctx, "no messages available in queue")
return params.RunnerScaleSetMessage{}, nil
}
@ -200,8 +218,8 @@ func (m *MessageSession) DeleteMessage(ctx context.Context, messageID int64) err
if err != nil {
return err
}
resp.Body.Close()
return nil
}
@ -233,10 +251,13 @@ func (s *ScaleSetClient) CreateMessageSession(ctx context.Context, runnerScaleSe
return nil, fmt.Errorf("failed to decode response: %w", err)
}
msgSessionCtx := garmUtil.WithSlogContext(
ctx,
slog.Any("session_id", createdSession.SessionID.String()))
sess := &MessageSession{
ssCli: s,
session: &createdSession,
ctx: ctx,
ctx: msgSessionCtx,
done: make(chan struct{}),
closed: false,
}
@ -256,11 +277,12 @@ func (s *ScaleSetClient) DeleteMessageSession(ctx context.Context, session *Mess
return fmt.Errorf("failed to create message delete request: %w", err)
}
_, err = s.Do(req)
resp, err := s.Do(req)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return fmt.Errorf("failed to delete message session: %w", err)
}
}
defer resp.Body.Close()
return nil
}

View file

@ -47,6 +47,7 @@ func (s *ScaleSetClient) GetRunnerScaleSetByNameAndRunnerGroup(ctx context.Conte
if err != nil {
return params.RunnerScaleSet{}, err
}
defer resp.Body.Close()
var runnerScaleSetList *params.RunnerScaleSetsResponse
if err := json.NewDecoder(resp.Body).Decode(&runnerScaleSetList); err != nil {
@ -72,6 +73,7 @@ func (s *ScaleSetClient) GetRunnerScaleSetByID(ctx context.Context, runnerScaleS
if err != nil {
return params.RunnerScaleSet{}, fmt.Errorf("failed to get runner scaleset with ID %d: %w", runnerScaleSetID, err)
}
defer resp.Body.Close()
var runnerScaleSet params.RunnerScaleSet
if err := json.NewDecoder(resp.Body).Decode(&runnerScaleSet); err != nil {
@ -94,6 +96,7 @@ func (s *ScaleSetClient) ListRunnerScaleSets(ctx context.Context) (*params.Runne
if err != nil {
return nil, fmt.Errorf("failed to list runner scale sets: %w", err)
}
defer resp.Body.Close()
var runnerScaleSetList params.RunnerScaleSetsResponse
if err := json.NewDecoder(resp.Body).Decode(&runnerScaleSetList); err != nil {
@ -119,6 +122,7 @@ func (s *ScaleSetClient) CreateRunnerScaleSet(ctx context.Context, runnerScaleSe
if err != nil {
return params.RunnerScaleSet{}, fmt.Errorf("failed to create runner scale set: %w", err)
}
defer resp.Body.Close()
var createdRunnerScaleSet params.RunnerScaleSet
if err := json.NewDecoder(resp.Body).Decode(&createdRunnerScaleSet); err != nil {
@ -144,6 +148,7 @@ func (s *ScaleSetClient) UpdateRunnerScaleSet(ctx context.Context, runnerScaleSe
if err != nil {
return params.RunnerScaleSet{}, fmt.Errorf("failed to make request: %w", err)
}
defer resp.Body.Close()
var ret params.RunnerScaleSet
if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
@ -164,12 +169,12 @@ func (s *ScaleSetClient) DeleteRunnerScaleSet(ctx context.Context, runnerScaleSe
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("failed to delete scale set with code %d", resp.StatusCode)
}
resp.Body.Close()
return nil
}