diff --git a/src/core/etl/src/Flow/ETL/Loader/TransformerLoader.php b/src/core/etl/src/Flow/ETL/Loader/TransformerLoader.php index 1d8433cca..233eb7141 100644 --- a/src/core/etl/src/Flow/ETL/Loader/TransformerLoader.php +++ b/src/core/etl/src/Flow/ETL/Loader/TransformerLoader.php @@ -25,12 +25,10 @@ public function closure(FlowContext $context) : void public function load(Rows $rows, FlowContext $context) : void { if ($this->transformer instanceof Transformer) { - $rows = $this->transformer->transform($rows, $context); + $this->loader->load($this->transformer->transform($rows, $context), $context); } else { - $rows = df()->from(from_rows($rows))->with($this->transformer)->fetch(); + df($context->config)->from(from_rows($rows))->with($this->transformer)->load($this->loader)->run(); } - - $this->loader->load($rows, $context); } public function loaders() : array diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Loader/TransformerLoaderTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Loader/TransformerLoaderTest.php new file mode 100644 index 000000000..53420e466 --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Loader/TransformerLoaderTest.php @@ -0,0 +1,173 @@ +read(from_array([ + ['name' => 'Alice', 'age' => 30], + ['name' => 'Bob', 'age' => 25], + ['name' => 'Charlie', 'age' => 35], + ])) + ->collect() + ->write( + to_transformation( + add_row_index('row_num', StartFrom::ONE), + to_memory($memory) + ) + ) + ->run(); + + self::assertSame( + [ + ['name' => 'Alice', 'age' => 30, 'row_num' => 1], + ['name' => 'Bob', 'age' => 25, 'row_num' => 2], + ['name' => 'Charlie', 'age' => 35, 'row_num' => 3], + ], + $memory->dump() + ); + } + + public function test_transformer_loader_with_batch_size_transformation() : void + { + $loader = $this->createMock(Loader::class); + $loader->expects(self::exactly(2)) + ->method('load'); + + df() + ->read( + new FakeStaticOrdersExtractor(1000) + ) + ->collect() + ->write( + to_transformation( + batch_size(500), + $loader + ) + ) + ->run(); + } + + public function test_transformer_loader_with_drop_transformation() : void + { + $memory = new ArrayMemory(); + + df() + ->read(from_array([ + ['id' => 1, 'name' => 'Alice', 'email' => 'alice@example.com', 'password' => 'secret123'], + ['id' => 2, 'name' => 'Bob', 'email' => 'bob@example.com', 'password' => 'secret456'], + ])) + ->write( + to_transformation( + drop('password', 'email'), + to_memory($memory) + ) + ) + ->run(); + + self::assertSame( + [ + ['id' => 1, 'name' => 'Alice'], + ['id' => 2, 'name' => 'Bob'], + ], + $memory->dump() + ); + } + + public function test_transformer_loader_with_limit_transformation() : void + { + $memory = new ArrayMemory(); + + df() + ->read(from_array([ + ['id' => 1, 'name' => 'Alice'], + ['id' => 2, 'name' => 'Bob'], + ['id' => 3, 'name' => 'Charlie'], + ['id' => 4, 'name' => 'Diana'], + ['id' => 5, 'name' => 'Eve'], + ])) + ->collect() + ->write( + to_transformation( + limit(3), + to_memory($memory) + ) + ) + ->run(); + + self::assertSame( + [ + ['id' => 1, 'name' => 'Alice'], + ['id' => 2, 'name' => 'Bob'], + ['id' => 3, 'name' => 'Charlie'], + ], + $memory->dump() + ); + } + + public function test_transformer_loader_with_mask_columns_transformation() : void + { + $memory = new ArrayMemory(); + + df() + ->read(from_array([ + ['id' => 1, 'name' => 'Alice', 'ssn' => '123-45-6789', 'email' => 'alice@example.com'], + ['id' => 2, 'name' => 'Bob', 'ssn' => '987-65-4321', 'email' => 'bob@example.com'], + ])) + ->write( + to_transformation( + mask_columns(['ssn', 'email'], '***'), + to_memory($memory) + ) + ) + ->run(); + + self::assertSame( + [ + ['id' => 1, 'name' => 'Alice', 'ssn' => '***', 'email' => '***'], + ['id' => 2, 'name' => 'Bob', 'ssn' => '***', 'email' => '***'], + ], + $memory->dump() + ); + } + + public function test_transformer_loader_with_select_transformation() : void + { + $memory = new ArrayMemory(); + + df() + ->read(from_array([ + ['id' => 1, 'name' => 'Alice', 'email' => 'alice@example.com', 'age' => 30], + ['id' => 2, 'name' => 'Bob', 'email' => 'bob@example.com', 'age' => 25], + ])) + ->write( + to_transformation( + select('name', 'email'), + to_memory($memory) + ) + ) + ->run(); + + self::assertSame( + [ + ['name' => 'Alice', 'email' => 'alice@example.com'], + ['name' => 'Bob', 'email' => 'bob@example.com'], + ], + $memory->dump() + ); + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Loader/TransformerLoaderTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Loader/TransformerLoaderTest.php index 3a9fec52d..c8352a60f 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Loader/TransformerLoaderTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Loader/TransformerLoaderTest.php @@ -7,9 +7,7 @@ use function Flow\ETL\DSL\{config, df, flow_context, from_array, ref, rows, to_memory, to_transformation}; use function Flow\Types\DSL\type_string; use Flow\ETL\{DataFrame, - FlowContext, Loader, - Loader\Closure, Memory\ArrayMemory, Tests\FlowTestCase, Transformation, @@ -36,44 +34,6 @@ public function test_transformer_loader() : void $transformer->load(rows(), flow_context(config())); } - /** - * Tests that the closure method is called when using a Closure loader. - */ - public function test_transformer_loader_with_closure() : void - { - $closure_loader = $this->createMockForIntersectionOfInterfaces([Loader::class, Closure::class]); - - $closure_loader->expects(self::once()) - ->method('closure') - ->with(self::isInstanceOf(FlowContext::class)); - - \assert($closure_loader instanceof Loader); - $transformer = to_transformation( - new class implements Transformation { - public function transform(DataFrame $data_frame) : DataFrame - { - return $data_frame; - } - }, - $closure_loader - ); - - df() - ->read( - from_array( - [ - ['id' => 1], - ['id' => 2], - ['id' => 3], - ] - ) - ) - ->write( - $transformer - ) - ->run(); - } - public function test_transformer_loader_with_transformation() : void { df()