Skip to content

Commit da7dfdf

Browse files
committed
osiris_log: Read read-ahead limit from application environment
Previously the `READ_AHEAD_LIMIT` was a constant fixed at 4096. For some usage patterns, setting the limit higher improves consumer throughput when the chunk size is medium or small — specifically, streams published at medium throughput that are consumed over a TLS connection.
1 parent 808f1f2 commit da7dfdf

File tree

1 file changed

+19
-15
lines changed

1 file changed

+19
-15
lines changed

src/osiris_log.erl

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@
107107
_/binary>>).
108108

109109
-define(SKIP_SEARCH_JUMP, 2048).
110-
-define(READ_AHEAD_LIMIT, 4096).
111110

112111
%% Specification of the Log format.
113112
%%
@@ -425,7 +424,8 @@
425424
-record(ra,
426425
{on = true :: boolean(),
427426
size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
428-
buf :: undefined | {Pos :: non_neg_integer(), binary()}
427+
buf :: undefined | {Pos :: non_neg_integer(), binary()},
428+
limit = read_ahead_limit() :: pos_integer()
429429
}).
430430
-record(read,
431431
{type :: data | offset,
@@ -3335,7 +3335,7 @@ iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries, Ra0)
33353335
%% needed to serve that, else we read up to the readahead
33363336
%% limit but not beyond the end of the chunk and not less
33373337
%% than the minimum request size
3338-
MinSize = max(MinReqSize, min(?READ_AHEAD_LIMIT, DataSize)),
3338+
MinSize = max(MinReqSize, min(read_ahead_limit(), DataSize)),
33393339
Size = max(MinSize, iter_guess_size(Credit0, NumEntries,
33403340
DataSize)),
33413341
{ok, Data} = file:pread(Fd, Pos, Size),
@@ -3364,15 +3364,15 @@ ra_read(_Pos, _Len, _Ra) ->
33643364
undefined.
33653365

33663366
ra_update_size(undefined, FilterSize, LastDataSize,
3367-
#ra{on = true, size = Sz} = Ra)
3368-
when Sz < ?READ_AHEAD_LIMIT andalso
3369-
LastDataSize =< (?READ_AHEAD_LIMIT - ?HEADER_SIZE_B -
3367+
#ra{on = true, size = Sz, limit = Limit} = Ra)
3368+
when Sz < Limit andalso
3369+
LastDataSize =< (Limit - ?HEADER_SIZE_B -
33703370
FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
33713371
%% no filter and last data size was small so enable data read ahead
3372-
Ra#ra{size = ?READ_AHEAD_LIMIT};
3372+
Ra#ra{size = Limit};
33733373
ra_update_size(undefined, FilterSize, LastDataSize,
3374-
#ra{on = true, size = ?READ_AHEAD_LIMIT} = Ra)
3375-
when LastDataSize =< (?READ_AHEAD_LIMIT - ?HEADER_SIZE_B -
3374+
#ra{on = true, size = Limit, limit = Limit} = Ra)
3375+
when LastDataSize =< (Limit - ?HEADER_SIZE_B -
33763376
FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
33773377
Ra;
33783378
ra_update_size(_Filter, FilterSize, _LastDataSize, #ra{size = Sz} = Ra) ->
@@ -3434,35 +3434,39 @@ write_in_chunks(ToWrite, MsgsPerChunk, Msg, W0) when ToWrite > 0 ->
34343434
write_in_chunks(_, _, _, W) ->
34353435
W.
34363436

3437+
read_ahead_limit() ->
3438+
application:get_env(osiris, read_ahead_limit, 4096).
3439+
34373440
-ifdef(TEST).
34383441
-include_lib("eunit/include/eunit.hrl").
34393442

34403443
ra_update_size_test() ->
3444+
ReadAheadLimit = read_ahead_limit(),
34413445
DefSize = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE,
34423446
?assertMatch(#ra{size = DefSize}, #ra{}),
34433447
Ra0 = #ra{},
3444-
?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
3448+
?assertMatch(#ra{size = ReadAheadLimit},
34453449
ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra0)),
34463450

3447-
?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
3451+
?assertMatch(#ra{size = ReadAheadLimit},
34483452
ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra0)),
3449-
Ra1 = #ra{size = ?READ_AHEAD_LIMIT},
3453+
Ra1 = #ra{size = ReadAheadLimit},
34503454
?assertMatch(#ra{size = DefSize},
34513455
ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 5000, Ra1)),
34523456

3453-
?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
3457+
?assertMatch(#ra{size = ReadAheadLimit},
34543458
ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra1)),
34553459

34563460
?assertMatch(#ra{size = DefSize},
34573461
ra_update_size("a filter", ?DEFAULT_FILTER_SIZE, 100, Ra0)),
34583462

34593463
%% we need to ensure that if we enable read ahead we can at least fulfil
34603464
%% the prior chunk including header filter and record length header
3461-
MaxEnablingDataSize = ?READ_AHEAD_LIMIT - ?HEADER_SIZE_B - ?DEFAULT_FILTER_SIZE - ?REC_HDR_SZ_SUBBATCH_B,
3465+
MaxEnablingDataSize = ReadAheadLimit - ?HEADER_SIZE_B - ?DEFAULT_FILTER_SIZE - ?REC_HDR_SZ_SUBBATCH_B,
34623466
?assertMatch(#ra{size = DefSize},
34633467
ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, MaxEnablingDataSize + 1 ,
34643468
Ra0)),
3465-
?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
3469+
?assertMatch(#ra{size = ReadAheadLimit},
34663470
ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, MaxEnablingDataSize,
34673471
Ra0)),
34683472
ok.

0 commit comments

Comments
 (0)