Commit 10ddb6a

osiris_log: Add reader config option for read-ahead limit
Previously the read-ahead limit was the constant `READ_AHEAD_LIMIT`, fixed at 4096 bytes. For some usage patterns a higher limit improves consumer throughput when chunks are small to medium: specifically, streams published at medium throughput and consumed over a TLS connection.
1 parent 808f1f2 commit 10ddb6a

2 files changed, +29 -25 lines


src/osiris.erl (2 additions, 1 deletion)

@@ -83,7 +83,8 @@
 -type reader_options() :: #{transport => tcp | ssl,
                             chunk_selector => all | user_data,
                             filter_spec => osiris_bloom:filter_spec(),
-                            read_ahead => boolean()
+                            read_ahead => boolean(),
+                            read_ahead_limit => pos_integer()
                            }.

 -export_type([name/0,
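With the new key in `reader_options()`, a consumer can size the read-ahead window per reader. As a usage sketch (the map below matches the type above, but the value 65536 and the way the options reach a reader are illustrative assumptions, not part of this diff):

    %% hypothetical reader options; read_ahead_limit is the new knob and
    %% falls back to ?DEFAULT_READ_AHEAD_LIMIT (4096) when absent
    Options = #{transport => ssl,
                chunk_selector => user_data,
                read_ahead => true,
                read_ahead_limit => 65536}.

A larger limit lets more small chunks be served from a single file read, which is where the TLS consumer case described in the commit message benefits.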

src/osiris_log.erl (27 additions, 24 deletions)

@@ -107,7 +107,7 @@
          _/binary>>).

 -define(SKIP_SEARCH_JUMP, 2048).
--define(READ_AHEAD_LIMIT, 4096).
+-define(DEFAULT_READ_AHEAD_LIMIT, 4096).

 %% Specification of the Log format.
 %%
@@ -425,7 +425,8 @@
 -record(ra,
         {on = true :: boolean(),
          size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
-         buf :: undefined | {Pos :: non_neg_integer(), binary()}
+         buf :: undefined | {Pos :: non_neg_integer(), binary()},
+         limit = ?DEFAULT_READ_AHEAD_LIMIT :: pos_integer()
         }).
 -record(read,
         {type :: data | offset,
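Because `limit` carries a default, a bare `#ra{}` behaves exactly as before this change; derived from the record definition above, the following match would succeed inside the module:

    %% a default-constructed read-ahead state keeps the old behaviour
    #ra{on = true,
        size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE,
        buf = undefined,
        limit = ?DEFAULT_READ_AHEAD_LIMIT} = #ra{}.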
@@ -1059,7 +1060,6 @@ init_data_reader_at(ChunkId, FilePos, File,
                     readers_counter_fun := CountersFun} = Config) ->
     case file:open(File, [raw, binary, read]) of
         {ok, Fd} ->
-            RaOn = ra_on(Config),
             Cnt = make_counter(Config),
             counters:put(Cnt, ?C_OFFSET, ChunkId - 1),
             CountersFun(1),
@@ -1078,7 +1078,7 @@ init_data_reader_at(ChunkId, FilePos, File,
                               chunk_selector = all,
                               position = FilePos,
                               transport = maps:get(transport, Config, tcp),
-                              read_ahead = #ra{on = RaOn}},
+                              read_ahead = ra(Config)},
                   fd = Fd}};
         Err ->
             Err
@@ -1288,7 +1288,6 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
             _ ->
                 undefined
         end,
-    RaOn = ra_on(Conf),
     {ok, #?MODULE{cfg = #cfg{directory = Dir,
                              counter = Cnt,
                              counter_id = counter_id(Conf),
@@ -1303,7 +1302,7 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
                           next_offset = NextChunkId,
                           transport = maps:get(transport, Options, tcp),
                           filter = FilterMatcher,
-                          read_ahead = #ra{on = RaOn}},
+                          read_ahead = ra(Conf)},
              fd = Fd}}.

 %% Searches the index files backwards for the ID of the last user chunk.
@@ -3312,7 +3311,8 @@ iter_guess_size(Credit0, NumEntries, DataSize) ->
     Credit = min(Credit0, NumEntries),
     (DataSize div NumEntries * Credit).

-iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries, Ra0)
+iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries,
+                #ra{limit = ReadAheadLimit} = Ra0)
   when is_integer(Credit0) andalso
        MinReqSize =< DataSize ->
     %% if the minimum request size can be served from read ahead then we
@@ -3335,7 +3335,7 @@ iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries, Ra0)
             %% needed to serve that, else we read up to the readahead
             %% limit but not beyond the end of the chunk and not less
             %% that the minimum request size
-            MinSize = max(MinReqSize, min(?READ_AHEAD_LIMIT, DataSize)),
+            MinSize = max(MinReqSize, min(ReadAheadLimit, DataSize)),
             Size = max(MinSize, iter_guess_size(Credit0, NumEntries,
                                                 DataSize)),
             {ok, Data} = file:pread(Fd, Pos, Size),
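To make the sizing concrete, here is the arithmetic with illustrative inputs (the numbers are examples only; the formulas come from the code above):

    %% Assume ReadAheadLimit = 4096 (the default), DataSize = 10000 bytes
    %% left in the chunk, MinReqSize = 1200, Credit0 = 3, NumEntries = 10.
    MinSize = max(1200, min(4096, 10000)),   %% = 4096
    Guess   = 10000 div 10 * min(3, 10),     %% iter_guess_size/3 -> 3000
    Size    = max(MinSize, Guess),           %% pread of 4096 bytes

So the read is never smaller than the request, never larger than what is useful within the chunk, and is capped by the now-configurable limit.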
@@ -3364,15 +3364,15 @@ ra_read(_Pos, _Len, _Ra) ->
     undefined.

 ra_update_size(undefined, FilterSize, LastDataSize,
-               #ra{on = true, size = Sz} = Ra)
-  when Sz < ?READ_AHEAD_LIMIT andalso
-       LastDataSize =< (?READ_AHEAD_LIMIT - ?HEADER_SIZE_B -
+               #ra{on = true, size = Sz, limit = Limit} = Ra)
+  when Sz < Limit andalso
+       LastDataSize =< (Limit - ?HEADER_SIZE_B -
                         FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
     %% no filter and last data size was small so enable data read ahead
-    Ra#ra{size = ?READ_AHEAD_LIMIT};
+    Ra#ra{size = Limit};
 ra_update_size(undefined, FilterSize, LastDataSize,
-               #ra{on = true, size = ?READ_AHEAD_LIMIT} = Ra)
-  when LastDataSize =< (?READ_AHEAD_LIMIT - ?HEADER_SIZE_B -
+               #ra{on = true, size = Limit, limit = Limit} = Ra)
+  when LastDataSize =< (Limit - ?HEADER_SIZE_B -
                         FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
     Ra;
 ra_update_size(_Filter, FilterSize, _LastDataSize, #ra{size = Sz} = Ra) ->
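The guard only enables read-ahead when the previous chunk, including its header, filter and sub-batch record header, would fit in the configured window. With placeholder constants (the real macro values are not shown in this diff):

    %% placeholder values, NOT the real osiris constants
    Limit       = 4096,   %% #ra.limit (?DEFAULT_READ_AHEAD_LIMIT)
    HeaderSize  = 64,     %% stand-in for ?HEADER_SIZE_B
    FilterSize  = 16,     %% stand-in for ?DEFAULT_FILTER_SIZE
    SubBatchHdr = 12,     %% stand-in for ?REC_HDR_SZ_SUBBATCH_B
    Threshold   = Limit - HeaderSize - FilterSize - SubBatchHdr,  %% 4004
    %% read-ahead stays enabled only while LastDataSize =< Threshold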
@@ -3397,10 +3397,13 @@ ra_fill(Fd, Pos, #ra{size = Sz} = Ra) ->
             Err
     end.

-ra_on(#{options := #{read_ahead := false}}) ->
-    false;
-ra_on(_) ->
-    true.
+-spec ra(config()) -> #ra{}.
+ra(#{options := #{read_ahead := false}}) ->
+    #ra{on = false};
+ra(#{options := #{read_ahead_limit := Limit}}) when is_integer(Limit) ->
+    #ra{limit = Limit};
+ra(_) ->
+    #ra{on = true}.

 generate_log(Msg, MsgsPerChunk, NumMessages, Directory) ->
     Name = filename:basename(Directory),
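Given the three clauses above, a reader config resolves to an initial #ra{} as follows (65536 is an arbitrary example value; the matches would succeed inside the module):

    #ra{on = false}               = ra(#{options => #{read_ahead => false}}),
    #ra{on = true, limit = 65536} = ra(#{options => #{read_ahead_limit => 65536}}),
    #ra{on = true, limit = 4096}  = ra(#{options => #{}}).  %% default limit

Note the clause order: `read_ahead => false` wins over any `read_ahead_limit` set alongside it, so disabling read-ahead ignores the limit.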
@@ -3441,28 +3444,28 @@ ra_update_size_test() ->
     DefSize = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE,
     ?assertMatch(#ra{size = DefSize}, #ra{}),
     Ra0 = #ra{},
-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra0)),

-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra0)),
-    Ra1 = #ra{size = ?READ_AHEAD_LIMIT},
+    Ra1 = #ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
     ?assertMatch(#ra{size = DefSize},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 5000, Ra1)),

-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra1)),

     ?assertMatch(#ra{size = DefSize},
                  ra_update_size("a filter", ?DEFAULT_FILTER_SIZE, 100, Ra0)),

     %% we need to ensure that if we enable read ahead we can at least fulfil
     %% the prior chunk including header filter and record length header
-    MaxEnablingDataSize = ?READ_AHEAD_LIMIT - ?HEADER_SIZE_B - ?DEFAULT_FILTER_SIZE - ?REC_HDR_SZ_SUBBATCH_B,
+    MaxEnablingDataSize = ?DEFAULT_READ_AHEAD_LIMIT - ?HEADER_SIZE_B - ?DEFAULT_FILTER_SIZE - ?REC_HDR_SZ_SUBBATCH_B,
     ?assertMatch(#ra{size = DefSize},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, MaxEnablingDataSize + 1,
                                 Ra0)),
-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, MaxEnablingDataSize,
                                 Ra0)),
     ok.
