Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
af45359
Add MVP of the MPEG-TS ingress in form of a HLS client
varsill Apr 15, 2025
7629402
Add timestamps convertion and ExHLS.Frame struct
varsill Apr 15, 2025
b6faec9
Fix bug in read_frame
varsill Apr 15, 2025
16f3243
remove debug logs
varsill Apr 15, 2025
1f71810
remove debug logs
varsill Apr 15, 2025
784921f
wip
FelonEkonom May 27, 2025
7c36df9
wip
FelonEkonom May 27, 2025
015d7c7
Add hint for myself
FelonEkonom May 27, 2025
5fa6960
Update mp4 plugin dep
FelonEkonom May 28, 2025
1bf482d
mpgts demuxing engine wip
FelonEkonom Jun 16, 2025
ba618f4
wip
FelonEkonom Jun 17, 2025
7fa8c49
Fix test
FelonEkonom Jun 17, 2025
2d51f7e
Improve tests wip
FelonEkonom Jun 18, 2025
b83295b
Add test for fmp4 wip
FelonEkonom Jun 18, 2025
3a22143
wip
FelonEkonom Jun 18, 2025
286a0e5
Fix test
FelonEkonom Jun 25, 2025
3864cb3
Fix test once again
FelonEkonom Jun 25, 2025
f5df8ab
Fix typo
FelonEkonom Jun 25, 2025
ff82810
Refactor test layout
FelonEkonom Jun 25, 2025
e36ac0f
Improve choosing variant
FelonEkonom Jun 25, 2025
cfc76fd
Small test refactor
FelonEkonom Jun 25, 2025
5ec8db1
Refactor project
FelonEkonom Jun 25, 2025
c85a77a
Remove leftovers
FelonEkonom Jun 25, 2025
eaa8bc0
Write todos
FelonEkonom Jun 25, 2025
cab0603
Add failing test
FelonEkonom Jun 25, 2025
e258c63
Change the way wariants are chosen wip
FelonEkonom Jun 27, 2025
c06e4cb
wip
FelonEkonom Jun 27, 2025
b0b4fcb
Change get_variants API, eradicate GenServer
FelonEkonom Jun 30, 2025
a8ed5c4
Replace template in mix.exs with ex_hls
FelonEkonom Jun 30, 2025
e4ecb72
Small refactor
FelonEkonom Jun 30, 2025
49503dd
Update m3u8 dependency
FelonEkonom Jun 30, 2025
9bf42a0
Download segments to get tracks info
FelonEkonom Jul 1, 2025
d725695
Drop unsupported streams
FelonEkonom Jul 2, 2025
f6fdf34
Implement CR wip
FelonEkonom Jul 4, 2025
f467bda
Update mpeg_ts version
FelonEkonom Jul 4, 2025
21b697c
Update lib/ex_hls/client.ex
FelonEkonom Jul 4, 2025
be0372e
CR refactor wip
FelonEkonom Jul 4, 2025
2e43270
Implement CR wip
FelonEkonom Jul 4, 2025
1497091
Debug wip
FelonEkonom Jul 7, 2025
d39caef
Fix bug
FelonEkonom Jul 7, 2025
1c04735
Implementing minor CR comments
FelonEkonom Jul 8, 2025
8de4292
Upgrade ex_m3u8 dep
FelonEkonom Jul 8, 2025
c060d84
Fix bug
FelonEkonom Jul 9, 2025
076b2ad
Rename Sample on Chunk
FelonEkonom Jul 14, 2025
54e9656
Trigger CI
FelonEkonom Jul 15, 2025
3890a94
Update deps
FelonEkonom Jul 15, 2025
bf78d23
unlock unused deps
FelonEkonom Jul 15, 2025
0746339
Merge pull request #2 from membraneframework-labs/plug-demuxing-engin…
FelonEkonom Jul 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
# Membrane Template Plugin
# ExHLS

