
Conversation


@jage jage commented Oct 13, 2025

WHAT is this pull request doing?

Speed up internal stats collection; a ring buffer should be faster for our use case.

While not the most common operation, for a broker with a lot of objects (queues, exchanges, etc.) we want to minimize any CPU time spent just keeping track of the stats.

Improvements to the to_a method used by the UI/API, i.e. any time we refresh the web page with new stats (proposed to be once per second in #1195), should help a bit, even though the full JSON serialization is probably much more expensive. We're getting around a 9% improvement in the simulated http_benchmark.cr.

See the benchmark/ directory for all micro benchmarks (should be cleaned up before we merge this). The old Deque-based code is also kept until we're ready to say this is good.

```
RingBuffer (steady-state) 355.76M (  2.81ns) (± 5.70%)  0.0B/op        fastest
     Deque (steady-state) 177.83M (  5.62ns) (± 2.41%)  0.0B/op   2.00× slower

RingBuffer (fill from empty) 285.64k (  3.50µs) (± 0.76%)  12.1kB/op        fastest
     Deque (fill from empty) 202.75k (  4.93µs) (± 1.55%)  12.1kB/op   1.41× slower

RingBuffer (to_a) 872.70k (  1.15µs) (± 0.84%)  12.1kB/op        fastest
     Deque (to_a) 146.77k (  6.81µs) (± 0.94%)  12.1kB/op   5.95× slower
```

Why RingBuffer is faster (a minimal push sketch follows this list):

Bitwise Modulo Operations

  • RingBuffer uses & @mask for wrap-around (single CPU instruction)
  • Deque uses conditional branches (if index >= @capacity)
  • Bitwise AND avoids branch misprediction penalties

No Memory Shifting

  • RingBuffer: Simply overwrites oldest value when full
  • Deque: Calls shift to remove first element, causing memory movement

Simpler Push Logic

  • RingBuffer: Direct write to calculated position
  • Deque: Check size → shift if needed → then push (3 operations vs 1)

CPU Cache Optimization

  • RingBuffer: Predictable circular access pattern
  • Deque: Shift operations invalidate cache lines

No Dynamic Memory Management

  • RingBuffer: Fixed allocation, never grows
  • Deque: Resize checks and potential reallocation overhead
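
To make the points above concrete, here is a minimal sketch of the push/to_a pattern, assuming a power-of-two capacity. This is illustrative only, not the exact code in src/lavinmq/ring_buffer.cr (the real to_a uses a two-segment copy, for example):

```
# Minimal sketch (not the PR's exact code) of the push path described above.
# Capacity is rounded up to a power of two so wrap-around is a single
# bitwise AND instead of a conditional branch.
class RingBuffer(T)
  def initialize(capacity : Int32)
    @capacity = Math.pw2ceil(capacity)
    @mask = @capacity - 1
    @head = 0 # index of the oldest element
    @size = 0
    @buffer = Pointer(T).malloc(@capacity)
  end

  def push(value : T) : Nil
    tail = (@head + @size) & @mask # wrap-around via bitwise AND, no branch
    @buffer[tail] = value
    if @size == @capacity
      @head = (@head + 1) & @mask # full: overwrite the oldest value, advance head
    else
      @size += 1
    end
  end

  def to_a : Array(T)
    # copy oldest-to-newest into a plain Array for the UI/API
    Array(T).new(@size) { |i| @buffer[(@head + i) & @mask] }
  end
end
```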

HOW can this pull request be tested?

Specs, and looking at the stats manually, but we might want to improve the specs.

@jage jage requested a review from a team as a code owner October 13, 2025 15:03

claude bot commented Oct 13, 2025

Claude finished @jage's task.


Code Review Complete

Reviewing PR #1353 for bugs, security vulnerabilities, performance issues, missing error handling, and Crystal anti-patterns.

Todo List:

  • Read and analyze RingBuffer implementation
  • Review RingBuffer specs for completeness
  • Check stats.cr changes for correctness
  • Analyze config.cr changes
  • Report findings

Issues Found

1. Potential Integer Overflow (src/lavinmq/ring_buffer.cr:17):

```
@buffer = GC.malloc_atomic(@capacity * sizeof(T)).as(Pointer(T))
```

No bounds checking on @capacity * sizeof(T) multiplication. For very large capacities or large types, this could overflow and allocate insufficient memory, leading to buffer overruns.

Recommendation: Add overflow check:

```
byte_size = @capacity.to_u64 * sizeof(T)
raise ArgumentError.new("Buffer size too large") if byte_size > Int32::MAX
@buffer = GC.malloc_atomic(byte_size).as(Pointer(T))
```

2. Missing Resource Cleanup (src/lavinmq/ring_buffer.cr:17):
While using GC.malloc_atomic is appropriate for performance, there's no finalizer to ensure memory is cleaned up if the GC doesn't collect the object promptly. This could lead to memory leaks in long-running processes with frequent RingBuffer allocation/deallocation.

Recommendation: Consider adding a finalizer:

```
def finalize
  GC.free(@buffer) if @buffer
end
```

Note: The to_a method logic is actually correct - the test cases verify that the edge case where tail == @head when full is properly handled by the two-segment copy path.



@carlhoerberg carlhoerberg left a comment


Nice! Any reason the old Stat is kept?

```
STDERR.puts "WARNING: RingBuffer capacity #{capacity} rounded up to #{@capacity} (must be power of 2)"
end
@mask = @capacity - 1
@buffer = Pointer(T).malloc(@capacity)
```
Member


Could maybe use GC.malloc_atomic instead (if T is limited to Value), for better GC performance (the memory isn't cleared and doesn't have to be scanned for pointers), but I guess it's a theoretical improvement.

Member Author


Ah, yeah that sounds interesting here.

Initial micro benchmarks show it's ~20% faster when we allocate a lot of buffers, so it should help in a high-churn environment. For channel churn it might be a measurable improvement, but yeah, for connections/exchanges/queues I guess it's more theoretical.

If there's a huge amount of objects I guess the GC scan speedup is nice as well (haven't benchmarked).

Not sure if there are any downsides? Memory won't be cleared, but that's already implicitly assumed by the design, so it should be fine.
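
For reference, a minimal sketch of the allocation swap being discussed, assuming T is restricted to value types such as UInt32 or Float64 (illustrative, not the exact diff):

```
# GC.malloc_atomic memory is not zeroed and is never scanned for pointers,
# so it is only safe while T contains no references (Value types only).
class RingBuffer(T)
  def initialize(capacity : Int32)
    @capacity = Math.pw2ceil(capacity)
    @mask = @capacity - 1
    @head = 0
    @size = 0
    # Before: @buffer = Pointer(T).malloc(@capacity)   # zeroed, GC-scanned
    @buffer = GC.malloc_atomic(@capacity * sizeof(T)).as(Pointer(T))
  end
end
```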


jage commented Oct 14, 2025

> Nice! Any reason the old Stat is kept?

Nah, just nice to have the "original" left during development, but I'll add some cleanup commits and then this PR should be squash-merged.

jage added 2 commits October 14, 2025 06:27
Memory isn't cleared and doesn't have to be scanned for pointers.

Micro benchmarks show this being ~20% faster in a high churn
environment.

jage commented Oct 14, 2025

For version two of this we can re-architect to use a "ring buffer group", e.g. we let a group of several ring buffers share the same head, size, mask, etc.; only @buffer would be unique to each stat. Since we update all values at the same time, this should save a few bytes and maybe be faster if we can get the CPU to do more at the same time.

This is relevant for "large" brokers, e.g. 100k exchanges would have 400k ring buffers (publish_in, publish_out, unroutable, dedup).

  RingBufferGroup (capacity=8) with 3 buffers:

  ┌──────────────────────────────────────────────────┐
  │ RingBufferGroup                                  │
  │   @head = 2                                      │
  │   @size = 6                                      │
  │   @mask = 7                                      │
  │   @capacity = 8                                  │
  └──────────────────────────────────────────────────┘
           │
           │ Shared State
           ├───────────┬───────────┬───────────┐
           ↓           ↓           ↓           ↓

  RingBufferView(Float64) - rates:
  ┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐
  │ 5.2  │ 6.1  │ 1.5  │ 2.3  │ 3.7  │ 4.9  │ 5.2  │ 6.1  │
  └──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┘
    0      1      2      3      4      5      6      7
                  ↑                                  ↑
                head=2                           tail=0 ((2+6)&7=0)

  RingBufferView(UInt32) - counts:
  ┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐
  │ 400  │ 500  │ 100  │ 200  │ 300  │ 400  │ 500  │ 600  │
  └──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┘
    0      1      2      3      4      5      6      7
                  ↑                                  ↑
                head=2                           tail=0

  RingBufferView(Int64) - timestamps:
  ┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐
  │ 1008 │ 1009 │ 1003 │ 1004 │ 1005 │ 1006 │ 1007 │ 1008 │
  └──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┘
    0      1      2      3      4      5      6      7
                  ↑                                  ↑
                head=2                           tail=0
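
One possible shape of that idea, sketched under the assumption that the group's capacity is a power of two. This is purely illustrative: RingBufferGroup/RingBufferView come from the diagram above, and advance/write and the per-stat view names are hypothetical, not code in this PR:

```
class RingBufferGroup
  getter capacity : Int32
  getter head = 0
  getter size = 0

  def initialize(@capacity : Int32) # assumed to be a power of two
    @mask = @capacity - 1
  end

  # Advance the shared cursor once per stats tick and return the slot
  # every view should write into.
  def advance : Int32
    slot = (@head + @size) & @mask
    if @size == @capacity
      @head = (@head + 1) & @mask
    else
      @size += 1
    end
    slot
  end
end

class RingBufferView(T)
  def initialize(group : RingBufferGroup)
    @buffer = Pointer(T).malloc(group.capacity)
  end

  def write(slot : Int32, value : T) : Nil
    @buffer[slot] = value
  end
end

# Hypothetical per-tick usage:
#   slot = group.advance
#   publish_in_rates.write(slot, 5.2)
#   publish_in_counts.write(slot, 400_u32)
```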

```
@{{name.id}}_log = RingBuffer(UInt32).new(Config.instance.stats_log_size)
def {{name.id}}_log
  @{{name.id}}_log.to_a
end
```
Member Author


Not 100% sure about this; if we really need to do this, it should be handled at compile time if some code doesn't handle RingBuffer.

Member Author


Ah, because of add_logs! - should rewrite that.

Member


You just need to add #next to RingBuffer to make it an Iterator(T) I think. Probably nice.
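
A rough, hypothetical sketch of that suggestion (the @cursor field is illustrative and not part of this PR; it builds on the existing @head/@size/@mask/@buffer fields):

```
class RingBuffer(T)
  include Iterator(T)

  @cursor = 0

  # Walk the buffer oldest-to-newest, then signal Iterator::Stop.
  def next
    return stop if @cursor >= @size
    value = @buffer[(@head + @cursor) & @mask]
    @cursor += 1
    value
  end
end
```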

```
{{name.id}}_details: {
  rate: @{{name.id}}_rate,
  log: @{{name.id}}_log
  log: @{{name.id}}_log.to_a
```
Member Author


See above.


jage commented Oct 14, 2025

Need to take another round on this, missed how add_logs! works.


spuun commented Oct 14, 2025

> For version two of this we can re-architect to use a "ring buffer group", e.g. we let a group of several ring buffers share the same head, size, mask, etc.; only @buffer would be unique to each stat. Since we update all values at the same time, this should save a few bytes and maybe be faster if we can get the CPU to do more at the same time.

And maybe it's worth making it a lock-free, MT-safe structure.

@carlhoerberg
Member

Or, if we really want to support a massive amount of entities, I think we should do away with the stat logs completely, only have counters, and let the clients poll the counters and calculate their own rates, à la Prometheus.

The UI could poll the counters in the background and store them in IndexedDB. The difficulty is of course how to deal with the history of one queue among 100k. But maybe we can start naively, collect all counters from all entities, and see how it performs...
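
A tiny sketch of the client-side rate calculation implied here (variable names and values are illustrative only):

```
# A client derives its own rate from two polled counter samples.
previous_count = 1_000_u64
current_count  = 1_450_u64
interval       = 5.seconds

rate = (current_count - previous_count).to_f / interval.total_seconds
puts "#{rate} messages/s" # => 90.0 messages/s
```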

@jage jage marked this pull request as draft November 24, 2025 14:59