Skip to content

Commit e809e65

Browse files
authored
Merge pull request #6582 from thaJeztah/fix_stats_cancellation
cli/command/container: RunStats: simplify, and fix context-cancellation
2 parents e50c94f + e01ce69 commit e809e65

File tree

2 files changed

+90
-57
lines changed

2 files changed

+90
-57
lines changed

cli/command/container/stats.go

Lines changed: 79 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ import (
1111
"time"
1212

1313
"github.com/containerd/errdefs"
14+
"github.com/containerd/log"
1415
"github.com/docker/cli/cli"
1516
"github.com/docker/cli/cli/command"
1617
"github.com/docker/cli/cli/command/completion"
1718
"github.com/docker/cli/cli/command/formatter"
1819
flagsHelper "github.com/docker/cli/cli/flags"
1920
"github.com/moby/moby/api/types/events"
2021
"github.com/moby/moby/client"
21-
"github.com/sirupsen/logrus"
2222
"github.com/spf13/cobra"
2323
)
2424

@@ -113,8 +113,10 @@ func RunStats(ctx context.Context, dockerCLI command.Cli, options *StatsOptions)
113113

114114
// waitFirst is a WaitGroup to wait first stat data's reach for each container
115115
waitFirst := &sync.WaitGroup{}
116-
// closeChan is a non-buffered channel used to collect errors from goroutines.
117-
closeChan := make(chan error)
116+
// closeChan is used to collect errors from goroutines. It uses a small buffer
117+
// to avoid blocking sends when sends occur after closeChan is set to nil or
118+
// after the reader has exited, preventing deadlocks.
119+
closeChan := make(chan error, 4)
118120
cStats := stats{}
119121

