
Commit 6f74768

ajax-ryzhyi authored

fix: Fix crashes (#5669)

Signed-off-by: Roma Ryzhyi <[email protected]>
Co-authored-by: PePe Amengual <[email protected]>
Co-authored-by: Claude <[email protected]>

1 parent a25062b · commit 6f74768

File tree

2 files changed (+204, -23 lines)


server/jobs/project_command_output_handler.go

Lines changed: 31 additions & 23 deletions
@@ -109,25 +109,22 @@ func NewAsyncProjectCommandOutputHandler(
 }
 
 func (p *AsyncProjectCommandOutputHandler) GetPullToJobMapping() []PullInfoWithJobIDs {
-
-    pullToJobMappings := []PullInfoWithJobIDs{}
-    i := 0
+    var pullToJobMappings []PullInfoWithJobIDs
 
     p.pullToJobMapping.Range(func(key, value interface{}) bool {
         pullInfo := key.(PullInfo)
-        jobIDMap := value.(map[string]JobIDInfo)
-
-        p := PullInfoWithJobIDs{
-            Pull:       pullInfo,
-            JobIDInfos: make([]JobIDInfo, 0, len(jobIDMap)),
-        }
+        jobIDSyncMap := value.(*sync.Map)
 
-        for _, JobIDInfo := range jobIDMap {
-            p.JobIDInfos = append(p.JobIDInfos, JobIDInfo)
-        }
+        var jobIDInfos []JobIDInfo
+        jobIDSyncMap.Range(func(_, v interface{}) bool {
+            jobIDInfos = append(jobIDInfos, v.(JobIDInfo))
+            return true
+        })
 
-        pullToJobMappings = append(pullToJobMappings, p)
-        i++
+        pullToJobMappings = append(pullToJobMappings, PullInfoWithJobIDs{
+            Pull:       pullInfo,
+            JobIDInfos: jobIDInfos,
+        })
         return true
     })
 

@@ -192,16 +189,16 @@ func (p *AsyncProjectCommandOutputHandler) Handle() {
 
         // Add job to pullToJob mapping
         if _, ok := p.pullToJobMapping.Load(msg.JobInfo.PullInfo); !ok {
-            p.pullToJobMapping.Store(msg.JobInfo.PullInfo, map[string]JobIDInfo{})
+            p.pullToJobMapping.Store(msg.JobInfo.PullInfo, &sync.Map{})
         }
         value, _ := p.pullToJobMapping.Load(msg.JobInfo.PullInfo)
-        jobMapping := value.(map[string]JobIDInfo)
-        jobMapping[msg.JobID] = JobIDInfo{
+        jobMapping := value.(*sync.Map)
+        jobMapping.Store(msg.JobID, JobIDInfo{
            JobID:          msg.JobID,
            JobDescription: msg.JobInfo.JobDescription,
            Time:           time.Now(),
            JobStep:        msg.JobInfo.JobStep,
-        }
+        })
 
        // Forward new message to all receiver channels and output buffer
        p.writeLogLine(msg.JobID, msg.Line)

@@ -291,33 +288,44 @@ func (p *AsyncProjectCommandOutputHandler) Deregister(jobID string, ch chan stri
 }
 
 func (p *AsyncProjectCommandOutputHandler) GetReceiverBufferForPull(jobID string) map[chan string]bool {
+    p.receiverBuffersLock.RLock()
+    defer p.receiverBuffersLock.RUnlock()
     return p.receiverBuffers[jobID]
 }
 
 func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(jobID string) OutputBuffer {
+    p.projectOutputBuffersLock.RLock()
+    defer p.projectOutputBuffersLock.RUnlock()
     return p.projectOutputBuffers[jobID]
 }
 
 func (p *AsyncProjectCommandOutputHandler) GetJobIDMapForPull(pullInfo PullInfo) map[string]JobIDInfo {
+    result := make(map[string]JobIDInfo)
     if value, ok := p.pullToJobMapping.Load(pullInfo); ok {
-        return value.(map[string]JobIDInfo)
+        jobIDSyncMap := value.(*sync.Map)
+        jobIDSyncMap.Range(func(k, v interface{}) bool {
+            result[k.(string)] = v.(JobIDInfo)
+            return true
+        })
+        return result
     }
     return nil
 }
 
 func (p *AsyncProjectCommandOutputHandler) CleanUp(pullInfo PullInfo) {
     if value, ok := p.pullToJobMapping.Load(pullInfo); ok {
-        jobMapping := value.(map[string]JobIDInfo)
-        for jobID := range jobMapping {
+        jobIDSyncMap := value.(*sync.Map)
+        jobIDSyncMap.Range(func(k, _ interface{}) bool {
+            jobID := k.(string)
             p.projectOutputBuffersLock.Lock()
             delete(p.projectOutputBuffers, jobID)
             p.projectOutputBuffersLock.Unlock()
 
             p.receiverBuffersLock.Lock()
             delete(p.receiverBuffers, jobID)
             p.receiverBuffersLock.Unlock()
-        }
-
+            return true
+        })
         // Remove job mapping
         p.pullToJobMapping.Delete(pullInfo)
     }
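
Note on the change above: the crashes come from unsynchronized access to plain Go maps, since Handle() mutates the per-pull job map and the output buffers while the getters read them from other goroutines, and concurrent map reads and writes are a fatal runtime error in Go. The fix stores a *sync.Map per pull and takes read locks in the buffer getters. Below is a minimal, self-contained sketch of the nested sync.Map pattern; the names (storeJob, snapshotJobs, jobIDInfo) and the string pull key are illustrative stand-ins, not the handler's real types, and it uses LoadOrStore where the real code does a Load check followed by a Store.

package main

import (
    "fmt"
    "sync"
)

// jobIDInfo is a simplified stand-in for the handler's JobIDInfo struct.
type jobIDInfo struct{ ID string }

// storeJob records a job under a pull key, creating the inner *sync.Map on
// first use. LoadOrStore keeps "create if missing" atomic, so two goroutines
// racing on the same pull cannot overwrite each other's inner map.
func storeJob(pulls *sync.Map, pull string, job jobIDInfo) {
    inner, _ := pulls.LoadOrStore(pull, &sync.Map{})
    inner.(*sync.Map).Store(job.ID, job)
}

// snapshotJobs copies the inner sync.Map into a plain map owned by the caller,
// mirroring what GetJobIDMapForPull does after this change.
func snapshotJobs(pulls *sync.Map, pull string) map[string]jobIDInfo {
    value, ok := pulls.Load(pull)
    if !ok {
        return nil
    }
    result := make(map[string]jobIDInfo)
    value.(*sync.Map).Range(func(k, v interface{}) bool {
        result[k.(string)] = v.(jobIDInfo)
        return true
    })
    return result
}

func main() {
    var pulls sync.Map
    var wg sync.WaitGroup
    // Concurrent writers and readers: safe because the only shared structures are sync.Maps.
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            storeJob(&pulls, "repo#1", jobIDInfo{ID: fmt.Sprintf("job-%d", i)})
            _ = snapshotJobs(&pulls, "repo#1")
        }(i)
    }
    wg.Wait()
    fmt.Println(len(snapshotJobs(&pulls, "repo#1"))) // prints 10
}

Either form works because no plain map is ever shared across goroutines without synchronization; callers only ever see a fresh snapshot.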

server/jobs/project_command_output_handler_test.go

Lines changed: 173 additions & 0 deletions
@@ -1,6 +1,7 @@
 package jobs_test
 
 import (
+    "fmt"
     "sync"
     "testing"
     "time"

@@ -252,3 +253,175 @@ func TestProjectCommandOutputHandler(t *testing.T) {
        assert.True(t, <-opComplete)
    })
 }
+
+// TestRaceConditionPrevention tests that our fixes prevent the specific race conditions
+func TestRaceConditionPrevention(t *testing.T) {
+    logger := logging.NewNoopLogger(t)
+    prjCmdOutputChan := make(chan *jobs.ProjectCmdOutputLine)
+    handler := jobs.NewAsyncProjectCommandOutputHandler(prjCmdOutputChan, logger)
+
+    // Start the handler
+    go handler.Handle()
+
+    ctx := createTestProjectCmdContext(t)
+    pullInfo := jobs.PullInfo{
+        PullNum:      ctx.Pull.Num,
+        Repo:         ctx.BaseRepo.Name,
+        RepoFullName: ctx.BaseRepo.FullName,
+        ProjectName:  ctx.ProjectName,
+        Path:         ctx.RepoRelDir,
+        Workspace:    ctx.Workspace,
+    }
+
+    t.Run("concurrent pullToJobMapping access", func(t *testing.T) {
+        var wg sync.WaitGroup
+        numGoroutines := 50
+
+        // This test specifically targets the original race condition
+        // that was fixed by using sync.Map for pullToJobMapping
+
+        // Concurrent writers (Handle() method updates the mapping)
+        for i := 0; i < numGoroutines; i++ {
+            wg.Add(1)
+            go func(id int) {
+                defer wg.Done()
+                // Send message which triggers Handle() to update pullToJobMapping
+                handler.Send(ctx, fmt.Sprintf("message-%d", id), false)
+            }(i)
+        }
+
+        // Concurrent readers (GetPullToJobMapping() method reads the mapping)
+        for i := 0; i < numGoroutines; i++ {
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                // This would race with Handle() before the sync.Map fix
+                mappings := handler.GetPullToJobMapping()
+                _ = mappings
+            }()
+        }
+
+        // Concurrent readers of GetJobIDMapForPull
+        for i := 0; i < numGoroutines; i++ {
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                // This would also race with Handle() before the fix
+                jobMap := handler.(*jobs.AsyncProjectCommandOutputHandler).GetJobIDMapForPull(pullInfo)
+                _ = jobMap
+            }()
+        }
+
+        wg.Wait()
+    })
+
+    t.Run("concurrent buffer access", func(t *testing.T) {
+        var wg sync.WaitGroup
+        numGoroutines := 30
+
+        // First populate some data
+        handler.Send(ctx, "initial", false)
+        time.Sleep(5 * time.Millisecond)
+
+        // Test the race condition we fixed in GetProjectOutputBuffer
+        for i := 0; i < numGoroutines; i++ {
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                // This would race with completeJob() before the RLock fix
+                buffer := handler.(*jobs.AsyncProjectCommandOutputHandler).GetProjectOutputBuffer(ctx.JobID)
+                _ = buffer
+            }()
+        }
+
+        // Concurrent operations that modify the buffer
+        for i := 0; i < numGoroutines; i++ {
+            wg.Add(1)
+            go func(id int) {
+                defer wg.Done()
+                if id%10 == 0 {
+                    // Occasionally complete a job to test completeJob() race
+                    handler.Send(ctx, "", true)
+                } else {
+                    handler.Send(ctx, "test", false)
+                }
+            }(i)
+        }
+
+        wg.Wait()
+    })
+
+    // Clean up
+    close(prjCmdOutputChan)
+}
+
+// TestHighConcurrencyStress performs stress testing with many concurrent operations
+func TestHighConcurrencyStress(t *testing.T) {
+    if testing.Short() {
+        t.Skip("Skipping stress test in short mode")
+    }
+
+    logger := logging.NewNoopLogger(t)
+    prjCmdOutputChan := make(chan *jobs.ProjectCmdOutputLine)
+    handler := jobs.NewAsyncProjectCommandOutputHandler(prjCmdOutputChan, logger)
+
+    // Start the handler
+    go handler.Handle()
+
+    var wg sync.WaitGroup
+    numWorkers := 20
+    operationsPerWorker := 100
+
+    // Multiple workers performing mixed operations
+    wg.Add(numWorkers)
+    for worker := 0; worker < numWorkers; worker++ {
+        go func(workerID int) {
+            defer wg.Done()
+
+            ctx := createTestProjectCmdContext(t)
+            ctx.JobID = "worker-job-" + fmt.Sprintf("%d", workerID)
+            ctx.Pull.Num = workerID
+
+            pullInfo := jobs.PullInfo{
+                PullNum:      ctx.Pull.Num,
+                Repo:         ctx.BaseRepo.Name,
+                RepoFullName: ctx.BaseRepo.FullName,
+                ProjectName:  ctx.ProjectName,
+                Path:         ctx.RepoRelDir,
+                Workspace:    ctx.Workspace,
+            }
+
+            for op := 0; op < operationsPerWorker; op++ {
+                switch op % 6 {
+                case 0:
+                    // Send messages
+                    handler.Send(ctx, "stress test message", false)
+                case 1:
+                    // Read pull to job mapping
+                    mappings := handler.GetPullToJobMapping()
+                    _ = mappings
+                case 2:
+                    // Read job ID map for pull
+                    jobMap := handler.(*jobs.AsyncProjectCommandOutputHandler).GetJobIDMapForPull(pullInfo)
+                    _ = jobMap
+                case 3:
+                    // Read project output buffer
+                    buffer := handler.(*jobs.AsyncProjectCommandOutputHandler).GetProjectOutputBuffer(ctx.JobID)
+                    _ = buffer
+                case 4:
+                    // Read receiver buffer
+                    receivers := handler.(*jobs.AsyncProjectCommandOutputHandler).GetReceiverBufferForPull(ctx.JobID)
+                    _ = receivers
+                case 5:
+                    // Occasional cleanup
+                    if op%20 == 0 {
+                        handler.CleanUp(pullInfo)
+                    }
+                }
+            }
+        }(worker)
+    }
+
+    wg.Wait()
+    close(prjCmdOutputChan)
+}
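
A note on running these tests: unsynchronized map access is exactly what Go's race detector catches, so the new tests are most useful when run with go test -race (for example, go test -race ./server/jobs/... from the repository root), and TestHighConcurrencyStress skips itself under -short. Before this change, the same concurrent map reads and writes could also surface at runtime as Go's fatal "concurrent map writes" error, which is presumably the crash referenced in the commit title.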
