@@ -34,6 +34,7 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -46,6 +47,7 @@ class RecordWriter {
private final Table table;
private final String absoluteFilename;
private final FileFormat fileFormat;
private @Nullable FileIO io;

RecordWriter(
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
@@ -72,12 +74,14 @@ class RecordWriter {
}
OutputFile outputFile;
EncryptionKeyMetadata keyMetadata;
try (FileIO io = table.io()) {
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
outputFile = encryptedOutputFile.encryptingOutputFile();
keyMetadata = encryptedOutputFile.keyMetadata();
}
// Keep FileIO open for the lifetime of this writer to avoid
// premature shutdown of underlying client pools (e.g., S3),
// which manifests as "Connection pool shut down" (Issue #36438).
this.io = table.io();
Contributor Author: @ahmedabu98 does this make sense to keep it open?
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
outputFile = encryptedOutputFile.encryptingOutputFile();
keyMetadata = encryptedOutputFile.keyMetadata();

switch (fileFormat) {
case AVRO:
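To make the interleaved removed/added lines above easier to follow, the output-file setup in the constructor reads roughly as sketched below after this change: the try-with-resources block is gone and the FileIO is held in a field until close(). This is a sketch assembled from the added lines, not the exact merged source.

    // Keep FileIO open for the lifetime of this writer (closed in close()),
    // so underlying client pools (e.g., S3) stay usable while writing.
    this.io = table.io();
    OutputFile tmpFile = io.newOutputFile(absoluteFilename);
    EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
    outputFile = encryptedOutputFile.encryptingOutputFile();
    keyMetadata = encryptedOutputFile.keyMetadata();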
@@ -120,16 +124,38 @@ public void write(Record record) {
}

public void close() throws IOException {
IOException closeError = null;
try {
icebergDataWriter.close();
} catch (IOException e) {
throw new IOException(
String.format(
"Failed to close %s writer for table %s, path: %s",
fileFormat, table.name(), absoluteFilename),
e);
closeError =
new IOException(
String.format(
"Failed to close %s writer for table %s, path: %s",
fileFormat, table.name(), absoluteFilename),
e);
} finally {
// Always attempt to close FileIO and decrement metrics
if (io != null) {
try {
io.close();
} catch (Exception ioCloseError) {
if (closeError != null) {
closeError.addSuppressed(ioCloseError);
} else {
closeError = new IOException("Failed to close FileIO", ioCloseError);
}
} finally {
io = null;
}
}
activeIcebergWriters.dec();
}

if (closeError != null) {
throw closeError;
}
activeIcebergWriters.dec();

DataFile dataFile = icebergDataWriter.toDataFile();
LOG.info(
"Closed {} writer for table '{}' ({} records, {} bytes), path: {}",
@@ -39,6 +39,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.values.Row;
@@ -59,6 +60,10 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
@@ -77,6 +82,7 @@
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

/** Test class for {@link RecordWriterManager}. */
@RunWith(JUnit4.class)
@@ -938,4 +944,103 @@ public void testDefaultMetrics() throws IOException {
}
}
}

@Test
public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
TableIdentifier tableId =
TableIdentifier.of(
"default",
"table_"
+ testName.getMethodName()
+ "_"
+ UUID.randomUUID().toString().replace("-", "").substring(0, 6));
Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA);

CloseTrackingFileIO trackingFileIO = new CloseTrackingFileIO(table.io());
Table spyTable = Mockito.spy(table);
Mockito.doReturn(trackingFileIO).when(spyTable).io();

PartitionKey partitionKey = new PartitionKey(spyTable.spec(), spyTable.schema());
RecordWriter writer =
new RecordWriter(spyTable, FileFormat.PARQUET, "file.parquet", partitionKey);

Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();

writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
writer.close();

assertTrue("FileIO should be closed after writer close", trackingFileIO.closed);
}

private static final class CloseTrackingFileIO implements FileIO {
private final FileIO delegate;
volatile boolean closed = false;

CloseTrackingFileIO(FileIO delegate) {
this.delegate = delegate;
}

@Override
public InputFile newInputFile(String path) {
return delegate.newInputFile(path);
}

@Override
public OutputFile newOutputFile(String path) {
OutputFile underlying = delegate.newOutputFile(path);
return new CloseAwareOutputFile(underlying, this);
}

@Override
public void deleteFile(String path) {
delegate.deleteFile(path);
}

@Override
public Map<String, String> properties() {
return delegate.properties();
}

@Override
public void close() {
closed = true;
delegate.close();
}
}

private static final class CloseAwareOutputFile implements OutputFile {
private final OutputFile delegate;
private final CloseTrackingFileIO io;

CloseAwareOutputFile(OutputFile delegate, CloseTrackingFileIO io) {
this.delegate = delegate;
this.io = io;
}

@Override
public PositionOutputStream create() {
if (io.closed) {
throw new IllegalStateException("Connection pool shut down");
}
return delegate.create();
}

@Override
public PositionOutputStream createOrOverwrite() {
if (io.closed) {
throw new IllegalStateException("Connection pool shut down");
}
return delegate.createOrOverwrite();
}

@Override
public String location() {
return delegate.location();
}

@Override
public InputFile toInputFile() {
return delegate.toInputFile();
}
}
}
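For context on what CloseAwareOutputFile guards against: under the previous try-with-resources pattern, the FileIO would already be closed by the time the output stream is opened, so a snippet like the following (hypothetical, written against the test helpers above, not part of the test) would hit the simulated "Connection pool shut down" failure.

    // Hypothetical illustration of the old pattern against CloseTrackingFileIO:
    OutputFile outputFile;
    try (FileIO io = trackingFileIO) {
      outputFile = io.newOutputFile("file.parquet"); // wrapped in CloseAwareOutputFile
    } // io.close() sets closed = true
    outputFile.create(); // throws IllegalStateException("Connection pool shut down")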