Skip to content

Commit 079c400

Browse files
authored
Merge pull request #1 from membraneframework-labs/varsill/sleep_before_closing_socket
Add sleep before closing SRT socket
2 parents e5f67e9 + 565dc2b commit 079c400

File tree

5 files changed

+112
-29
lines changed

5 files changed

+112
-29
lines changed

bundlex.exs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ defmodule ExLibSRTBundlexProject do
1212
srt_nif: [
1313
sources: ["srt_nif.cpp", "server/server.cpp", "client/client.cpp", "common/srt_socket_stats.cpp"],
1414
deps: [unifex: :unifex],
15-
os_deps: [srt: :pkg_config, openssl: :pkg_config],
15+
os_deps: [srt: [{:precompiled, get_srt_url()}, :pkg_config], openssl: :pkg_config],
1616
libs: ["pthread"],
1717
interface: :nif,
1818
preprocessor: Unifex,
@@ -23,4 +23,19 @@ defmodule ExLibSRTBundlexProject do
2323
]
2424
]
2525
end
26+
27+
defp get_srt_url() do
28+
membrane_precompiled_url_prefix = "https://github.com/membraneframework-precompiled/precompiled_srt/releases/download/v1.5.4/srt"
29+
30+
case Bundlex.get_target() do
31+
%{os: "linux"} ->
32+
"#{membrane_precompiled_url_prefix}_linux.tar.gz"
33+
34+
%{architecture: "aarch64", os: "darwin" <> _rest_of_os_name} ->
35+
"#{membrane_precompiled_url_prefix}_macos_arm.tar.gz"
36+
37+
_other ->
38+
nil
39+
end
40+
end
2641
end

