Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/core/etl/src/Flow/ETL/Loader/TransformerLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ public function closure(FlowContext $context) : void
public function load(Rows $rows, FlowContext $context) : void
{
if ($this->transformer instanceof Transformer) {
$rows = $this->transformer->transform($rows, $context);
$this->loader->load($this->transformer->transform($rows, $context), $context);
} else {
$rows = df()->from(from_rows($rows))->with($this->transformer)->fetch();
df($context->config)->from(from_rows($rows))->with($this->transformer)->load($this->loader)->run();
}

$this->loader->load($rows, $context);
}

public function loaders() : array
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\Loader;

use function Flow\ETL\DSL\{add_row_index, batch_size, df, drop, from_array, limit, mask_columns, select, to_memory, to_transformation};
use Flow\ETL\Loader;
use Flow\ETL\Memory\ArrayMemory;
use Flow\ETL\Tests\Double\FakeStaticOrdersExtractor;
use Flow\ETL\Tests\FlowTestCase;
use Flow\ETL\Transformation\AddRowIndex\StartFrom;

final class TransformerLoaderTest extends FlowTestCase
{
public function test_transformer_loader_with_add_row_index_transformation() : void
{
$memory = new ArrayMemory();

df()
->read(from_array([
['name' => 'Alice', 'age' => 30],
['name' => 'Bob', 'age' => 25],
['name' => 'Charlie', 'age' => 35],
]))
->collect()
->write(
to_transformation(
add_row_index('row_num', StartFrom::ONE),
to_memory($memory)
)
)
->run();

self::assertSame(
[
['name' => 'Alice', 'age' => 30, 'row_num' => 1],
['name' => 'Bob', 'age' => 25, 'row_num' => 2],
['name' => 'Charlie', 'age' => 35, 'row_num' => 3],
],
$memory->dump()
);
}

public function test_transformer_loader_with_batch_size_transformation() : void
{
$loader = $this->createMock(Loader::class);
$loader->expects(self::exactly(2))
->method('load');

df()
->read(
new FakeStaticOrdersExtractor(1000)
)
->collect()
->write(
to_transformation(
batch_size(500),
$loader
)
)
->run();
}

public function test_transformer_loader_with_drop_transformation() : void
{
$memory = new ArrayMemory();

df()
->read(from_array([
['id' => 1, 'name' => 'Alice', 'email' => '[email protected]', 'password' => 'secret123'],
['id' => 2, 'name' => 'Bob', 'email' => '[email protected]', 'password' => 'secret456'],
]))
->write(
to_transformation(
drop('password', 'email'),
to_memory($memory)
)
)
->run();

self::assertSame(
[
['id' => 1, 'name' => 'Alice'],
['id' => 2, 'name' => 'Bob'],
],
$memory->dump()
);
}

public function test_transformer_loader_with_limit_transformation() : void
{
$memory = new ArrayMemory();

df()
->read(from_array([
['id' => 1, 'name' => 'Alice'],
['id' => 2, 'name' => 'Bob'],
['id' => 3, 'name' => 'Charlie'],
['id' => 4, 'name' => 'Diana'],
['id' => 5, 'name' => 'Eve'],
]))
->collect()
->write(
to_transformation(
limit(3),
to_memory($memory)
)
)
->run();

self::assertSame(
[
['id' => 1, 'name' => 'Alice'],
['id' => 2, 'name' => 'Bob'],
['id' => 3, 'name' => 'Charlie'],
],
$memory->dump()
);
}

public function test_transformer_loader_with_mask_columns_transformation() : void
{
$memory = new ArrayMemory();

df()
->read(from_array([
['id' => 1, 'name' => 'Alice', 'ssn' => '123-45-6789', 'email' => '[email protected]'],
['id' => 2, 'name' => 'Bob', 'ssn' => '987-65-4321', 'email' => '[email protected]'],
]))
->write(
to_transformation(
mask_columns(['ssn', 'email'], '***'),
to_memory($memory)
)
)
->run();

self::assertSame(
[
['id' => 1, 'name' => 'Alice', 'ssn' => '***', 'email' => '***'],
['id' => 2, 'name' => 'Bob', 'ssn' => '***', 'email' => '***'],
],
$memory->dump()
);
}

public function test_transformer_loader_with_select_transformation() : void
{
$memory = new ArrayMemory();

df()
->read(from_array([
['id' => 1, 'name' => 'Alice', 'email' => '[email protected]', 'age' => 30],
['id' => 2, 'name' => 'Bob', 'email' => '[email protected]', 'age' => 25],
]))
->write(
to_transformation(
select('name', 'email'),
to_memory($memory)
)
)
->run();

self::assertSame(
[
['name' => 'Alice', 'email' => '[email protected]'],
['name' => 'Bob', 'email' => '[email protected]'],
],
$memory->dump()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
use function Flow\ETL\DSL\{config, df, flow_context, from_array, ref, rows, to_memory, to_transformation};
use function Flow\Types\DSL\type_string;
use Flow\ETL\{DataFrame,
FlowContext,
Loader,
Loader\Closure,
Memory\ArrayMemory,
Tests\FlowTestCase,
Transformation,
Expand All @@ -36,44 +34,6 @@ public function test_transformer_loader() : void
$transformer->load(rows(), flow_context(config()));
}

/**
* Tests that the closure method is called when using a Closure loader.
*/
public function test_transformer_loader_with_closure() : void
{
$closure_loader = $this->createMockForIntersectionOfInterfaces([Loader::class, Closure::class]);

$closure_loader->expects(self::once())
->method('closure')
->with(self::isInstanceOf(FlowContext::class));

\assert($closure_loader instanceof Loader);
$transformer = to_transformation(
new class implements Transformation {
public function transform(DataFrame $data_frame) : DataFrame
{
return $data_frame;
}
},
$closure_loader
);

df()
->read(
from_array(
[
['id' => 1],
['id' => 2],
['id' => 3],
]
)
)
->write(
$transformer
)
->run();
}

public function test_transformer_loader_with_transformation() : void
{
df()
Expand Down
Loading