120122
showAll := len(options.Containers) == 0
@@ -139,24 +141,34 @@ func RunStats(ctx context.Context, dockerCLI command.Cli, options *StatsOptions)
139141
eh := newEventHandler()
140142
if options.All {
141143
eh.setHandler(events.ActionCreate, func(e events.Message) {
142-
s := NewStats(e.Actor.ID)
143-
if cStats.add(s) {
144+
if s := NewStats(e.Actor.ID); cStats.add(s) {
144145
waitFirst.Add(1)
146+
log.G(ctx).WithFields(map[string]any{
147+
"event": e.Action,
148+
"container": e.Actor.ID,
149+
}).Debug("collecting stats for container")
145150
go collect(ctx, s, apiClient, !options.NoStream, waitFirst)
146151
}
147152
})
148153
}
149154

150155
eh.setHandler(events.ActionStart, func(e events.Message) {
151-
s := NewStats(e.Actor.ID)
152-
if cStats.add(s) {
156+
if s := NewStats(e.Actor.ID); cStats.add(s) {
153157
waitFirst.Add(1)
158+
log.G(ctx).WithFields(map[string]any{
159+
"event": e.Action,
160+
"container": e.Actor.ID,
161+
}).Debug("collecting stats for container")
154162
go collect(ctx, s, apiClient, !options.NoStream, waitFirst)
155163
}
156164
})
157165

158166
if !options.All {
159167
eh.setHandler(events.ActionDie, func(e events.Message) {
168+
log.G(ctx).WithFields(map[string]any{
169+
"event": e.Action,
170+
"container": e.Actor.ID,
171+
}).Debug("stop collecting stats for container")
160172
cStats.remove(e.Actor.ID)
161173
})
162174
}
@@ -182,10 +194,17 @@ func RunStats(ctx context.Context, dockerCLI command.Cli, options *StatsOptions)
182194
select {
183195
case <-stopped:
184196
return
197+
case <-ctx.Done():
198+
return
185199
case event := <-eventChan:
186200
c <- event
187201
case err := <-errChan:
188-
closeChan <- err
202+
// Prevent blocking if closeChan is full or unread
203+
select {
204+
case closeChan <- err:
205+
default:
206+
// drop if not read; avoids deadlock
207+
}
189208
return
190209
}
191210
}
@@ -209,9 +228,11 @@ func RunStats(ctx context.Context, dockerCLI command.Cli, options *StatsOptions)
209228
return err
210229
}
211230
for _, ctr := range cs {
212-
s := NewStats(ctr.ID)
213-
if cStats.add(s) {
231+
if s := NewStats(ctr.ID); cStats.add(s) {
214232
waitFirst.Add(1)
233+
log.G(ctx).WithFields(map[string]any{
234+
"container": ctr.ID,
235+
}).Debug("collecting stats for container")
215236
go collect(ctx, s, apiClient, !options.NoStream, waitFirst)
216237
}
217238
}
@@ -230,15 +251,18 @@ func RunStats(ctx context.Context, dockerCLI command.Cli, options *StatsOptions)
230251
// Create the list of containers, and start collecting stats for all
231252
// containers passed.
232253
for _, ctr := range options.Containers {
233-
s := NewStats(ctr)
234-
if cStats.add(s) {
254+
if s := NewStats(ctr); cStats.add(s) {
235255
waitFirst.Add(1)
256+
log.G(ctx).WithFields(map[string]any{
257+
"container": ctr,
258+
}).Debug("collecting stats for container")
236259
go collect(ctx, s, apiClient, !options.NoStream, waitFirst)
237260
}
238261
}
239262

240-
// We don't expect any asynchronous errors: closeChan can be closed.
263+
// We don't expect any asynchronous errors: closeChan can be closed and disabled.
241264
close(closeChan)
265+
closeChan = nil
242266

243267
// make sure each container get at least one valid stat data
244268
waitFirst.Wait()
@@ -257,7 +281,7 @@ func RunStats(ctx context.Context, dockerCLI command.Cli, options *StatsOptions)
257281
}
258282

259283
format := options.Format
260-
if len(format) == 0 {
284+
if format == "" {
261285
if len(dockerCLI.ConfigFile().StatsFormat) > 0 {
262286
format = dockerCLI.ConfigFile().StatsFormat
263287
} else {
@@ -274,63 +298,68 @@ func RunStats(ctx context.Context, dockerCLI command.Cli, options *StatsOptions)
274298
Format: NewStatsFormat(format, daemonOSType),
275299
}
276300

277-
var err error
278-
ticker := time.NewTicker(500 * time.Millisecond)
279-
defer ticker.Stop()
280-
for range ticker.C {
281-
var ccStats []StatsEntry
301+
if options.NoStream {
282302
cStats.mu.RLock()
303+
ccStats := make([]StatsEntry, 0, len(cStats.cs))
283304
for _, c := range cStats.cs {
284305
ccStats = append(ccStats, c.GetStatistics())
285306
}
286307
cStats.mu.RUnlock()
287308

288-
if !options.NoStream {
309+
if len(ccStats) == 0 {
310+
return nil
311+
}
312+
if err := statsFormatWrite(statsCtx, ccStats, daemonOSType, !options.NoTrunc); err != nil {
313+
return err
314+
}
315+
_, _ = fmt.Fprint(dockerCLI.Out(), statsTextBuffer.String())
316+
return nil
317+
}
318+
319+
ticker := time.NewTicker(500 * time.Millisecond)
320+
defer ticker.Stop()
321+
for {
322+
select {
323+
case <-ticker.C:
324+
cStats.mu.RLock()
325+
ccStats := make([]StatsEntry, 0, len(cStats.cs))
326+
for _, c := range cStats.cs {
327+
ccStats = append(ccStats, c.GetStatistics())
328+
}
329+
cStats.mu.RUnlock()
330+
289331
// Start by moving the cursor to the top-left
290332
_, _ = fmt.Fprint(&statsTextBuffer, "\033[H")
291-
}
292333

293-
if err = statsFormatWrite(statsCtx, ccStats, daemonOSType, !options.NoTrunc); err != nil {
294-
break
295-
}
334+
if err := statsFormatWrite(statsCtx, ccStats, daemonOSType, !options.NoTrunc); err != nil {
335+
return err
336+
}
296337

297-
if !options.NoStream {
298338
for _, line := range strings.Split(statsTextBuffer.String(), "\n") {
299339
// In case the new text is shorter than the one we are writing over,
300340
// we'll append the "erase line" escape sequence to clear the remaining text.
301341
_, _ = fmt.Fprintln(&statsTextBuffer, line, "\033[K")
302342
}
303-
304343
// We might have fewer containers than before, so let's clear the remaining text
305344
_, _ = fmt.Fprint(&statsTextBuffer, "\033[J")
306-
}
307345

308-
_, _ = fmt.Fprint(dockerCLI.Out(), statsTextBuffer.String())
309-
statsTextBuffer.Reset()
346+
_, _ = fmt.Fprint(dockerCLI.Out(), statsTextBuffer.String())
347+
statsTextBuffer.Reset()
310348

311-
if len(cStats.cs) == 0 && !showAll {
312-
break
313-
}
314-
if options.NoStream {
315-
break
316-
}
317-
select {
349+
if len(ccStats) == 0 && !showAll {
350+
return nil
351+
}
318352
case err, ok := <-closeChan:
319-
if ok {
320-
if err != nil {
321-
// Suppress "unexpected EOF" errors in the CLI so that
322-
// it shuts down cleanly when the daemon restarts.
323-
if errors.Is(err, io.ErrUnexpectedEOF) {
324-
return nil
325-
}
326-
return err
327-
}
353+
if !ok || err == nil || errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
354+
// Suppress "unexpected EOF" errors in the CLI so that
355+
// it shuts down cleanly when the daemon restarts.
356+
return nil
328357
}
329-
default:
330-
// just skip
358+
return err
359+
case <-ctx.Done():
360+
return ctx.Err()
331361
}
332362
}
333-
return err
334363
}
335364

336365
// newEventHandler initializes and returns an eventHandler
@@ -357,11 +386,11 @@ func (eh *eventHandler) watch(c <-chan events.Message) {
357386
continue
358387
}
359388
if e.Actor.ID == "" {
360-
logrus.WithField("event", e).Errorf("event handler: received %s event with empty ID", e.Action)
389+
log.G(context.TODO()).WithField("event", e).Errorf("event handler: received %s event with empty ID", e.Action)
361390
continue
362391
}
363392

364-
logrus.WithField("event", e).Debugf("event handler: received %s event for: %s", e.Action, e.Actor.ID)
393+
log.G(context.TODO()).WithField("event", e).Debugf("event handler: received %s event for: %s", e.Action, e.Actor.ID)
365394
go h(e)
366395
}
367396
}

cli/command/container/stats_helpers.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010

1111
"github.com/moby/moby/api/types/container"
1212
"github.com/moby/moby/client"
13-
"github.com/sirupsen/logrus"
1413
)
1514

1615
type stats struct {
@@ -50,8 +49,7 @@ func (s *stats) isKnownContainer(cid string) (int, bool) {
5049
return -1, false
5150
}
5251

53-
func collect(ctx context.Context, s *Stats, cli client.ContainerAPIClient, streamStats bool, waitFirst *sync.WaitGroup) {
54-
logrus.Debugf("collecting stats for %s", s.Container)
52+
func collect(ctx context.Context, s *Stats, cli client.ContainerAPIClient, streamStats bool, waitFirst *sync.WaitGroup) { //nolint:gocyclo
5553
var (
5654
getFirst bool
5755
previousCPU uint64
@@ -72,11 +70,14 @@ func collect(ctx context.Context, s *Stats, cli client.ContainerAPIClient, strea
7270
s.SetError(err)
7371
return
7472
}
75-
defer response.Body.Close()
7673

77-
dec := json.NewDecoder(response.Body)
7874
go func() {
75+
defer response.Body.Close()
76+
dec := json.NewDecoder(response.Body)
7977
for {
78+
if ctx.Err() != nil {
79+
return
80+
}
8081
var (
8182
v *container.StatsResponse
8283
memPercent, cpuPercent float64
@@ -143,8 +144,8 @@ func collect(ctx context.Context, s *Stats, cli client.ContainerAPIClient, strea
143144
}
144145
case err := <-u:
145146
s.SetError(err)
146-
if err == io.EOF {
147-
break
147+
if errors.Is(err, io.EOF) {
148+
return
148149
}
149150
if err != nil {
150151
continue
@@ -154,6 +155,9 @@ func collect(ctx context.Context, s *Stats, cli client.ContainerAPIClient, strea
154155
getFirst = true
155156
waitFirst.Done()
156157
}
158+
case <-ctx.Done():
159+
s.SetError(ctx.Err())
160+
return
157161
}
158162
if !streamStats {
159163
return

0 commit comments

Comments
 (0)