Skip to content

Commit d1aa187

Browse files
committed
feat: support function streams in multipart handling
Signed-off-by: Yordis Prieto <[email protected]> fixes #648
1 parent 058bddc commit d1aa187

File tree

2 files changed

+205
-3
lines changed

2 files changed

+205
-3
lines changed

lib/tesla/multipart.ex

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ defmodule Tesla.Multipart do
3333
end
3434

3535
@type part_stream :: Enum.t()
36-
@type part_value :: iodata | part_stream
36+
@type part_value :: iodata | part_stream | function()
3737

3838
defstruct parts: [],
3939
boundary: nil,
@@ -181,12 +181,13 @@ defmodule Tesla.Multipart do
181181

182182
@spec assert_part_value!(any) :: :ok | no_return
183183
defp assert_part_value!(%maybe_stream{})
184-
when maybe_stream in [IO.Stream, File.Stream, Stream],
184+
when maybe_stream in [IO.Stream, File.Stream, Stream, Range],
185185
do: :ok
186186

187187
defp assert_part_value!(value)
188188
when is_list(value)
189-
when is_binary(value),
189+
when is_binary(value)
190+
when is_function(value),
190191
do: :ok
191192

192193
defp assert_part_value!(val) do

test/tesla/multipart_test.exs

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,5 +290,206 @@ defmodule Tesla.MultipartTest do
290290