c_src/ex_libsrt/client/client.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ void Client::Stop() {
112112

113113
epoll = -1;
114114
}
115+
116+
sleep(1); // workaround to make sure all the packets are sent, as shown here: https://github.com/Haivision/srt/blob/952f9495246abc201bac55b8f9ad7409c0572423/examples/test-c-client.c#L94
115117
if (srt_sock != -1) {
116118
srt_close(srt_sock);
117119

c_src/ex_libsrt/client/client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <srt/srt.h>
66
#include <thread>
77
#include "../common/srt_socket_stats.h"
8+
#include <functional>
89

910
class Client {
1011
public:

lib/ex_libsrt/client.ex

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,46 +15,74 @@ defmodule ExLibSRT.Client do
1515
* `t:srt_client_error/0`
1616
"""
1717

18-
@type t :: reference()
18+
use Agent
19+
20+
@type t :: pid()
1921

2022
@type srt_client_started :: :srt_client_started
2123
@type srt_client_disconnected :: :srt_client_started
2224
@type srt_client_error :: {:srt_client_error, reason :: String.t()}
2325

2426
@doc """
25-
Starts a new SRT connection to the target address and port.
27+
Starts a new SRT connection to the target address and port and links to the current process.
28+
"""
29+
@spec start_link(address :: String.t(), port :: non_neg_integer(), stream_id :: String.t()) ::
30+
{:ok, t()} | {:error, reason :: String.t(), error_code :: integer()}
31+
def start_link(address, port, stream_id) do
32+
case ExLibSRT.Native.start_client(address, port, stream_id) do
33+
{:ok, client_ref} ->
34+
Agent.start_link(fn -> client_ref end)
35+
36+
{:error, reason, error_code} ->
37+
{:error, reason, error_code}
38+
end
39+
end
40+
41+
@doc """
42+
Starts a new SRT connection to the target address and port outside the supervision tree.
2643
"""
2744
@spec start(address :: String.t(), port :: non_neg_integer(), stream_id :: String.t()) ::
2845
{:ok, t()} | {:error, reason :: String.t(), error_code :: integer()}
2946
def start(address, port, stream_id) do
30-
ExLibSRT.Native.start_client(address, port, stream_id)
47+
case ExLibSRT.Native.start_client(address, port, stream_id) do
48+
{:ok, client_ref} ->
49+
Agent.start(fn -> client_ref end, name: {:global, client_ref})
50+
51+
{:error, reason, error_code} ->
52+
{:error, reason, error_code}
53+
end
3154
end
3255

3356
@doc """
3457
Stops the active client connection.
3558
"""
3659
@spec stop(t()) :: :ok
37-
def stop(client) do
38-
ExLibSRT.Native.stop_client(client)
60+
def stop(agent) do
61+
client_ref = Agent.get(agent, & &1)
62+
ExLibSRT.Native.stop_client(client_ref)
63+
Agent.stop(agent)
3964
end
4065

4166
@doc """
4267
Sends data through the client connection.
4368
"""
4469
@spec send_data(binary(), t()) :: :ok | {:error, :payload_too_large | (reason :: String.t())}
45-
def send_data(payload, client)
70+
def send_data(payload, agent)
4671

47-
def send_data(payload, _client) when byte_size(payload) > 1316, do: {:error, :payload_too_large}
72+
def send_data(payload, _agent) when byte_size(payload) > 1316, do: {:error, :payload_too_large}
4873

49-
def send_data(payload, client) do
50-
ExLibSRT.Native.send_client_data(payload, client)
74+
def send_data(payload, agent) do
75+
client_ref = Agent.get(agent, & &1)
76+
ExLibSRT.Native.send_client_data(payload, client_ref)
5177
end
5278

5379
@doc """
5480
Reads socket statistics.
5581
"""
5682
@spec read_socket_stats(t()) ::
5783
{:ok, ExLibSRT.SocketStats.t()} | {:error, reason :: String.t()}
58-
def read_socket_stats(client),
59-
do: ExLibSRT.Native.read_client_socket_stats(client)
84+
def read_socket_stats(agent) do
85+
client_ref = Agent.get(agent, & &1)
86+
ExLibSRT.Native.read_client_socket_stats(client_ref)
87+
end
6088
end

lib/ex_libsrt/server.ex

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ defmodule ExLibSRT.Server do
1717
* `t:srt_server_conn_closed/0` - a client connection has been closed
1818
* `t:srt_server_error/0` - server has encountered an error
1919
* `t:srt_data/0` - server has received new data on a client connection
20-
* `t:srt_server_connect_request/0` - server has triggered a new connection request
20+
* `t:srt_server_connect_request/0` - server has triggered a new connection request
2121
(see `accept_awaiting_connect_request/1` and `reject_awaiting_connect_request/1` for answering the request)
2222
2323
### Accepting connections
@@ -33,11 +33,13 @@ defmodule ExLibSRT.Server do
3333
> #### Response timeout {: .warning}
3434
>
3535
> It is very important to answer the connection request as fast as possible.
36-
> Due to how `libsrt` works, while the server waits for the response it blocks the receiving thread
36+
> Due to how `libsrt` works, while the server waits for the response it blocks the receiving thread
3737
> and potentially interrupts other ongoing connections.
3838
"""
3939

40-
@type t :: reference()
40+
use Agent
41+
42+
@type t :: pid()
4143

4244
@type connection_id :: non_neg_integer()
4345

@@ -49,14 +51,37 @@ defmodule ExLibSRT.Server do
4951
{:srt_server_connect_request, address :: String.t(), stream_id :: String.t()}
5052

5153
@doc """
52-
Starts a new SRT server binding to given address and port.
54+
Starts a new SRT server binding to given address and port and links to current process.
55+
56+
One may usually want to bind to `0.0.0.0` address.
57+
"""
58+
@spec start_link(address :: String.t(), port :: non_neg_integer()) ::
59+
{:ok, t()} | {:error, reason :: String.t(), error_code :: integer()}
60+
def start_link(address, port) do
61+
case ExLibSRT.Native.start_server(address, port) do
62+
{:ok, server_ref} ->
63+
Agent.start_link(fn -> server_ref end)
64+
65+
{:error, reason, error_code} ->
66+
{:error, reason, error_code}
67+
end
68+
end
69+
70+
@doc """
71+
Starts a new SRT server outside the supervision tree, binding to given address and port.
5372
5473
One may usually want to bind to `0.0.0.0` address.
5574
"""
5675
@spec start(address :: String.t(), port :: non_neg_integer()) ::
5776
{:ok, t()} | {:error, reason :: String.t(), error_code :: integer()}
5877
def start(address, port) do
59-
ExLibSRT.Native.start_server(address, port)
78+
case ExLibSRT.Native.start_server(address, port) do
79+
{:ok, server_ref} ->
80+
Agent.start(fn -> server_ref end, name: {:global, server_ref})
81+
82+
{:error, reason, error_code} ->
83+
{:error, reason, error_code}
84+
end
6085
end
6186

6287
@doc """
@@ -65,25 +90,31 @@ defmodule ExLibSRT.Server do
6590
Stopping a server should gracefuly close all the client connections.
6691
"""
6792
@spec stop(t()) :: :ok
68-
def stop(server) do
69-
ExLibSRT.Native.stop_server(server)
93+
def stop(agent) do
94+
server_ref = Agent.get(agent, & &1)
95+
ExLibSRT.Native.stop_server(server_ref)
96+
Agent.stop(agent)
7097
end
7198

7299
@doc """
73100
Acccepts the currently awaiting connection request.
74101
"""
75102
@spec accept_awaiting_connect_request(t()) :: :ok | {:error, reason :: String.t()}
76-
def accept_awaiting_connect_request(server),
77-
do: ExLibSRT.Native.accept_awaiting_connect_request(self(), server)
103+
def accept_awaiting_connect_request(agent) do
104+
server_ref = Agent.get(agent, & &1)
105+
ExLibSRT.Native.accept_awaiting_connect_request(self(), server_ref)
106+
end
78107

79108
@doc """
80109
Acccepts the currently awaiting connection request and starts a separate connection process
81110
"""
82111
@spec accept_awaiting_connect_request_with_handler(ExLibSRT.Connection.Handler.t(), t()) ::
83112
{:ok, ExLibSRT.Connection.t()} | {:error, reason :: any()}
84-
def accept_awaiting_connect_request_with_handler(handler, server) do
113+
def accept_awaiting_connect_request_with_handler(handler, agent) do
114+
server_ref = Agent.get(agent, & &1)
115+
85116
with {:ok, handler} <- ExLibSRT.Connection.start(handler) do
86-
case ExLibSRT.Native.accept_awaiting_connect_request(handler, server) do
117+
case ExLibSRT.Native.accept_awaiting_connect_request(handler, server_ref) do
87118
:ok ->
88119
{:ok, handler}
89120

@@ -99,21 +130,27 @@ defmodule ExLibSRT.Server do
99130
Rejects the currently awaiting connection request.
100131
"""
101132
@spec reject_awaiting_connect_request(t()) :: :ok | {:error, reason :: String.t()}
102-
def reject_awaiting_connect_request(server),
103-
do: ExLibSRT.Native.reject_awaiting_connect_request(server)
133+
def reject_awaiting_connect_request(agent) do
134+
server_ref = Agent.get(agent, & &1)
135+
ExLibSRT.Native.reject_awaiting_connect_request(server_ref)
136+
end
104137

105138
@doc """
106139
Closes the connection to the given client.
107140
"""
108141
@spec close_server_connection(connection_id(), t()) :: :ok | {:error, reason :: String.t()}
109-
def close_server_connection(connection_id, server),
110-
do: ExLibSRT.Native.close_server_connection(connection_id, server)
142+
def close_server_connection(connection_id, agent) do
143+
server_ref = Agent.get(agent, & &1)
144+
ExLibSRT.Native.close_server_connection(connection_id, server_ref)
145+
end
111146

112147
@doc """
113148
Reads socket statistics.
114149
"""
115150
@spec read_socket_stats(connection_id(), t()) ::
116151
{:ok, ExLibSRT.SocketStats.t()} | {:error, reason :: String.t()}
117-
def read_socket_stats(connection_id, server),
118-
do: ExLibSRT.Native.read_server_socket_stats(connection_id, server)
152+
def read_socket_stats(connection_id, agent) do
153+
server_ref = Agent.get(agent, & &1)
154+
ExLibSRT.Native.read_server_socket_stats(connection_id, server_ref)
155+
end
119156
end

0 commit comments

Comments
 (0)