Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 223 additions & 0 deletions spec/ring_buffer_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
require "./spec_helper"

describe LavinMQ::RingBuffer do
describe "initialization" do
it "rounds capacity up to power of 2" do
rb = LavinMQ::RingBuffer(Int32).new(5)
rb.size.should eq 0
8.times { |i| rb.push(i) }
rb.size.should eq 8
end

it "handles power of 2 capacity" do
rb = LavinMQ::RingBuffer(Int32).new(8)
8.times { |i| rb.push(i) }
rb.size.should eq 8
end

it "handles minimum capacity of 2" do
rb = LavinMQ::RingBuffer(Int32).new(2)
rb.push(10)
rb.push(20)
rb.size.should eq 2
rb[0].should eq 10
rb[1].should eq 20
end

it "raises ArgumentError for capacity less than 2" do
expect_raises(ArgumentError, "Capacity must be at least 2") do
LavinMQ::RingBuffer(Int32).new(1)
end

expect_raises(ArgumentError, "Capacity must be at least 2") do
LavinMQ::RingBuffer(Int32).new(0)
end
end
end

describe "push and indexing" do
it "pushes values when not full" do
rb = LavinMQ::RingBuffer(Int32).new(4)
rb.push(1)
rb.push(2)
rb.push(3)

rb.size.should eq 3
rb[0].should eq 1
rb[1].should eq 2
rb[2].should eq 3
end

it "maintains size when at capacity" do
rb = LavinMQ::RingBuffer(Int32).new(4)
5.times { |i| rb.push(i) }

rb.size.should eq 4
end

it "overwrites oldest values when full" do
rb = LavinMQ::RingBuffer(Int32).new(4)
rb.push(0)
rb.push(1)
rb.push(2)
rb.push(3)
rb.push(4) # Overwrites 0

rb.size.should eq 4
rb[0].should eq 1
rb[1].should eq 2
rb[2].should eq 3
rb[3].should eq 4
end
end

describe "wrapping behavior" do
it "wraps correctly when buffer is full" do
rb = LavinMQ::RingBuffer(Int32).new(4)
8.times { |i| rb.push(i) }

rb.size.should eq 4
rb[0].should eq 4
rb[1].should eq 5
rb[2].should eq 6
rb[3].should eq 7
end

it "wraps multiple times" do
rb = LavinMQ::RingBuffer(Int32).new(4)
20.times { |i| rb.push(i) }

rb.size.should eq 4
rb[0].should eq 16
rb[1].should eq 17
rb[2].should eq 18
rb[3].should eq 19
end

it "handles partial wrap correctly" do
rb = LavinMQ::RingBuffer(Int32).new(4)
5.times { |i| rb.push(i) }

rb.size.should eq 4
rb[0].should eq 1
rb[1].should eq 2
rb[2].should eq 3
rb[3].should eq 4
end
end

describe "index access errors" do
it "raises IndexError for negative index" do
rb = LavinMQ::RingBuffer(Int32).new(4)
rb.push(1)

expect_raises(IndexError) do
rb[-1]
end
end

it "raises IndexError for index >= size" do
rb = LavinMQ::RingBuffer(Int32).new(4)
rb.push(1)
rb.push(2)

expect_raises(IndexError) do
rb[2]
end
end

it "raises IndexError on empty buffer" do
rb = LavinMQ::RingBuffer(Int32).new(4)

expect_raises(IndexError) do
rb[0]
end
end
end

describe "to_a" do
it "returns empty array when empty" do
rb = LavinMQ::RingBuffer(Int32).new(4)
rb.to_a.should eq [] of Int32
end

it "returns all elements before wrapping" do
rb = LavinMQ::RingBuffer(Int32).new(4)
rb.push(1)
rb.push(2)
rb.push(3)

rb.to_a.should eq [1, 2, 3]
end

it "returns elements in correct order at capacity" do
rb = LavinMQ::RingBuffer(Int32).new(4)
rb.push(0)
rb.push(1)
rb.push(2)
rb.push(3)

rb.to_a.should eq [0, 1, 2, 3]
end

it "returns elements in correct order after wrapping" do
rb = LavinMQ::RingBuffer(Int32).new(4)
5.times { |i| rb.push(i) }

rb.to_a.should eq [1, 2, 3, 4]
end

it "returns elements in correct order after multiple wraps" do
rb = LavinMQ::RingBuffer(Int32).new(4)
10.times { |i| rb.push(i) }

rb.to_a.should eq [6, 7, 8, 9]
end

it "handles edge case: full buffer where tail equals head" do
# This tests the case where @size == @capacity and tail == @head
# After exactly @capacity pushes, tail wraps around to equal @head
rb = LavinMQ::RingBuffer(Int32).new(4)
4.times { |i| rb.push(i) }

# At this point: @size == 4, @head == 0, tail == 0
# Condition: @size < @capacity (false) || tail > @head (false)
# Should use two-segment copy path
rb.to_a.should eq [0, 1, 2, 3]
rb.size.should eq 4
end

it "handles edge case: full buffer after exact capacity wraps" do
# Fill buffer, then add exactly capacity more to wrap completely
rb = LavinMQ::RingBuffer(Int32).new(4)
8.times { |i| rb.push(i) }

