Commit 089942f

Address Vinish's remaining comments
1 parent 46baa21 commit 089942f

File tree

7 files changed: +304 -600 lines changed


xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java

Lines changed: 34 additions & 0 deletions
@@ -18,6 +18,18 @@
 
 package org.apache.xtable.conversion;
 
+import static org.apache.xtable.model.storage.TableFormat.DELTA;
+import static org.apache.xtable.model.storage.TableFormat.HUDI;
+import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.xtable.delta.DeltaConversionSourceProvider;
+import org.apache.xtable.hudi.HudiConversionSourceProvider;
+import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
+
 public class ConversionUtils {
 
   public static SourceTable convertToSourceTable(TargetTable table) {
@@ -30,4 +42,26 @@ public static SourceTable convertToSourceTable(TargetTable table) {
         table.getCatalogConfig(),
         table.getAdditionalProperties());
   }
+
+  public static ConversionSourceProvider<?> getConversionSourceProvider(
+      String sourceTableFormat, Configuration hadoopConf) {
+    if (sourceTableFormat.equalsIgnoreCase(HUDI)) {
+      ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider =
+          new HudiConversionSourceProvider();
+      hudiConversionSourceProvider.init(hadoopConf);
+      return hudiConversionSourceProvider;
+    } else if (sourceTableFormat.equalsIgnoreCase(DELTA)) {
+      ConversionSourceProvider<Long> deltaConversionSourceProvider =
+          new DeltaConversionSourceProvider();
+      deltaConversionSourceProvider.init(hadoopConf);
+      return deltaConversionSourceProvider;
+    } else if (sourceTableFormat.equalsIgnoreCase(ICEBERG)) {
+      ConversionSourceProvider<org.apache.iceberg.Snapshot> icebergConversionSourceProvider =
+          new IcebergConversionSourceProvider();
+      icebergConversionSourceProvider.init(hadoopConf);
+      return icebergConversionSourceProvider;
+    } else {
+      throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
+    }
+  }
 }
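
For reference, a minimal sketch of how the new ConversionUtils helper might be used; the wrapper class and the literal format strings below are illustrative, not part of the commit:

import org.apache.hadoop.conf.Configuration;

import org.apache.xtable.conversion.ConversionSourceProvider;
import org.apache.xtable.conversion.ConversionUtils;

public class ProviderLookupExample {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    // Format matching is case-insensitive, so "hudi" resolves the same as TableFormat.HUDI.
    ConversionSourceProvider<?> provider =
        ConversionUtils.getConversionSourceProvider("hudi", hadoopConf);
    System.out.println(provider.getClass().getSimpleName()); // HudiConversionSourceProvider

    // Any unrecognized format fails fast with IllegalArgumentException.
    try {
      ConversionUtils.getConversionSourceProvider("parquet", hadoopConf);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // Unsupported source format: parquet
    }
  }
}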

xtable-service/src/main/java/org/apache/xtable/service/ConversionService.java

Lines changed: 87 additions & 51 deletions
@@ -18,27 +18,32 @@
 
 package org.apache.xtable.service;
 
-import static org.apache.xtable.model.storage.TableFormat.DELTA;
-import static org.apache.xtable.model.storage.TableFormat.HUDI;
-import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.tuple.Pair;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.iceberg.SchemaParser;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.xtable.avro.AvroSchemaConverter;
 import org.apache.xtable.conversion.ConversionConfig;
 import org.apache.xtable.conversion.ConversionController;
-import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.ConversionUtils;
 import org.apache.xtable.conversion.SourceTable;
 import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.iceberg.IcebergSchemaExtractor;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.schema.SparkSchemaExtractor;
 import org.apache.xtable.service.models.ConvertTableRequest;
 import org.apache.xtable.service.models.ConvertTableResponse;
 import org.apache.xtable.service.models.ConvertedTable;
-import org.apache.xtable.service.spark.SparkHolder;
-import org.apache.xtable.service.utils.ConversionServiceUtil;
 
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
@@ -50,40 +55,61 @@
  * a source table, generating target tables, and then executing the conversion via a designated
  * conversion controller.
  */
+@Log4j2
 @ApplicationScoped
 public class ConversionService {
-  private final SparkHolder sparkHolder;
   private final ConversionController conversionController;
-  private final ConversionServiceUtil conversionServiceUtil;
+  private final ConversionServiceConfig serviceConfig;
+  private final Configuration hadoopConf;
 
   /**
    * Constructs a ConversionService instance with required dependencies.
    *
-   * @param sparkHolder the Spark holder instance containing the Spark context and configuration
-   * @param conversionServiceUtil utility for handling metadata operations
+   * @param serviceConfig the conversion service configuration
    */
   @Inject
-  public ConversionService(SparkHolder sparkHolder, ConversionServiceUtil conversionServiceUtil) {
-    this.sparkHolder = sparkHolder;
-    this.conversionServiceUtil = conversionServiceUtil;
-    this.conversionController = new ConversionController(sparkHolder.jsc().hadoopConfiguration());
+  public ConversionService(ConversionServiceConfig serviceConfig) {
+    this.serviceConfig = serviceConfig;
+    this.hadoopConf = getHadoopConf();
+    this.conversionController = new ConversionController(hadoopConf);
+  }
+
+  private Configuration getHadoopConf() {
+    Configuration conf = new Configuration();
+    String hadoopConfigPath = serviceConfig.getHadoopConfigPath();
+    try {
+      // Load configuration from the specified XML file
+      conf.addResource(hadoopConfigPath);
+
+      // If the resource wasn't found, log a warning
+      if (conf.size() == 0) {
+        log.warn(
+            "Could not load Hadoop configuration from: {}. Using default Hadoop configuration.",
+            hadoopConfigPath);
+      }
+    } catch (Exception e) {
+      log.error(
+          "Error loading Hadoop configuration from: {}. Exception: {}",
+          hadoopConfigPath,
+          e.getMessage(),
+          e);
+    }
+    return conf;
   }
 
   /**
    * Constructs a ConversionService instance using dependency injection for testing.
    *
-   * @param sparkHolder the Spark holder instance
    * @param conversionController a preconfigured conversion controller
-   * @param conversionServiceUtil utility for handling metadata operations
   */
   @VisibleForTesting
   public ConversionService(
-      SparkHolder sparkHolder,
-      ConversionServiceUtil conversionServiceUtil,
-      ConversionController conversionController) {
-    this.sparkHolder = sparkHolder;
+      ConversionServiceConfig serviceConfig,
+      ConversionController conversionController,
+      Configuration hadoopConf) {
+    this.serviceConfig = serviceConfig;
     this.conversionController = conversionController;
-    this.conversionServiceUtil = conversionServiceUtil;
+    this.hadoopConf = hadoopConf;
   }
 
   /**
@@ -120,37 +146,47 @@ public ConvertTableResponse convertTable(ConvertTableRequest convertTableRequest
     ConversionConfig conversionConfig =
         ConversionConfig.builder().sourceTable(sourceTable).targetTables(targetTables).build();
 
-    ConversionSourceProvider<?> conversionSourceProvider =
-        conversionServiceUtil.getConversionSourceProvider(
-            convertTableRequest.getSourceFormat(), sparkHolder.jsc().hadoopConfiguration());
+    conversionController.sync(
+        conversionConfig,
+        ConversionUtils.getConversionSourceProvider(
+            convertTableRequest.getSourceFormat(), hadoopConf));
 
-    conversionController.sync(conversionConfig, conversionSourceProvider);
-
-    List<ConvertedTable> restTargetTables = new ArrayList<>();
-    for (String targetFormat : convertTableRequest.getTargetFormats()) {
-      if (targetFormat.equals(ICEBERG)) {
-        Pair<String, String> responseFields =
-            conversionServiceUtil.getIcebergSchemaAndMetadataPath(
-                convertTableRequest.getSourceTablePath(), sparkHolder.jsc().hadoopConfiguration());
-        ConvertedTable icebergTable =
-            new ConvertedTable(ICEBERG, responseFields.getLeft(), responseFields.getRight());
-        restTargetTables.add(icebergTable);
-      } else if (targetFormat.equals(HUDI)) {
-        Pair<String, String> responseFields =
-            conversionServiceUtil.getHudiSchemaAndMetadataPath(
-                convertTableRequest.getSourceTablePath(), sparkHolder.jsc().hadoopConfiguration());
-        ConvertedTable hudiTable =
-            new ConvertedTable(HUDI, responseFields.getLeft(), responseFields.getRight());
-        restTargetTables.add(hudiTable);
-      } else if (targetFormat.equals(DELTA)) {
-        Pair<String, String> responseFields =
-            conversionServiceUtil.getDeltaSchemaAndMetadataPath(
-                convertTableRequest.getSourceTablePath(), sparkHolder.spark());
-        ConvertedTable deltaTable =
-            new ConvertedTable(DELTA, responseFields.getLeft(), responseFields.getRight());
-        restTargetTables.add(deltaTable);
+    List<ConvertedTable> convertedTables = new ArrayList<>();
+    for (TargetTable targetTable : targetTables) {
+      InternalTable internalTable =
+          ConversionUtils.getConversionSourceProvider(targetTable.getFormatName(), hadoopConf)
+              .getConversionSourceInstance(convertToSourceTable(targetTable))
+              .getCurrentTable();
+      String schemaString;
+      switch (targetTable.getFormatName()) {
+        case TableFormat.HUDI:
+          schemaString =
+              AvroSchemaConverter.getInstance()
+                  .fromInternalSchema(internalTable.getReadSchema())
+                  .toString();
+          break;
+        case TableFormat.ICEBERG:
+          org.apache.iceberg.Schema iceSchema =
+              IcebergSchemaExtractor.getInstance().toIceberg(internalTable.getReadSchema());
+          schemaString = SchemaParser.toJson(iceSchema);
+          break;
+        case TableFormat.DELTA:
+          schemaString =
+              SparkSchemaExtractor.getInstance()
                  .fromInternalSchema(internalTable.getReadSchema())
+                  .json();
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported table format: " + targetTable.getFormatName());
       }
+      convertedTables.add(
+          ConvertedTable.builder()
+              .targetFormat(internalTable.getName())
+              .targetSchema(schemaString)
+              .targetMetadataPath(internalTable.getLatestMetdataPath())
+              .build());
     }
-    return new ConvertTableResponse(restTargetTables);
+    return new ConvertTableResponse(convertedTables);
   }
 }
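
As context for the ICEBERG branch above, a small self-contained sketch of what SchemaParser.toJson produces; the schema here is built directly with Iceberg's Types API rather than via IcebergSchemaExtractor, and the class name and field names are made up:

import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.types.Types;

public class IcebergSchemaJsonExample {
  public static void main(String[] args) {
    // A two-column schema standing in for the extracted internalTable.getReadSchema().
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()));
    // Same call the service uses to fill targetSchema for ICEBERG targets.
    System.out.println(SchemaParser.toJson(schema));
  }
}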

xtable-service/src/main/java/org/apache/xtable/service/spark/SparkBootstrap.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

xtable-service/src/main/java/org/apache/xtable/service/spark/SparkHolder.java

Lines changed: 0 additions & 114 deletions
This file was deleted.
