Skip to content

Commit f81f22e

Browse files
authored
Merge pull request #119 from membraneframework/fix-waiting-bug
Handle case when ex_hls has to add some latency to Live HLS stream
2 parents a1c43f6 + 3491cc7 commit f81f22e

File tree

6 files changed

+68
-32
lines changed

6 files changed

+68
-32
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ In future, the support for MPEG-DASH is planned as well
1313
Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`.
1414

1515
```elixir
16-
{:membrane_http_adaptive_stream_plugin, "~> 0.20.0"}
16+
{:membrane_http_adaptive_stream_plugin, "~> 0.20.1"}
1717
```
1818

1919
## Usage Example

lib/membrane_http_adaptive_stream/source.ex

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,12 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
120120
state =
121121
Map.from_struct(opts)
122122
|> Map.merge(%{
123-
# status will be either :initialized, :waiting_on_pads or :streaming
123+
# status will be either:
124+
# - :initialized
125+
# - :waiting_on_client_genserver_ready
126+
# - :client_genserver_ready
127+
# - :waiting_on_pads
128+
# - :streaming
124129
status: :initialized,
125130
client_genserver: nil,
126131
stream: nil,
@@ -166,20 +171,14 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
166171

167172
@impl true
168173
def handle_playing(ctx, state) do
169-
# both start_streaming/1 and generate_new_tracks_notification/1 functions
170-
# call ClientGenServer.get_tracks_info/1 that triggers downloading first
171-
# segments of the HLS stream
174+
:ok = spawn_client_genserver(ctx, state)
172175

173-
state = create_client_gen_server(ctx, state)
176+
self() |> Process.send_after(:client_genserver_ready_timeout, 60_000)
174177

175-
if Map.values(state.pad_refs) != [nil, nil] do
176-
state |> start_streaming()
177-
else
178-
state |> generate_new_tracks_notification()
179-
end
178+
{[], %{state | status: :waiting_on_client_genserver_ready}}
180179
end
181180

182-
defp create_client_gen_server(ctx, state) do
181+
defp spawn_client_genserver(ctx, state) do
183182
start_link_arg = %{
184183
url: state.url,
185184
variant_selection_policy: state.variant_selection_policy,
@@ -192,18 +191,10 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
192191
{__MODULE__.ClientGenServer, start_link_arg}
193192
)
194193

195-
client_genserver =
196-
receive do
197-
{:client_genserver, client_genserver} -> client_genserver
198-
after
199-
5_000 ->
200-
raise "Timeout waiting for #{inspect(__MODULE__)}.ClientGenServer initialization"
201-
end
202-
203-
%{state | client_genserver: client_genserver}
194+
:ok
204195
end
205196

206-
defp generate_new_tracks_notification(%{status: :initialized} = state) do
197+
defp generate_new_tracks_notification(%{status: :client_genserver_ready} = state) do
207198
tracks_info = ClientGenServer.get_tracks_info(state.client_genserver)
208199

209200
new_tracks =
@@ -228,7 +219,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
228219
end
229220

230221
defp start_streaming(%{status: status} = state)
231-
when status in [:initialized, :waiting_on_pads] do
222+
when status in [:client_genserver_ready, :waiting_on_pads] do
232223
actions = get_stream_formats(state) ++ get_redemands(state)
233224
state = %{state | status: :streaming}
234225
{actions, state}
@@ -354,6 +345,42 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
354345
{[], state}
355346
end
356347

348+
@impl true
349+
def handle_demand(pad_ref, demand, :buffers, _ctx, state)
350+
when state.status == :waiting_on_client_genserver_ready do
351+
Membrane.Logger.debug("""
352+
Ignoring demand (#{inspect(demand)} buffers) on pad #{inspect(pad_ref)} because this \
353+
element is still waiting for ExHLS to start downloading multimedia segments.
354+
""")
355+
356+
{[], state}
357+
end
358+
359+
@impl true
360+
def handle_info({:client_genserver_ready, client_genserver}, _ctx, state)
361+
when state.status == :waiting_on_client_genserver_ready do
362+
state = %{
363+
state
364+
| client_genserver: client_genserver,
365+
status: :client_genserver_ready
366+
}
367+
368+
if Map.values(state.pad_refs) != [nil, nil] do
369+
state |> start_streaming()
370+
else
371+
state |> generate_new_tracks_notification()
372+
end
373+
end
374+
375+
@impl true
376+
def handle_info(:client_genserver_ready_timeout, _ctx, state) do
377+
if state.status == :waiting_on_client_genserver_ready do
378+
raise "Timeout while waiting for ExHLS to download first media segments."
379+
end
380+
381+
{[], state}
382+
end
383+
357384
@impl true
358385
def handle_info({:chunk, %ExHLS.Chunk{} = chunk}, _ctx, state) do
359386
buffer =

lib/membrane_http_adaptive_stream/source/client_genserver.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
8383
tracks_info: tracks_info
8484
}
8585

86-
send(state.source, {:client_genserver, self()})
86+
send(state.source, {:client_genserver_ready, self()})
8787

8888
{:noreply, state}
8989
end

mix.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Membrane.HTTPAdaptiveStream.MixProject do
22
use Mix.Project
33

4-
@version "0.20.0"
4+
@version "0.20.1"
55
@github_url "https://github.com/membraneframework/membrane_http_adaptive_stream_plugin"
66

77
def project do
@@ -69,7 +69,7 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do
6969
{:membrane_aac_plugin, "~> 0.19.0"},
7070
{:membrane_h26x_plugin, "~> 0.10.0"},
7171
{:stream_split, "~> 0.1.7"},
72-
{:ex_hls, "~> 0.1.0"},
72+
{:ex_hls, "~> 0.1.2"},
7373
{:bunch, "~> 1.6"},
7474
{:qex, "~> 0.5"},
7575
{:muontrap, "~> 1.6", only: :test},

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
1414
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
1515
"ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"},
16-
"ex_hls": {:hex, :ex_hls, "0.1.0", "618d1b19839bb77b82e9f5a3a0fcd3520643eb0bd894b4a204fb416767ec7fc9", [:mix], [{:ex_m3u8, "~> 0.15.3", [hex: :ex_m3u8, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.2", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.36.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:mpeg_ts, "~> 2.0.0", [hex: :mpeg_ts, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "91a636f66e6f1e08eb518b724199169d77443d612ceb31834cb1d5ee1dc4fe17"},
16+
"ex_hls": {:hex, :ex_hls, "0.1.2", "fca2c2a4ddf8459b9a47bf1fd6552c5d74cccf5dc72f56cf87129111c3e2f8ee", [:mix], [{:ex_m3u8, "~> 0.15.3", [hex: :ex_m3u8, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.2", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.36.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:mpeg_ts, "~> 2.0.0", [hex: :mpeg_ts, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "8129921a863918999cda4032cf66e5e4de9fa24faa33cee28a09b34f13438a49"},
1717
"ex_m3u8": {:hex, :ex_m3u8, "0.15.3", "c10427f450b2ed7bfd85808d8dce21214f1fe9fa18927591cbbf96fea0a6a8aa", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "99f20c0b44bab130dc6aca71fefe0d1a174413ae9ac2763220994b29bd310939"},
1818
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
1919
"finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"},

test/membrane_http_adaptive_stream/integration_test/source_test.exs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,11 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do
159159
describe "Membrane.HTTPAdaptiveStream.Source demuxes audio and video from" do
160160
@tag :live
161161
@tag :tmp_dir
162+
@tag :x
162163
test "Live HLS stream", %{tmp_dir: tmp_dir} do
163164
index_m3u8 = Path.join(tmp_dir, "index.m3u8")
164165
generate_live_hls(@bbb_33s_mp4_url, index_m3u8)
165-
166166
await_until_file_exists(index_m3u8)
167-
Process.sleep(7_000)
168167

169168
audio_result_file = Path.join(tmp_dir, "audio.aac")
170169
video_result_file = Path.join(tmp_dir, "video.h264")
@@ -193,9 +192,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do
193192
test "Live HLS stream played from the middle", %{tmp_dir: tmp_dir} do
194193
index_m3u8 = Path.join(tmp_dir, "index.m3u8")
195194
generate_live_hls(@bbb_33s_mp4_url, index_m3u8)
196-
197-
await_until_file_exists(index_m3u8)
198-
Process.sleep(20_000)
195+
:ok = await_until_media_sequence_is_3(index_m3u8)
199196

200197
spec =
201198
child(:hls_source, %Membrane.HTTPAdaptiveStream.Source{
@@ -254,6 +251,18 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do
254251
end
255252
end
256253

254+
defp await_until_media_sequence_is_3(index_m3u8) do
255+
with {:ok, content} <- File.read(index_m3u8),
256+
true <- String.contains?(content, "#EXT-X-MEDIA-SEQUENCE:3") do
257+
:ok
258+
else
259+
_error ->
260+
Logger.debug("Waiting for media sequence to be 3...")
261+
Process.sleep(100)
262+
await_until_media_sequence_is_3(index_m3u8)
263+
end
264+
end
265+
257266
defp generate_live_hls(source_mp4, index_m3u8) do
258267
start_supervised!(
259268
{

0 commit comments

Comments
 (0)