# @head == 0, tail == 0, @size == 4 (wrapped exactly once)
rb.to_a.should eq [4, 5, 6, 7]
rb.size.should eq 4
end
end

describe "different types" do
it "works with strings" do
rb = LavinMQ::RingBuffer(String).new(3)
rb.push("a")
rb.push("b")
rb.push("c")
rb.push("d")
rb.push("e")

rb.to_a.should eq ["b", "c", "d", "e"]
end

it "works with custom structs" do
rb = LavinMQ::RingBuffer(NamedTuple(id: Int32, name: String)).new(2)
rb.push({id: 1, name: "first"})
rb.push({id: 2, name: "second"})
rb.push({id: 3, name: "third"})

rb[0].should eq({id: 2, name: "second"})
rb[1].should eq({id: 3, name: "third"})
end
end
end
2 changes: 1 addition & 1 deletion src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ module LavinMQ
property frame_max = 131_072_u32 # bytes
property channel_max = 2048_u16 # number
property stats_interval = 5000 # millisecond
property stats_log_size = 120 # 10 mins at 5s interval
property stats_log_size = 128 # 10 mins at 5s interval (power of 2)
property? set_timestamp = false # in message headers when receive
property socket_buffer_size = 16384 # bytes
property? tcp_nodelay = false # bool
Expand Down
59 changes: 59 additions & 0 deletions src/lavinmq/ring_buffer.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
module LavinMQ
class RingBuffer(T)
@buffer : Pointer(T)
@head = 0
@size = 0
@mask : Int32
@capacity : Int32

def initialize(capacity : Int32)
raise ArgumentError.new("Capacity must be at least 2") if capacity < 2
# Round up to next power of 2 for fast modulo via bitwise AND
@capacity = Math.pw2ceil(capacity)
if capacity != @capacity
STDERR.puts "WARNING: RingBuffer capacity #{capacity} rounded up to #{@capacity} (must be power of 2)"
end
@mask = @capacity - 1
@buffer = GC.malloc_atomic(@capacity * sizeof(T)).as(Pointer(T))
end

def push(value : T) : Nil
tail = (@head + @size) & @mask
@buffer[tail] = value

if @size < @capacity
@size += 1
else
@head = (@head + 1) & @mask
end
end

def size : Int32
@size
end

def [](index : Int32) : T
raise IndexError.new if index >= @size || index < 0
actual_index = (@head + index) & @mask
@buffer[actual_index]
end

def to_a : Array(T)
return Array(T).new if @size == 0

result = Array(T).new(@size)
tail = (@head + @size) & @mask

if @size < @capacity || tail > @head
# Not full yet, or full but not wrapped: copy single segment
result.concat((@buffer + @head).to_slice(@size))
else
# Full and wrapped: copy two segments
first_chunk_size = @capacity - @head
result.concat((@buffer + @head).to_slice(first_chunk_size))
result.concat(@buffer.to_slice(tail))
end
result
end
end
end
18 changes: 7 additions & 11 deletions src/lavinmq/stats.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./config"
require "./ring_buffer"

module LavinMQ
module Stats
Expand All @@ -7,14 +8,16 @@ module LavinMQ
@{{name.id}}_count = Atomic(UInt64).new(0_u64)
@{{name.id}}_count_prev = 0_u64
@{{name.id}}_rate = 0_f64
@{{name.id}}_log = Deque(Float64).new(Config.instance.stats_log_size)
@{{name.id}}_log = RingBuffer(Float64).new(Config.instance.stats_log_size)
def {{name.id}}_count
@{{name.id}}_count.get(:relaxed)
end
{% end %}
{% for name in log_keys %}
@{{name.id}}_log = Deque(UInt32).new(Config.instance.stats_log_size)
getter {{name.id}}_log
@{{name.id}}_log = RingBuffer(UInt32).new(Config.instance.stats_log_size)
def {{name.id}}_log
@{{name.id}}_log.to_a
end
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% sure about this, if we really need to do this, should be handled at compile time if some code don't handle Ringbuffer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, because of add_logs! - should rewrite that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You just need to add #next to RingBuffer to make it an Iterator(T) I think. Probably nice.

{% end %}

def stats_details
Expand All @@ -23,7 +26,7 @@ module LavinMQ
{{name.id}}: {{name.id}}_count,
{{name.id}}_details: {
rate: @{{name.id}}_rate,
log: @{{name.id}}_log
log: @{{name.id}}_log.to_a
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.

},
{% end %}
}
Expand All @@ -41,20 +44,13 @@ module LavinMQ

def update_rates : Nil
interval = Config.instance.stats_interval // 1000
log_size = Config.instance.stats_log_size
{% for name in stats_keys %}
until @{{name.id}}_log.size < log_size
@{{name.id}}_log.shift
end
{{name.id}}_count = @{{name.id}}_count.get(:relaxed)
@{{name.id}}_rate = (({{name.id}}_count - @{{name.id}}_count_prev) / interval).round(1)
@{{name.id}}_log.push @{{name.id}}_rate
@{{name.id}}_count_prev = {{name.id}}_count
{% end %}
{% for name in log_keys %}
until @{{name.id}}_log.size < log_size
@{{name.id}}_log.shift
end
@{{name.id}}_log.push {{name.id}}
{% end %}
end
Expand Down
Loading