From 836a83eba044a535337f3e7c8f6fda6dc5920480 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Wed, 1 Oct 2025 12:27:13 +0200 Subject: [PATCH 01/13] return skipped upserts in bulk_create --- lib/data_layer.ex | 61 ++++++++++++++++++++++++++++++++++++ mix.exs | 4 +-- mix.lock | 1 + test/bulk_create_test.exs | 65 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 129 insertions(+), 2 deletions(-) diff --git a/lib/data_layer.ex b/lib/data_layer.ex index 07812631..2769ecb2 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -2035,6 +2035,66 @@ defmodule AshPostgres.DataLayer do repo.insert_all(source, ecto_changesets, opts) end) + result = + if options[:return_skipped_upsert?] do + identity = options[:identity] + [changeset | _] = changesets + + results = + result + |> elem(1) + |> List.wrap() + |> Enum.reduce(%{}, fn r, acc -> + Map.put(acc, Map.take(r, identity.keys), r) + end) + + ash_query = + resource + |> Ash.Query.do_filter( + or: + changesets + |> Enum.filter(fn changeset -> + not Map.has_key?(results, Map.take(changeset.attributes, identity.keys)) + end) + |> Enum.map(fn changeset -> + changeset.attributes + |> Map.take(identity.keys) + |> Keyword.new() + end) + ) + |> then(fn + query when is_nil(identity) or is_nil(identity.where) -> query + query -> Ash.Query.do_filter(query, identity.where) + end) + |> Ash.Query.set_tenant(changeset.tenant) + + skipped_upserts = + with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query), + {:ok, results} <- run_query(ecto_query, resource) do + results + |> Enum.map(fn result -> + Ash.Resource.put_metadata(result, :upsert_skipped, true) + end) + |> Enum.reduce(%{}, fn r, acc -> + Map.put(acc, Map.take(r, identity.keys), r) + end) + end + + results = + changesets + |> Enum.map(fn changeset -> + identity = + changeset.attributes + |> Map.take(identity.keys) + + Map.get(results, identity, Map.get(skipped_upserts, identity)) + end) + + {length(results), results} + else + result + end + case result do {_, nil} -> :ok @@ -2045,6 +2105,7 @@ defmodule AshPostgres.DataLayer do {:ok, results} else + # TODO: what if there are less results than changesets because of upsert conditions? {:ok, Stream.zip_with(results, changesets, fn result, changeset -> if !opts[:upsert?] do diff --git a/mix.exs b/mix.exs index cf59bf71..b03849fd 100644 --- a/mix.exs +++ b/mix.exs @@ -180,7 +180,7 @@ defmodule AshPostgres.MixProject do {:ash, ash_version("~> 3.5 and >= 3.5.35")}, {:spark, "~> 2.3 and >= 2.3.4"}, {:ash_sql, ash_sql_version("~> 0.3 and >= 0.3.2")}, - {:igniter, "~> 0.6 and >= 0.6.14", optional: true}, + {:igniter, "~> 0.6 and >= 0.6.29", optional: true}, {:ecto_sql, "~> 3.13"}, {:ecto, "~> 3.13"}, {:jason, "~> 1.0"}, @@ -197,7 +197,7 @@ defmodule AshPostgres.MixProject do {:credo, ">= 0.0.0", only: [:dev, :test], runtime: false}, {:mix_audit, ">= 0.0.0", only: [:dev, :test], runtime: false}, {:dialyxir, ">= 0.0.0", only: [:dev, :test], runtime: false}, - {:sobelow, ">= 0.0.0", only: [:dev, :test], runtime: false} + {:mix_test_watch, "~> 1.0", only: [:dev, :test]} ] end diff --git a/mix.lock b/mix.lock index 1827dcb4..796d2dff 100644 --- a/mix.lock +++ b/mix.lock @@ -33,6 +33,7 @@ "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, "mix_audit": {:hex, :mix_audit, "2.1.5", "c0f77cee6b4ef9d97e37772359a187a166c7a1e0e08b50edf5bf6959dfe5a016", [:make, :mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.11", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "87f9298e21da32f697af535475860dc1d3617a010e0b418d2ec6142bc8b42d69"}, + "mix_test_watch": {:hex, :mix_test_watch, "1.3.0", "2ffc9f72b0d1f4ecf0ce97b044e0e3c607c3b4dc21d6228365e8bc7c2856dc77", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f9e5edca976857ffac78632e635750d158df14ee2d6185a15013844af7570ffe"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, diff --git a/test/bulk_create_test.exs b/test/bulk_create_test.exs index e48d0238..ad4ffa53 100644 --- a/test/bulk_create_test.exs +++ b/test/bulk_create_test.exs @@ -176,6 +176,71 @@ defmodule AshPostgres.BulkCreateTest do end) end + @tag :focus + test "bulk upsert returns skipped records with return_skipped_upsert?" do + assert [ + {:ok, %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}}, + {:ok, %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20}}, + {:ok, %{title: "herbert", uniq_if_contains_foo: "3", price: 30}} + ] = + Ash.bulk_create!( + [ + %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}, + %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20}, + %{title: "herbert", uniq_if_contains_foo: "3", price: 30} + ], + Post, + :create, + return_stream?: true, + return_records?: true + ) + |> Enum.sort_by(fn {:ok, result} -> result.title end) + + results = + Ash.bulk_create!( + [ + %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}, + %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20_000}, + %{title: "herbert", uniq_if_contains_foo: "3", price: 30} + ], + Post, + :upsert_with_no_filter, + return_stream?: true, + upsert_condition: expr(price != upsert_conflict(:price)), + return_errors?: true, + return_records?: true, + return_skipped_upsert?: true + ) + |> Enum.sort_by(fn + {:ok, result} -> + result.title + + _ -> + nil + end) + + assert [ + {:ok, skipped}, + {:ok, updated}, + {:ok, no_conflict} + ] = results + + # "fredfoo" was skipped because price matches (10 == 10) + assert skipped.title == "fredfoo" + assert skipped.price == 10 + assert Ash.Resource.get_metadata(skipped, :upsert_skipped) == true + + # "georgefoo" was updated because price differs (20 -> 20_000) + assert updated.title == "georgefoo" + assert updated.price == 20_000 + refute Ash.Resource.get_metadata(updated, :upsert_skipped) + + # "herbert" had no conflict (doesn't match identity) + assert no_conflict.title == "herbert" + assert no_conflict.price == 30 + refute Ash.Resource.get_metadata(no_conflict, :upsert_skipped) + end + # confirmed that this doesn't work because it can't. An upsert must map to a potentially successful insert. # leaving this test here for posterity # test "bulk creates can upsert with id" do From 81bc0f43969ac6b19b231762995fb50c4121d1d1 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Thu, 2 Oct 2025 16:36:24 +0200 Subject: [PATCH 02/13] add can? for bulk_uspert_return_skipped --- lib/data_layer.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/data_layer.ex b/lib/data_layer.ex index 2769ecb2..23b5ed2a 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -662,6 +662,7 @@ defmodule AshPostgres.DataLayer do def can?(_, :combine), do: true def can?(_, {:combine, _}), do: true def can?(_, :bulk_create), do: true + def can?(_, :bulk_upsert_return_skipped), do: true def can?(_, :action_select), do: true From 4fbc33c9525fdd6dde12d5196105c8cd07b7ef7b Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Thu, 2 Oct 2025 16:45:29 +0200 Subject: [PATCH 03/13] revert deps changes --- mix.exs | 2 +- mix.lock | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index b03849fd..0ede3a11 100644 --- a/mix.exs +++ b/mix.exs @@ -197,7 +197,7 @@ defmodule AshPostgres.MixProject do {:credo, ">= 0.0.0", only: [:dev, :test], runtime: false}, {:mix_audit, ">= 0.0.0", only: [:dev, :test], runtime: false}, {:dialyxir, ">= 0.0.0", only: [:dev, :test], runtime: false}, - {:mix_test_watch, "~> 1.0", only: [:dev, :test]} + {:sobelow, ">= 0.0.0", only: [:dev, :test], runtime: false} ] end diff --git a/mix.lock b/mix.lock index 796d2dff..1827dcb4 100644 --- a/mix.lock +++ b/mix.lock @@ -33,7 +33,6 @@ "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, "mix_audit": {:hex, :mix_audit, "2.1.5", "c0f77cee6b4ef9d97e37772359a187a166c7a1e0e08b50edf5bf6959dfe5a016", [:make, :mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.11", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "87f9298e21da32f697af535475860dc1d3617a010e0b418d2ec6142bc8b42d69"}, - "mix_test_watch": {:hex, :mix_test_watch, "1.3.0", "2ffc9f72b0d1f4ecf0ce97b044e0e3c607c3b4dc21d6228365e8bc7c2856dc77", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f9e5edca976857ffac78632e635750d158df14ee2d6185a15013844af7570ffe"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, From 309dffafd1d3efe4a43dc3adbaa06f0d4827cd83 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Thu, 2 Oct 2025 17:00:07 +0200 Subject: [PATCH 04/13] return results in the order of changesets --- lib/data_layer.ex | 78 ++++++++++++++++++++++++++++------------------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/lib/data_layer.ex b/lib/data_layer.ex index 23b5ed2a..aaff6537 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -2036,26 +2036,30 @@ defmodule AshPostgres.DataLayer do repo.insert_all(source, ecto_changesets, opts) end) + identity = options[:identity] + + results_by_identity = + result + |> elem(1) + |> List.wrap() + |> Enum.reduce(%{}, fn r, acc -> + Map.put(acc, Map.take(r, identity.keys), r) + end) + result = if options[:return_skipped_upsert?] do - identity = options[:identity] [changeset | _] = changesets - results = - result - |> elem(1) - |> List.wrap() - |> Enum.reduce(%{}, fn r, acc -> - Map.put(acc, Map.take(r, identity.keys), r) - end) - ash_query = resource |> Ash.Query.do_filter( or: changesets |> Enum.filter(fn changeset -> - not Map.has_key?(results, Map.take(changeset.attributes, identity.keys)) + not Map.has_key?( + results_by_identity, + Map.take(changeset.attributes, identity.keys) + ) end) |> Enum.map(fn changeset -> changeset.attributes @@ -2088,8 +2092,9 @@ defmodule AshPostgres.DataLayer do changeset.attributes |> Map.take(identity.keys) - Map.get(results, identity, Map.get(skipped_upserts, identity)) + Map.get(results_by_identity, identity, Map.get(skipped_upserts, identity)) end) + |> Enum.filter(& &1) {length(results), results} else @@ -2106,26 +2111,37 @@ defmodule AshPostgres.DataLayer do {:ok, results} else - # TODO: what if there are less results than changesets because of upsert conditions? - {:ok, - Stream.zip_with(results, changesets, fn result, changeset -> - if !opts[:upsert?] do - maybe_create_tenant!(resource, result) - end - - case get_bulk_operation_metadata(changeset) do - {index, metadata_key} -> - Ash.Resource.put_metadata(result, metadata_key, index) - - nil -> - # Compatibility fallback - Ash.Resource.put_metadata( - result, - :bulk_create_index, - changeset.context[:bulk_create][:index] - ) - end - end)} + results = + changesets + |> Enum.map(fn changeset -> + identity = + changeset.attributes + |> Map.take(identity.keys) + + result_for_changeset = Map.get(results_by_identity, identity) + + if result_for_changeset do + if !opts[:upsert?] do + maybe_create_tenant!(resource, result_for_changeset) + end + + case get_bulk_operation_metadata(changeset) do + {index, metadata_key} -> + Ash.Resource.put_metadata(result, metadata_key, index) + + nil -> + # Compatibility fallback + Ash.Resource.put_metadata( + result, + :bulk_create_index, + changeset.context[:bulk_create][:index] + ) + end + end + end) + |> Enum.filter(& &1) + + {:ok, results} end end rescue From 67facf823d4f947d6a7408bc745c2645fdbce5af Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Thu, 2 Oct 2025 17:01:52 +0200 Subject: [PATCH 05/13] remove focus tag from test --- test/bulk_create_test.exs | 1 - 1 file changed, 1 deletion(-) diff --git a/test/bulk_create_test.exs b/test/bulk_create_test.exs index ad4ffa53..4cf1ef87 100644 --- a/test/bulk_create_test.exs +++ b/test/bulk_create_test.exs @@ -176,7 +176,6 @@ defmodule AshPostgres.BulkCreateTest do end) end - @tag :focus test "bulk upsert returns skipped records with return_skipped_upsert?" do assert [ {:ok, %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}}, From 3d4b95e76472b05f448534c8658581137a3daa62 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Thu, 2 Oct 2025 17:03:59 +0200 Subject: [PATCH 06/13] remove comments from test --- test/bulk_create_test.exs | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/bulk_create_test.exs b/test/bulk_create_test.exs index 4cf1ef87..420ff219 100644 --- a/test/bulk_create_test.exs +++ b/test/bulk_create_test.exs @@ -224,17 +224,14 @@ defmodule AshPostgres.BulkCreateTest do {:ok, no_conflict} ] = results - # "fredfoo" was skipped because price matches (10 == 10) assert skipped.title == "fredfoo" assert skipped.price == 10 assert Ash.Resource.get_metadata(skipped, :upsert_skipped) == true - # "georgefoo" was updated because price differs (20 -> 20_000) assert updated.title == "georgefoo" assert updated.price == 20_000 refute Ash.Resource.get_metadata(updated, :upsert_skipped) - # "herbert" had no conflict (doesn't match identity) assert no_conflict.title == "herbert" assert no_conflict.price == 30 refute Ash.Resource.get_metadata(no_conflict, :upsert_skipped) From 8844568f1bbb4a0876f8987c25f5ea0cb5986f52 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Thu, 2 Oct 2025 17:16:36 +0200 Subject: [PATCH 07/13] fix dialyzer --- lib/data_layer.ex | 93 +++++++++++++++++++++++------------------------ 1 file changed, 46 insertions(+), 47 deletions(-) diff --git a/lib/data_layer.ex b/lib/data_layer.ex index aaff6537..a0fd41a6 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -2046,60 +2046,59 @@ defmodule AshPostgres.DataLayer do Map.put(acc, Map.take(r, identity.keys), r) end) - result = - if options[:return_skipped_upsert?] do - [changeset | _] = changesets + if options[:return_skipped_upsert?] do + [changeset | _] = changesets - ash_query = - resource - |> Ash.Query.do_filter( - or: - changesets - |> Enum.filter(fn changeset -> - not Map.has_key?( - results_by_identity, - Map.take(changeset.attributes, identity.keys) - ) - end) - |> Enum.map(fn changeset -> - changeset.attributes - |> Map.take(identity.keys) - |> Keyword.new() - end) - ) - |> then(fn - query when is_nil(identity) or is_nil(identity.where) -> query - query -> Ash.Query.do_filter(query, identity.where) - end) - |> Ash.Query.set_tenant(changeset.tenant) - - skipped_upserts = - with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query), - {:ok, results} <- run_query(ecto_query, resource) do - results - |> Enum.map(fn result -> - Ash.Resource.put_metadata(result, :upsert_skipped, true) - end) - |> Enum.reduce(%{}, fn r, acc -> - Map.put(acc, Map.take(r, identity.keys), r) + ash_query = + resource + |> Ash.Query.do_filter( + or: + changesets + |> Enum.filter(fn changeset -> + not Map.has_key?( + results_by_identity, + Map.take(changeset.attributes, identity.keys) + ) end) - end - - results = - changesets - |> Enum.map(fn changeset -> - identity = + |> Enum.map(fn changeset -> changeset.attributes |> Map.take(identity.keys) + |> Keyword.new() + end) + ) + |> then(fn + query when is_nil(identity) or is_nil(identity.where) -> query + query -> Ash.Query.do_filter(query, identity.where) + end) + |> Ash.Query.set_tenant(changeset.tenant) - Map.get(results_by_identity, identity, Map.get(skipped_upserts, identity)) + skipped_upserts = + with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query), + {:ok, results} <- run_query(ecto_query, resource) do + results + |> Enum.map(fn result -> + Ash.Resource.put_metadata(result, :upsert_skipped, true) + end) + |> Enum.reduce(%{}, fn r, acc -> + Map.put(acc, Map.take(r, identity.keys), r) end) - |> Enum.filter(& &1) + end - {length(results), results} - else - result - end + results = + changesets + |> Enum.map(fn changeset -> + identity = + changeset.attributes + |> Map.take(identity.keys) + + Map.get(results_by_identity, identity, Map.get(skipped_upserts, identity)) + end) + |> Enum.filter(& &1) + + {length(results), results} + else + result + end case result do {_, nil} -> From fd5220f088417e4eac16b8fd779e72627539cb6b Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Thu, 2 Oct 2025 17:41:08 +0200 Subject: [PATCH 08/13] handle upsert without identity option --- lib/data_layer.ex | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/lib/data_layer.ex b/lib/data_layer.ex index a0fd41a6..34ed220b 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -2036,17 +2036,18 @@ defmodule AshPostgres.DataLayer do repo.insert_all(source, ecto_changesets, opts) end) - identity = options[:identity] + identity = options[:identity] + keys = Map.get(identity || %{}, :keys) || Ash.Resource.Info.primary_key(resource) results_by_identity = result |> elem(1) |> List.wrap() |> Enum.reduce(%{}, fn r, acc -> - Map.put(acc, Map.take(r, identity.keys), r) + Map.put(acc, Map.take(r, keys), r) end) - if options[:return_skipped_upsert?] do + if options[:return_skipped_upsert?] && !opts[:single] do [changeset | _] = changesets ash_query = @@ -2057,7 +2058,7 @@ defmodule AshPostgres.DataLayer do |> Enum.filter(fn changeset -> not Map.has_key?( results_by_identity, - Map.take(changeset.attributes, identity.keys) + Map.take(changeset.attributes, keys) ) end) |> Enum.map(fn changeset -> @@ -2080,7 +2081,7 @@ defmodule AshPostgres.DataLayer do Ash.Resource.put_metadata(result, :upsert_skipped, true) end) |> Enum.reduce(%{}, fn r, acc -> - Map.put(acc, Map.take(r, identity.keys), r) + Map.put(acc, Map.take(r, keys), r) end) end @@ -2089,7 +2090,7 @@ defmodule AshPostgres.DataLayer do |> Enum.map(fn changeset -> identity = changeset.attributes - |> Map.take(identity.keys) + |> Map.take(keys) Map.get(results_by_identity, identity, Map.get(skipped_upserts, identity)) end) @@ -2115,7 +2116,7 @@ defmodule AshPostgres.DataLayer do |> Enum.map(fn changeset -> identity = changeset.attributes - |> Map.take(identity.keys) + |> Map.take(keys) result_for_changeset = Map.get(results_by_identity, identity) From d6201cb38aaba4d92029145a3029bb2d09e886cf Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Thu, 2 Oct 2025 17:43:16 +0200 Subject: [PATCH 09/13] add comment for `single?` handling --- lib/data_layer.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/data_layer.ex b/lib/data_layer.ex index 34ed220b..56e1c256 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -2047,6 +2047,8 @@ defmodule AshPostgres.DataLayer do Map.put(acc, Map.take(r, keys), r) end) + # if it's single the return_skipped_upsert? is handled at the + # call site https://github.com/ash-project/ash_postgres/blob/0b21d4a99cc3f6d8676947e291ac9b9d57ad6e2e/lib/data_layer.ex#L3046-L3046 if options[:return_skipped_upsert?] && !opts[:single] do [changeset | _] = changesets From c4c16def6de066505e14f3e83cb8f4d1881b5b27 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Thu, 2 Oct 2025 17:44:01 +0200 Subject: [PATCH 10/13] fix typo --- lib/data_layer.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/data_layer.ex b/lib/data_layer.ex index 56e1c256..f21a9009 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -2036,7 +2036,7 @@ defmodule AshPostgres.DataLayer do repo.insert_all(source, ecto_changesets, opts) end) - identity = options[:identity] + identity = options[:identity] keys = Map.get(identity || %{}, :keys) || Ash.Resource.Info.primary_key(resource) results_by_identity = @@ -2049,7 +2049,7 @@ defmodule AshPostgres.DataLayer do # if it's single the return_skipped_upsert? is handled at the # call site https://github.com/ash-project/ash_postgres/blob/0b21d4a99cc3f6d8676947e291ac9b9d57ad6e2e/lib/data_layer.ex#L3046-L3046 - if options[:return_skipped_upsert?] && !opts[:single] do + if options[:return_skipped_upsert?] && !opts[:single?] do [changeset | _] = changesets ash_query = From 6c1c0addb7a0fb7612613972460769f85b2fd30e Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Thu, 2 Oct 2025 18:20:06 +0200 Subject: [PATCH 11/13] fix logic error --- lib/data_layer.ex | 117 ++++++++++++++++++++++++---------------------- 1 file changed, 62 insertions(+), 55 deletions(-) diff --git a/lib/data_layer.ex b/lib/data_layer.ex index f21a9009..bd2e7603 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -2039,69 +2039,70 @@ defmodule AshPostgres.DataLayer do identity = options[:identity] keys = Map.get(identity || %{}, :keys) || Ash.Resource.Info.primary_key(resource) - results_by_identity = - result - |> elem(1) - |> List.wrap() - |> Enum.reduce(%{}, fn r, acc -> - Map.put(acc, Map.take(r, keys), r) - end) - # if it's single the return_skipped_upsert? is handled at the # call site https://github.com/ash-project/ash_postgres/blob/0b21d4a99cc3f6d8676947e291ac9b9d57ad6e2e/lib/data_layer.ex#L3046-L3046 - if options[:return_skipped_upsert?] && !opts[:single?] do - [changeset | _] = changesets + result = + if options[:return_skipped_upsert?] && !opts[:single?] do + [changeset | _] = changesets + + results_by_identity = + result + |> elem(1) + |> List.wrap() + |> Enum.into(%{}, fn r -> + {Map.take(r, keys), r} + end) - ash_query = - resource - |> Ash.Query.do_filter( - or: - changesets - |> Enum.filter(fn changeset -> - not Map.has_key?( - results_by_identity, - Map.take(changeset.attributes, keys) - ) + ash_query = + resource + |> Ash.Query.do_filter( + or: + changesets + |> Enum.filter(fn changeset -> + not Map.has_key?( + results_by_identity, + Map.take(changeset.attributes, keys) + ) + end) + |> Enum.map(fn changeset -> + changeset.attributes + |> Map.take(keys) + |> Keyword.new() + end) + ) + |> then(fn + query when is_nil(identity) or is_nil(identity.where) -> query + query -> Ash.Query.do_filter(query, identity.where) + end) + |> Ash.Query.set_tenant(changeset.tenant) + + skipped_upserts = + with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query), + {:ok, results} <- run_query(ecto_query, resource) do + results + |> Enum.map(fn result -> + Ash.Resource.put_metadata(result, :upsert_skipped, true) end) - |> Enum.map(fn changeset -> - changeset.attributes - |> Map.take(identity.keys) - |> Keyword.new() + |> Enum.reduce(%{}, fn r, acc -> + Map.put(acc, Map.take(r, keys), r) end) - ) - |> then(fn - query when is_nil(identity) or is_nil(identity.where) -> query - query -> Ash.Query.do_filter(query, identity.where) - end) - |> Ash.Query.set_tenant(changeset.tenant) - - skipped_upserts = - with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query), - {:ok, results} <- run_query(ecto_query, resource) do - results - |> Enum.map(fn result -> - Ash.Resource.put_metadata(result, :upsert_skipped, true) - end) - |> Enum.reduce(%{}, fn r, acc -> - Map.put(acc, Map.take(r, keys), r) - end) - end + end - results = - changesets - |> Enum.map(fn changeset -> - identity = - changeset.attributes - |> Map.take(keys) + results = + changesets + |> Enum.map(fn changeset -> + identity = + changeset.attributes + |> Map.take(keys) - Map.get(results_by_identity, identity, Map.get(skipped_upserts, identity)) - end) - |> Enum.filter(& &1) + Map.get(results_by_identity, identity, Map.get(skipped_upserts, identity)) + end) + |> Enum.filter(& &1) - {length(results), results} - else - result - end + {length(results), results} + else + result + end case result do {_, nil} -> @@ -2113,6 +2114,12 @@ defmodule AshPostgres.DataLayer do {:ok, results} else + results_by_identity = + results + |> Enum.into(%{}, fn r -> + {Map.take(r, keys), r} + end) + results = changesets |> Enum.map(fn changeset -> From 67134766fb7392c5889b4f2e9ccb96432b43ca30 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Sat, 11 Oct 2025 13:51:12 +0200 Subject: [PATCH 12/13] update ash version --- mix.exs | 2 +- mix.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index 0ede3a11..af4055bf 100644 --- a/mix.exs +++ b/mix.exs @@ -177,7 +177,7 @@ defmodule AshPostgres.MixProject do # Run "mix help deps" to learn about dependencies. defp deps do [ - {:ash, ash_version("~> 3.5 and >= 3.5.35")}, + {:ash, ash_version("~> 3.5 and >= 3.6.2")}, {:spark, "~> 2.3 and >= 2.3.4"}, {:ash_sql, ash_sql_version("~> 0.3 and >= 0.3.2")}, {:igniter, "~> 0.6 and >= 0.6.29", optional: true}, diff --git a/mix.lock b/mix.lock index 1827dcb4..05bd993c 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,5 @@ %{ - "ash": {:hex, :ash, "3.5.43", "222f9a8ac26ad3b029f8e69306cc83091c992d858b4538af12e33a148f301cab", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.6.29 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.11", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, ">= 0.2.6 and < 1.0.0-0", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "48b2aa274c524f5b968c563dd56aec8f9b278c529c8aa46e6fe0ca564c26cc1c"}, + "ash": {:hex, :ash, "3.6.2", "90d1c8296be777b90caabf51b99323d6618a0b92594dfab92b02bdf848ac38bf", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.6.29 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.11", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, ">= 0.2.6 and < 1.0.0-0", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3546b5798dd24576cc451f6e03f3d6e3bb62666c0921bfe8aae700c599d9c38d"}, "ash_sql": {:hex, :ash_sql, "0.3.2", "e2d65dac1c813cbd2569a750bf1c063109778e840052e44535ced294d7638a19", [:mix], [{:ash, ">= 3.5.43 and < 4.0.0-0", [hex: :ash, repo: "hexpm", optional: false]}, {:ecto, "~> 3.9", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.9", [hex: :ecto_sql, repo: "hexpm", optional: false]}], "hexpm", "1f6e5d827c0eb55fc5a07f58eb97f9bb3e6b290d83df75883f422537b98c9c68"}, "benchee": {:hex, :benchee, "1.4.0", "9f1f96a30ac80bab94faad644b39a9031d5632e517416a8ab0a6b0ac4df124ce", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "299cd10dd8ce51c9ea3ddb74bb150f93d25e968f93e4c1fa31698a8e4fa5d715"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, From 0c80433c605f7906949ecefb9660e7697dc42a5d Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Sat, 11 Oct 2025 13:51:24 +0200 Subject: [PATCH 13/13] fix: rebase error --- lib/data_layer.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/data_layer.ex b/lib/data_layer.ex index bd2e7603..9755eedf 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -2136,12 +2136,12 @@ defmodule AshPostgres.DataLayer do case get_bulk_operation_metadata(changeset) do {index, metadata_key} -> - Ash.Resource.put_metadata(result, metadata_key, index) + Ash.Resource.put_metadata(result_for_changeset, metadata_key, index) nil -> # Compatibility fallback Ash.Resource.put_metadata( - result, + result_for_changeset, :bulk_create_index, changeset.context[:bulk_create][:index] )