diff --git a/spec/ring_buffer_spec.cr b/spec/ring_buffer_spec.cr new file mode 100644 index 0000000000..7047857999 --- /dev/null +++ b/spec/ring_buffer_spec.cr @@ -0,0 +1,223 @@ +require "./spec_helper" + +describe LavinMQ::RingBuffer do + describe "initialization" do + it "rounds capacity up to power of 2" do + rb = LavinMQ::RingBuffer(Int32).new(5) + rb.size.should eq 0 + 8.times { |i| rb.push(i) } + rb.size.should eq 8 + end + + it "handles power of 2 capacity" do + rb = LavinMQ::RingBuffer(Int32).new(8) + 8.times { |i| rb.push(i) } + rb.size.should eq 8 + end + + it "handles minimum capacity of 2" do + rb = LavinMQ::RingBuffer(Int32).new(2) + rb.push(10) + rb.push(20) + rb.size.should eq 2 + rb[0].should eq 10 + rb[1].should eq 20 + end + + it "raises ArgumentError for capacity less than 2" do + expect_raises(ArgumentError, "Capacity must be at least 2") do + LavinMQ::RingBuffer(Int32).new(1) + end + + expect_raises(ArgumentError, "Capacity must be at least 2") do + LavinMQ::RingBuffer(Int32).new(0) + end + end + end + + describe "push and indexing" do + it "pushes values when not full" do + rb = LavinMQ::RingBuffer(Int32).new(4) + rb.push(1) + rb.push(2) + rb.push(3) + + rb.size.should eq 3 + rb[0].should eq 1 + rb[1].should eq 2 + rb[2].should eq 3 + end + + it "maintains size when at capacity" do + rb = LavinMQ::RingBuffer(Int32).new(4) + 5.times { |i| rb.push(i) } + + rb.size.should eq 4 + end + + it "overwrites oldest values when full" do + rb = LavinMQ::RingBuffer(Int32).new(4) + rb.push(0) + rb.push(1) + rb.push(2) + rb.push(3) + rb.push(4) # Overwrites 0 + + rb.size.should eq 4 + rb[0].should eq 1 + rb[1].should eq 2 + rb[2].should eq 3 + rb[3].should eq 4 + end + end + + describe "wrapping behavior" do + it "wraps correctly when buffer is full" do + rb = LavinMQ::RingBuffer(Int32).new(4) + 8.times { |i| rb.push(i) } + + rb.size.should eq 4 + rb[0].should eq 4 + rb[1].should eq 5 + rb[2].should eq 6 + rb[3].should eq 7 + end + + it "wraps multiple times" do + rb = LavinMQ::RingBuffer(Int32).new(4) + 20.times { |i| rb.push(i) } + + rb.size.should eq 4 + rb[0].should eq 16 + rb[1].should eq 17 + rb[2].should eq 18 + rb[3].should eq 19 + end + + it "handles partial wrap correctly" do + rb = LavinMQ::RingBuffer(Int32).new(4) + 5.times { |i| rb.push(i) } + + rb.size.should eq 4 + rb[0].should eq 1 + rb[1].should eq 2 + rb[2].should eq 3 + rb[3].should eq 4 + end + end + + describe "index access errors" do + it "raises IndexError for negative index" do + rb = LavinMQ::RingBuffer(Int32).new(4) + rb.push(1) + + expect_raises(IndexError) do + rb[-1] + end + end + + it "raises IndexError for index >= size" do + rb = LavinMQ::RingBuffer(Int32).new(4) + rb.push(1) + rb.push(2) + + expect_raises(IndexError) do + rb[2] + end + end + + it "raises IndexError on empty buffer" do + rb = LavinMQ::RingBuffer(Int32).new(4) + + expect_raises(IndexError) do + rb[0] + end + end + end + + describe "to_a" do + it "returns empty array when empty" do + rb = LavinMQ::RingBuffer(Int32).new(4) + rb.to_a.should eq [] of Int32 + end + + it "returns all elements before wrapping" do + rb = LavinMQ::RingBuffer(Int32).new(4) + rb.push(1) + rb.push(2) + rb.push(3) + + rb.to_a.should eq [1, 2, 3] + end + + it "returns elements in correct order at capacity" do + rb = LavinMQ::RingBuffer(Int32).new(4) + rb.push(0) + rb.push(1) + rb.push(2) + rb.push(3) + + rb.to_a.should eq [0, 1, 2, 3] + end + + it "returns elements in correct order after wrapping" do + rb = LavinMQ::RingBuffer(Int32).new(4) + 5.times { |i| rb.push(i) } + + rb.to_a.should eq [1, 2, 3, 4] + end + + it "returns elements in correct order after multiple wraps" do + rb = LavinMQ::RingBuffer(Int32).new(4) + 10.times { |i| rb.push(i) } + + rb.to_a.should eq [6, 7, 8, 9] + end + + it "handles edge case: full buffer where tail equals head" do + # This tests the case where @size == @capacity and tail == @head + # After exactly @capacity pushes, tail wraps around to equal @head + rb = LavinMQ::RingBuffer(Int32).new(4) + 4.times { |i| rb.push(i) } + + # At this point: @size == 4, @head == 0, tail == 0 + # Condition: @size < @capacity (false) || tail > @head (false) + # Should use two-segment copy path + rb.to_a.should eq [0, 1, 2, 3] + rb.size.should eq 4 + end + + it "handles edge case: full buffer after exact capacity wraps" do + # Fill buffer, then add exactly capacity more to wrap completely + rb = LavinMQ::RingBuffer(Int32).new(4) + 8.times { |i| rb.push(i) } + + # @head == 0, tail == 0, @size == 4 (wrapped exactly once) + rb.to_a.should eq [4, 5, 6, 7] + rb.size.should eq 4 + end + end + + describe "different types" do + it "works with strings" do + rb = LavinMQ::RingBuffer(String).new(3) + rb.push("a") + rb.push("b") + rb.push("c") + rb.push("d") + rb.push("e") + + rb.to_a.should eq ["b", "c", "d", "e"] + end + + it "works with custom structs" do + rb = LavinMQ::RingBuffer(NamedTuple(id: Int32, name: String)).new(2) + rb.push({id: 1, name: "first"}) + rb.push({id: 2, name: "second"}) + rb.push({id: 3, name: "third"}) + + rb[0].should eq({id: 2, name: "second"}) + rb[1].should eq({id: 3, name: "third"}) + end + end +end diff --git a/src/lavinmq/config.cr b/src/lavinmq/config.cr index 32e466acf8..e0e1088042 100644 --- a/src/lavinmq/config.cr +++ b/src/lavinmq/config.cr @@ -42,7 +42,7 @@ module LavinMQ property frame_max = 131_072_u32 # bytes property channel_max = 2048_u16 # number property stats_interval = 5000 # millisecond - property stats_log_size = 120 # 10 mins at 5s interval + property stats_log_size = 128 # 10 mins at 5s interval (power of 2) property? set_timestamp = false # in message headers when receive property socket_buffer_size = 16384 # bytes property? tcp_nodelay = false # bool diff --git a/src/lavinmq/ring_buffer.cr b/src/lavinmq/ring_buffer.cr new file mode 100644 index 0000000000..7db9087dec --- /dev/null +++ b/src/lavinmq/ring_buffer.cr @@ -0,0 +1,59 @@ +module LavinMQ + class RingBuffer(T) + @buffer : Pointer(T) + @head = 0 + @size = 0 + @mask : Int32 + @capacity : Int32 + + def initialize(capacity : Int32) + raise ArgumentError.new("Capacity must be at least 2") if capacity < 2 + # Round up to next power of 2 for fast modulo via bitwise AND + @capacity = Math.pw2ceil(capacity) + if capacity != @capacity + STDERR.puts "WARNING: RingBuffer capacity #{capacity} rounded up to #{@capacity} (must be power of 2)" + end + @mask = @capacity - 1 + @buffer = GC.malloc_atomic(@capacity * sizeof(T)).as(Pointer(T)) + end + + def push(value : T) : Nil + tail = (@head + @size) & @mask + @buffer[tail] = value + + if @size < @capacity + @size += 1 + else + @head = (@head + 1) & @mask + end + end + + def size : Int32 + @size + end + + def [](index : Int32) : T + raise IndexError.new if index >= @size || index < 0 + actual_index = (@head + index) & @mask + @buffer[actual_index] + end + + def to_a : Array(T) + return Array(T).new if @size == 0 + + result = Array(T).new(@size) + tail = (@head + @size) & @mask + + if @size < @capacity || tail > @head + # Not full yet, or full but not wrapped: copy single segment + result.concat((@buffer + @head).to_slice(@size)) + else + # Full and wrapped: copy two segments + first_chunk_size = @capacity - @head + result.concat((@buffer + @head).to_slice(first_chunk_size)) + result.concat(@buffer.to_slice(tail)) + end + result + end + end +end diff --git a/src/lavinmq/stats.cr b/src/lavinmq/stats.cr index 04c56899bc..d502a485a3 100644 --- a/src/lavinmq/stats.cr +++ b/src/lavinmq/stats.cr @@ -1,4 +1,5 @@ require "./config" +require "./ring_buffer" module LavinMQ module Stats @@ -7,14 +8,16 @@ module LavinMQ @{{name.id}}_count = Atomic(UInt64).new(0_u64) @{{name.id}}_count_prev = 0_u64 @{{name.id}}_rate = 0_f64 - @{{name.id}}_log = Deque(Float64).new(Config.instance.stats_log_size) + @{{name.id}}_log = RingBuffer(Float64).new(Config.instance.stats_log_size) def {{name.id}}_count @{{name.id}}_count.get(:relaxed) end {% end %} {% for name in log_keys %} - @{{name.id}}_log = Deque(UInt32).new(Config.instance.stats_log_size) - getter {{name.id}}_log + @{{name.id}}_log = RingBuffer(UInt32).new(Config.instance.stats_log_size) + def {{name.id}}_log + @{{name.id}}_log.to_a + end {% end %} def stats_details @@ -23,7 +26,7 @@ module LavinMQ {{name.id}}: {{name.id}}_count, {{name.id}}_details: { rate: @{{name.id}}_rate, - log: @{{name.id}}_log + log: @{{name.id}}_log.to_a }, {% end %} } @@ -41,20 +44,13 @@ module LavinMQ def update_rates : Nil interval = Config.instance.stats_interval // 1000 - log_size = Config.instance.stats_log_size {% for name in stats_keys %} - until @{{name.id}}_log.size < log_size - @{{name.id}}_log.shift - end {{name.id}}_count = @{{name.id}}_count.get(:relaxed) @{{name.id}}_rate = (({{name.id}}_count - @{{name.id}}_count_prev) / interval).round(1) @{{name.id}}_log.push @{{name.id}}_rate @{{name.id}}_count_prev = {{name.id}}_count {% end %} {% for name in log_keys %} - until @{{name.id}}_log.size < log_size - @{{name.id}}_log.shift - end @{{name.id}}_log.push {{name.id}} {% end %} end