[![Hex.pm](https://img.shields.io/hexpm/v/membrane_template_plugin.svg)](https://hex.pm/packages/membrane_template_plugin)
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_template_plugin)
[![CircleCI](https://circleci.com/gh/membraneframework/membrane_template_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_template_plugin)
[![Hex.pm](https://img.shields.io/hexpm/v/ex_hls.svg)](https://hex.pm/packages/ex_hls)
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/ex_hls)
[![CircleCI](https://circleci.com/gh/membraneframework/ex_hls.svg?style=svg)](https://circleci.com/gh/membraneframework/ex_hls)

This repository contains a template for new plugins.

Check out different branches for other flavors of this template.
This repository contains ExHLS - an Elixir package for handling HLS streams

It's a part of the [Membrane Framework](https://membrane.stream).

## Installation

The package can be installed by adding `membrane_template_plugin` to your list of dependencies in `mix.exs`:
The package can be installed by adding `ex_hls` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:membrane_template_plugin, "~> 0.1.0"}
{:ex_hls, "~> 0.1.0"}
]
end
```
Expand All @@ -28,8 +26,8 @@ TODO

## Copyright and License

Copyright 2020, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin)
Copyright 2025, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=ex_hls)

[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin)
[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=ex_hls)

Licensed under the [Apache License, Version 2.0](LICENSE)
Binary file added fixture/fileSequence0.m4s
Binary file not shown.
Binary file added fixture/init.mp4
Binary file not shown.
9 changes: 9 additions & 0 deletions fixture/output.m3u8
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#EXTM3U
#EXT-X-VERSION:7
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-MAP:URI="init.mp4"
#EXTINF:10.001628,
fileSequence0.m4s
#EXT-X-ENDLIST
21 changes: 21 additions & 0 deletions lib/ex_hls/chunk.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule ExHLS.Chunk do
@moduledoc """
A struct representing a media chunk in the ExHLS demuxing engine.
"""
@enforce_keys [:payload, :pts_ms, :dts_ms, :track_id]
defstruct @enforce_keys ++ [metadata: %{}]

@type t :: %__MODULE__{
payload: binary(),
pts_ms: integer(),
dts_ms: integer(),
track_id: term(),
metadata: map()
}

# timestamps need to be represented in milliseconds
@time_base 1000

@spec time_base() :: integer()
def time_base(), do: @time_base
end
286 changes: 286 additions & 0 deletions lib/ex_hls/client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
defmodule ExHLS.Client do
@moduledoc """
Module providing functionality to read and demux HLS streams.
It allows reading chunks from the stream, choosing variants, and managing media playlists.
"""

alias ExHLS.DemuxingEngine
alias Membrane.{AAC, H264, RemoteStream}

@opaque client :: map()
@type chunk :: any()

@type variant_description :: %{
id: integer(),
name: String.t() | nil,
frame_rate: number() | nil,
resolution: {integer(), integer()} | nil,
codecs: String.t() | nil,
bandwidth: integer() | nil,
uri: String.t() | nil
}

@doc """
Starts the ExHLS client with the given URL and demuxing engine implementation.

By default, it uses `DemuxingEngine.MPEGTS` as the demuxing engine implementation.
"""

@spec new(String.t()) :: client()
def new(url) do
%{status: 200, body: request_body} = Req.get!(url)
multivariant_playlist = request_body |> ExM3U8.deserialize_multivariant_playlist!([])

%{
media_playlist: nil,
media_base_url: nil,
multivariant_playlist: multivariant_playlist,
base_url: Path.dirname(url),
video_chunks: [],
demuxing_engine_impl: nil,
demuxing_engine: nil,
queues: %{audio: Qex.new(), video: Qex.new()},
timestamp_offsets: %{audio: nil, video: nil},
last_timestamps: %{audio: nil, video: nil}
}
end

defp ensure_media_playlist_loaded(%{media_playlist: nil} = client) do
get_variants(client)
|> Map.to_list()
|> case do
[] ->
read_media_playlist_without_variant(client)

[{variant_id, _variant}] ->
choose_variant(client, variant_id)

_many_variants ->
raise """
If there are available variants, you have to choose one of them using \
`choose_variant/2` function before reading chunks. Available variants:
#{get_variants(client) |> inspect(limit: :infinity, pretty: true)}
"""
end
end

defp ensure_media_playlist_loaded(client), do: client

defp read_media_playlist_without_variant(%{media_playlist: nil} = client) do
media_playlist =
client.base_url
|> Path.join("output.m3u8")
|> Req.get!()

deserialized_media_playlist =
ExM3U8.deserialize_media_playlist!(media_playlist.body, [])

%{
client
| media_playlist: deserialized_media_playlist,
media_base_url: client.base_url
}
end

@spec get_variants(client()) :: %{optional(integer()) => variant_description()}
def get_variants(client) do
client.multivariant_playlist.items
|> Enum.filter(&match?(%ExM3U8.Tags.Stream{}, &1))
|> Enum.with_index(fn variant, index ->
variant_description =
variant
|> Map.take([:name, :frame_rate, :resolution, :codecs, :bandwidth, :uri])
|> Map.put(:id, index)

{index, variant_description}
end)
|> Map.new()
end

@spec choose_variant(client(), String.t()) :: client()
def choose_variant(client, variant_id) do
chosen_variant =
get_variants(client)
|> Map.fetch!(variant_id)

media_playlist = Path.join(client.base_url, chosen_variant.uri) |> Req.get!()

deserialized_media_playlist =
ExM3U8.deserialize_media_playlist!(media_playlist.body, [])

media_base_url = Path.join(client.base_url, Path.dirname(chosen_variant.uri))

%{
client
| media_playlist: deserialized_media_playlist,
media_base_url: media_base_url
}
end

@spec read_video_chunk(client()) :: chunk() | :end_of_stream
def read_video_chunk(client), do: pop_queue_or_do_read_chunk(client, :video)

@spec read_audio_chunk(client()) :: chunk() | :end_of_stream
def read_audio_chunk(client), do: pop_queue_or_do_read_chunk(client, :audio)

defp pop_queue_or_do_read_chunk(client, media_type) do
client.queues[media_type]
|> Qex.pop()
|> case do
{{:value, chunk}, queue} ->
client = client |> put_in([:queues, media_type], queue)
{chunk, client}

{:empty, _queue} ->
do_read_chunk(client, media_type)
end
end

@spec do_read_chunk(client(), :audio | :video) :: {chunk() | :end_of_stream, client()}
defp do_read_chunk(client, media_type) do
client = ensure_media_playlist_loaded(client)

with impl when impl != nil <- client.demuxing_engine_impl,
track_id <- get_track_id!(client, media_type),
{:ok, chunk, demuxing_engine} <- client.demuxing_engine |> impl.pop_chunk(track_id) do
client =
with %{timestamp_offsets: %{^media_type => nil}} <- client do
client |> put_in([:timestamp_offsets, media_type], chunk.dts_ms)
end
|> put_in([:last_timestamps, media_type], chunk.dts_ms)
|> put_in([:demuxing_engine], demuxing_engine)

{chunk, client}
else
other ->
case other do
{:error, _reason, demuxing_engine} -> %{client | demuxing_engine: demuxing_engine}
nil -> client
end
|> download_chunk()
|> case do
{:ok, client} -> do_read_chunk(client, media_type)
{:end_of_stream, client} -> {:end_of_stream, client}
end
end
end

@spec get_tracks_info(client()) ::
{:ok, %{optional(integer()) => struct()}, client()}
| {:error, reason :: any(), client()}
def get_tracks_info(client) do
with impl when impl != nil <- client.demuxing_engine_impl,
{:ok, tracks_info} <- client.demuxing_engine |> impl.get_tracks_info() do
{:ok, tracks_info, client}
else
_other ->
media_type = media_type_with_lower_ts(client)
{chunk_or_eos, client} = do_read_chunk(client, media_type)

with %ExHLS.Chunk{} <- chunk_or_eos do
client
|> update_in([:queues, media_type], &Qex.push(&1, chunk_or_eos))
|> get_tracks_info()
else
:end_of_stream ->
{:error, "end of stream reached, but tracks info is not available", client}
end
end
end

defp media_type_with_lower_ts(client) do
cond do
client.timestamp_offsets.audio == nil ->
:audio

client.timestamp_offsets.video == nil ->
:video

true ->
[:audio, :video]
|> Enum.min_by(fn media_type ->
client.last_timestamps[media_type] - client.timestamp_offsets[media_type]
end)
end
end

defp download_chunk(client) do
client = ensure_media_playlist_loaded(client)

case client.media_playlist.timeline do
[%{uri: segment_uri} | rest] ->
client =
with %{demuxing_engine: nil} <- client do
resolve_demuxing_engine(segment_uri, client)
end

request_result =
Path.join(client.media_base_url, segment_uri)
|> Req.get!()

demuxing_engine =
client.demuxing_engine
|> client.demuxing_engine_impl.feed!(request_result.body)

client =
%{
client
| demuxing_engine: demuxing_engine,
media_playlist: %{client.media_playlist | timeline: rest}
}

{:ok, client}

[_other_tag | rest] ->
%{client | media_playlist: %{client.media_playlist | timeline: rest}}
|> download_chunk()

[] ->
client =
client
|> Map.update!(:demuxing_engine, &client.demuxing_engine_impl.end_stream/1)

{:end_of_stream, client}
end
end

defp resolve_demuxing_engine(segment_uri, %{demuxing_engine: nil} = client) do
demuxing_engine_impl =
case Path.extname(segment_uri) do
".ts" -> DemuxingEngine.MPEGTS
".m4s" -> DemuxingEngine.CMAF
".mp4" -> DemuxingEngine.CMAF
_other -> raise "Unsupported segment URI extension: #{segment_uri |> inspect()}"
end

%{
client
| demuxing_engine_impl: demuxing_engine_impl,
demuxing_engine: demuxing_engine_impl.new()
}
end

defp get_track_id!(client, type) when type in [:audio, :video] do
case get_track_id(client, type) do
{:ok, track_id} -> track_id
:error -> raise "Track ID for #{type} not found in client #{inspect(client, pretty: true)}"
end
end

defp get_track_id(client, type) when type in [:audio, :video] do
impl = client.demuxing_engine_impl

with {:ok, tracks_info} <- client.demuxing_engine |> impl.get_tracks_info() do
tracks_info
|> Enum.find_value(:error, fn
{id, %AAC{}} when type == :audio -> {:ok, id}
{id, %RemoteStream{content_format: AAC}} when type == :audio -> {:ok, id}
{id, %H264{}} when type == :video -> {:ok, id}
{id, %RemoteStream{content_format: H264}} when type == :video -> {:ok, id}
_different_type -> false
end)
else
{:error, _reason} -> :error
end
end
end
16 changes: 16 additions & 0 deletions lib/ex_hls/demuxing_engine.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule ExHLS.DemuxingEngine do
@moduledoc false

@type t :: any()

@callback new() :: t()

@callback feed!(t(), binary()) :: t()

@callback get_tracks_info(t()) :: {:ok, %{optional(integer()) => struct()}} | {:error, any()}

@callback pop_chunk(t(), track_id :: any()) ::
{:ok, ExHLS.Chunk.t(), t()} | {:error, :empty_track_data, t()}

@callback end_stream(t()) :: t()
end
Loading