291291
assert is_function(Multipart.body(mp))
292292
end
293+
294+
test "function/2 streaming response" do
295+
stream_fun =
296+
Stream.resource(
297+
fn -> ["chunk1", "chunk2", "final"] end,
298+
fn
299+
[head | tail] -> {[head], tail}
300+
[] -> {:halt, []}
301+
end,
302+
fn _ -> :ok end
303+
)
304+
305+
mp =
306+
Multipart.new()
307+
|> Multipart.add_field("stream_field", stream_fun)
308+
309+
assert is_function(Multipart.body(mp))
310+
end
311+
312+
test "function/2 streaming response with file content" do
313+
stream_fun =
314+
Stream.resource(
315+
fn -> ["file data chunk 1", "file data chunk 2"] end,
316+
fn
317+
[head | tail] -> {[head], tail}
318+
[] -> {:halt, []}
319+
end,
320+
fn _ -> :ok end
321+
)
322+
323+
mp =
324+
Multipart.new()
325+
|> Multipart.add_file_content(stream_fun, "streamed_file.mp4")
326+
|> Multipart.add_field("model", "whisper-1")
327+
|> Multipart.add_field("response_format", "json")
328+
329+
assert is_function(Multipart.body(mp))
330+
331+
body_stream = Multipart.body(mp)
332+
body_content = body_stream |> Enum.to_list() |> IO.iodata_to_binary()
333+
334+
assert body_content =~ ~s(name="file"; filename="streamed_file.mp4")
335+
assert body_content =~ "file data chunk 1"
336+
assert body_content =~ "file data chunk 2"
337+
assert body_content =~ ~s(name="model")
338+
assert body_content =~ "whisper-1"
339+
assert body_content =~ ~s(name="response_format")
340+
assert body_content =~ "json"
341+
end
342+
343+
test "raw function/2 like Tesla adapter returns" do
344+
stream_fun = fn
345+
{:cont, acc} -> {:suspended, "data", acc}
346+
{:halt, acc} -> {:halted, acc}
347+
end
348+
349+
mp =
350+
Multipart.new()
351+
|> Multipart.add_file_content(stream_fun, "test.mp4")
352+
353+
assert %Multipart{} = mp
354+
assert length(mp.parts) == 1
355+
end
356+
end
357+
358+
describe "streaming function support" do
359+
@http "http://localhost:#{Application.compile_env(:httparrot, :http_port)}"
360+
361+
defp call_adapter(adapter, env, opts \\ []) do
362+
case adapter do
363+
{adapter_module, adapter_opts} ->
364+
adapter_module.call(env, Keyword.merge(opts, adapter_opts))
365+
366+
adapter_module ->
367+
adapter_module.call(env, opts)
368+
end
369+
end
370+
371+
test "accepts Stream.resource functions from Tesla adapters" do
372+
stream_data =
373+
Stream.resource(
374+
fn -> ["chunk1", "chunk2", "chunk3"] end,
375+
fn
376+
[head | tail] -> {[head], tail}
377+
[] -> {:halt, []}
378+
end,
379+
fn _ -> :ok end
380+
)
381+
382+
mp =
383+
Multipart.new()
384+
|> Multipart.add_file_content(stream_data, "streamed_file.txt")
385+
|> Multipart.add_field("type", "stream_test")
386+
387+
request = %Tesla.Env{
388+
method: :post,
389+
url: "#{@http}/post",
390+
body: mp
391+
}
392+
393+
assert {:ok, response} = call_adapter(Tesla.Adapter.Mint, request)
394+
assert response.status == 200
395+
396+
{:ok, response} = Tesla.Middleware.JSON.decode(response, [])
397+
398+
assert response.body["form"]["type"] == "stream_test"
399+
assert response.body["files"]["file"] == "chunk1chunk2chunk3"
400+
401+
content_type = response.body["headers"]["content-type"]
402+
assert content_type =~ "multipart/form-data"
403+
assert content_type =~ "boundary="
404+
end
405+
406+
test "works with File.Stream across different adapters" do
407+
file_stream = File.stream!("test/tesla/multipart_test_file.sh")
408+
409+
mp =
410+
Multipart.new()
411+
|> Multipart.add_file_content(file_stream, "test_script.sh")
412+
|> Multipart.add_field("adapter", "hackney")
413+
414+
request = %Tesla.Env{
415+
method: :post,
416+
url: "#{@http}/post",
417+
body: mp
418+
}
419+
420+
assert {:ok, response} = call_adapter(Tesla.Adapter.Hackney, request)
421+
assert response.status == 200
422+
423+
{:ok, response} = Tesla.Middleware.JSON.decode(response, [])
424+
425+
assert response.body["form"]["adapter"] == "hackney"
426+
427+
assert response.body["files"]["file"] ==
428+
"#!/usr/bin/env bash\necho \"test multipart file\"\n"
429+
end
430+
431+
test "reproduces GitHub issue #648 scenario" do
432+
gcp_stream_response =
433+
Stream.resource(
434+
fn -> ["audio_data_chunk_1", "audio_data_chunk_2", "audio_data_chunk_3"] end,
435+
fn
436+
[head | tail] -> {[head], tail}
437+
[] -> {:halt, []}
438+
end,
439+
fn _ -> :ok end
440+
)
441+
442+
upload_body =
443+
Multipart.new()
444+
|> Multipart.add_file_content(gcp_stream_response, "audio.mp4")
445+
|> Multipart.add_field("model", "whisper-1")
446+
|> Multipart.add_field("response_format", "json")
447+
448+
request = %Tesla.Env{
449+
method: :post,
450+
url: "#{@http}/post",
451+
body: upload_body
452+
}
453+
454+
assert {:ok, response} = call_adapter(Tesla.Adapter.Mint, request)
455+
assert response.status == 200
456+
457+
{:ok, response} = Tesla.Middleware.JSON.decode(response, [])
458+
459+
assert response.body["form"]["model"] == "whisper-1"
460+
assert response.body["form"]["response_format"] == "json"
461+
462+
assert response.body["files"]["file"] ==
463+
"audio_data_chunk_1audio_data_chunk_2audio_data_chunk_3"
464+
end
465+
466+
test "handles large streaming data efficiently" do
467+
large_stream = Stream.repeatedly(fn -> "data_chunk_" end) |> Stream.take(100)
468+
469+
mp =
470+
Multipart.new()
471+
|> Multipart.add_file_content(large_stream, "large_file.dat")
472+
|> Multipart.add_field("size", "large")
473+
474+
request = %Tesla.Env{
475+
method: :post,
476+
url: "#{@http}/post",
477+
body: mp
478+
}
479+
480+
assert {:ok, response} = call_adapter(Tesla.Adapter.Mint, request)
481+
assert response.status == 200
482+
483+
{:ok, response} = Tesla.Middleware.JSON.decode(response, [])
484+
485+
assert response.body["form"]["size"] == "large"
486+
487+
file_content = response.body["files"]["file"]
488+
assert String.contains?(file_content, "data_chunk_")
489+
490+
expected_length = String.length("data_chunk_") * 100
491+
actual_length = String.length(file_content)
492+
assert abs(actual_length - expected_length) < 50
493+
end
293494
end
294495
end

0 commit comments

Comments
 (0)