Skip to content

Commit 910b0c4

Browse files
committed
Add lock free queue source code
1 parent 6d423ff commit 910b0c4

File tree

4 files changed

+158
-4
lines changed

4 files changed

+158
-4
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,12 +380,12 @@ make bench
380380

381381
### Benchmark Results
382382

383-
| Benchmark | Iterations | ns/op | B/op | allocs/op |
384-
|-------------------------------------|------------|------:|-----:|----------:|
385-
| [Greet](template_benchmark_test.go) | 21,179,739 | 56.59 | 40 | 2 |
383+
| Benchmark | Iterations | ns/op | B/op | allocs/op |
384+
|--------------------------------------------------|------------|------:|-----:|----------:|
385+
| [LockFreeQ](lock_free_queue_benchmark_test.go) | 15076060 | 73.92 | 16 | 1 |
386386

387387
> These benchmarks reflect fast, allocation-free lookups for most retrieval functions, ensuring optimal performance in production environments.
388-
> Performance benchmarks for the core functions in this library, executed on an Apple M1 Max (ARM64).
388+
> Performance benchmarks for the core functions in this library, executed on an 13th Gen Intel i7-1360P (AMD64).
389389
390390
<br/>
391391

lock_free_queue.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Package lockfreequeue provides operations for a FIFO structure with
2+
// operations to enqueue and dequeue generic values.
3+
package lockfreequeue
4+
5+
import (
6+
"sync/atomic"
7+
)
8+
9+
type node[T any] struct {
10+
value T
11+
next atomic.Pointer[node[T]]
12+
}
13+
14+
// LockFreeQ represents a FIFO structure with operations to enqueue
15+
// and dequeue generic values.
16+
// This implementation is concurrent safe for queueing, but not for dequeueing.
17+
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
18+
type LockFreeQ[T any] struct {
19+
head *node[T]
20+
tail atomic.Pointer[node[T]]
21+
}
22+
23+
// NewLockFreeQ creates and initializes a LockFreeQueue
24+
func NewLockFreeQ[T any]() *LockFreeQ[T] {
25+
return &LockFreeQ[T]{
26+
head: &node[T]{},
27+
tail: atomic.Pointer[node[T]]{},
28+
}
29+
}
30+
31+
// Enqueue adds a series of Request to the queue
32+
// Enqueue is thread safe, it uses atomic operations to add to the queue
33+
func (q *LockFreeQ[T]) Enqueue(v T) {
34+
node := &node[T]{value: v}
35+
prev := q.tail.Swap(node)
36+
37+
if prev == nil {
38+
q.head.next.Store(node)
39+
return
40+
}
41+
42+
prev.next.Store(node)
43+
}
44+
45+
// Dequeue removes a Request from the queue
46+
// Dequeue is not thread safe, it should only be called from a single thread !!!
47+
func (q *LockFreeQ[T]) Dequeue() *T {
48+
next := q.head.next.Load()
49+
50+
if next == nil {
51+
return nil
52+
}
53+
54+
q.head = next
55+
56+
return &next.value
57+
}
58+
59+
// IsEmpty determines if the queue is empty
60+
func (q *LockFreeQ[T]) IsEmpty() bool {
61+
return q.head.next.Load() == nil
62+
}

lock_free_queue_benchmark_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,19 @@
11
package lockfreequeue
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func BenchmarkLockFreeQueue(b *testing.B) {
8+
q := NewLockFreeQ[int]()
9+
10+
for i := 0; i < b.N; i++ {
11+
q.Enqueue(i)
12+
}
13+
14+
for i := 0; i < b.N; i++ {
15+
if val := q.Dequeue(); val == nil {
16+
b.Errorf("Expected a value, got nil at iteration %d", i)
17+
}
18+
}
19+
}

lock_free_queue_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package lockfreequeue
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestEnqueueDequeue(t *testing.T) {
11+
q := NewLockFreeQ[int]() // Assuming your LockFreeQ works with int for this example
12+
13+
// Enqueue elements
14+
q.Enqueue(1)
15+
q.Enqueue(2)
16+
17+
// Dequeue and check elements
18+
if val := q.Dequeue(); val == nil || *val != 1 {
19+
t.Errorf("Expected 1, got %v", val)
20+
}
21+
22+
if val := q.Dequeue(); val == nil || *val != 2 {
23+
t.Errorf("Expected 2, got %v", val)
24+
}
25+
26+
// Check if queue is empty
27+
assert.True(t, q.IsEmpty(), "Expected queue to be empty after dequeuing all elements")
28+
assert.Nil(t, q.Dequeue())
29+
}
30+
31+
func TestConcurrentEnqueue(t *testing.T) {
32+
q := NewLockFreeQ[int]()
33+
34+
var wg sync.WaitGroup
35+
36+
numWorkers := 100 // Number of concurrent goroutines
37+
numEnqueues := 10 // Number of enqueues per goroutine
38+
39+
for i := 0; i < numWorkers; i++ {
40+
wg.Add(1)
41+
42+
go func(workerID int) {
43+
defer wg.Done()
44+
45+
for j := 0; j < numEnqueues; j++ {
46+
q.Enqueue(workerID*numEnqueues + j)
47+
}
48+
}(i)
49+
}
50+
51+
wg.Wait()
52+
53+
// Assuming your dequeue is not concurrently safe, this part is tricky.
54+
// We can't guarantee the order of elements, but we can check if all elements are present.
55+
// This part of the test will need adjustment based on your dequeue method's thread safety.
56+
seen := make(map[int]bool)
57+
58+
for i := 0; i < numWorkers*numEnqueues; i++ {
59+
val := q.Dequeue()
60+
if val == nil {
61+
t.Fatalf("Expected a value, got nil at iteration %d", i)
62+
}
63+
64+
if seen[*val] {
65+
t.Errorf("Duplicate value detected: %v", *val)
66+
}
67+
68+
seen[*val] = true
69+
}
70+
71+
if !q.IsEmpty() {
72+
t.Errorf("Expected queue to be empty after all dequeues")
73+
}
74+
}

0 commit comments

Comments
 (0)