diff --git a/.gitignore b/.gitignore index 3edd668..fd597b3 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,9 @@ /classpath/ build/ .idea +/.settings/ +/.metadata/ +.classpath +.project *.iml +out diff --git a/README.md b/README.md index 0e1dca8..264a784 100644 --- a/README.md +++ b/README.md @@ -9,20 +9,35 @@ This plugin combine rows from file having data format like a table, based on a c ## Configuration * **on**: - * **in_column**: name of the column on input. (string, required) - * **file_column**: name of the column on file. (string, default is the same as **in_column**) + * **input_column**: name of the column on input. (string, required) + * **file_column**: name of the column on file. (string, required) * **file**: - * **path**: path of file (string, required) - * **format**: file format (string, required, supported: `json`) - * **encode**: file encode (string, default is `raw`, supported: `raw`, `gzip`) + * **path_prefix**: Path prefix of input files (string, required) + * **parser**: Parser configurations except **columns** option (see below [Supported Parser Type](#supported-parser-type)) (hash, required) + * **decoders**: Decoder configuration (see below [Supported Decoder Type](#supported-decoder-type)) (array of hash, optional) + * **follow_symlinks**: If true, follow symbolic link directories (boolean, default: `false`) * **columns**: required columns of data from the file (array of hash, required) * **name**: name of the column - * **type**: type of the column (see below) + * **type**: type of the column (see below [Type of the column](#type-of-the-column)) * **format**: format of the timestamp if type is timestamp - * **timezone**: timezone of the timestamp if type is timestamp + * **timezone**: timezone of the timestamp if type is timestamp + * **column_prefix**: column name prefix added to file `columns` to prevent duplicate column names (string, default: `"_joined_by_embulk_"`) + * **parser_plugin_columns_option**: name of the 
**file.parser** option into which the **file.columns** value is set. (optional, default: `"columns"`, see [Supported Parser Type](#supported-parser-type) for details.) ---- -**type of the column** +### Supported Parser Type + +* You can use all embulk file-parser plugins. + * [built-in parser plugins](http://www.embulk.org/docs/built-in.html) + * [parser plugins](http://www.embulk.org/plugins/#file-parser). +* You don't need to define an option such as **columns** in the **file.parser** options, because the **file.columns** value is set as **file.parser**'s **columns** option. If you set **file.parser_plugin_columns_option**, this plugin sets the **file.columns** value into the **file.parser** options under that option name instead. + +### Supported Decoder Type + +* You can use all embulk file-decoder plugins. + * [built-in decoder plugins](http://www.embulk.org/docs/built-in.html) + * [decoder plugins](http://www.embulk.org/plugins/#file-decoder) + +### Type of the column |name|description| |:---|:---| @@ -39,47 +54,30 @@ This plugin combine rows from file having data format like a table, based on a c filters: - type: join_file on: - in_column: name_id + input_column: id file_column: id file: - path: ./master.json - format: json - encode: raw + path_prefix: ./example/json_array_of_hash/*.json + parser: + type: jsonpath + root: "$." columns: - {name: id, type: long} - {name: name, type: string} - joined_column_prefix: _joined_by_embulk_ + - {name: created_at, type: timestamp, format: "%Y-%m-%d"} + - {name: point, type: double} + - {name: time_zone, type: string} + column_prefix: _join_by_embulk_ ``` +See [more examples](./example). 
+ ## Run Example ``` $ ./gradlew classpath -$ embulk run -I lib example/config.yml -``` - -## Supported Data Format -* json - -### Supported Data Format Example - -#### JSON - -``` -[ - { - "id": 0, - "name": "civitaspo" - }, - { - "id": 2, - "name": "moriogai" - }, - { - "id": 5, - "name": "natsume.soseki" - } -] +$ embulk bundle install --gemfile=example/Gemfile --path vendor/bundle +$ embulk run -b example -Ilib example/json_array_of_hash/config.yml ``` ## Build diff --git a/build.gradle b/build.gradle index 7fed37f..57c05ef 100644 --- a/build.gradle +++ b/build.gradle @@ -14,12 +14,19 @@ configurations { } version = "0.1.0" -sourceCompatibility = 1.7 -targetCompatibility = 1.7 + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 dependencies { - compile "org.embulk:embulk-core:0.8.+" - provided "org.embulk:embulk-core:0.8.+" + compile "org.embulk:embulk-core:0.8.29" + provided "org.embulk:embulk-core:0.8.29" + compile "org.embulk:embulk-standards:0.8.29" + provided "org.embulk:embulk-standards:0.8.29" + compile "com.google.guava:guava:23.0" + provided "com.google.guava:guava:23.0" + compile "org.komamitsu:fluency:1.4.0" + compile "com.okumin:influent-java:0.3.0" testCompile "junit:junit:4.+" } @@ -46,6 +53,7 @@ task checkstyle(type: Checkstyle) { classpath = sourceSets.main.output + sourceSets.test.output source = sourceSets.main.allJava + sourceSets.test.allJava } + task gem(type: JRubyExec, dependsOn: ["gemspec", "classpath"]) { jrubyArgs "-rrubygems/gem_runner", "-eGem::GemRunner.new.run(ARGV)", "build" script "${project.name}.gemspec" @@ -57,9 +65,11 @@ task gemPush(type: JRubyExec, dependsOn: ["gem"]) { script "pkg/${project.name}-${project.version}.gem" } -task "package"(dependsOn: ["gemspec", "classpath"]) << { - println "> Build succeeded." - println "> You can run embulk with '-L ${file(".").absolutePath}' argument." +task "package"(dependsOn: ["gemspec", "classpath"]) { + doLast { + println "> Build succeeded." 
+ println "> You can run embulk with '-L ${file(".").absolutePath}' argument." + } } task gemspec { diff --git a/example/Gemfile b/example/Gemfile new file mode 100644 index 0000000..4a0892f --- /dev/null +++ b/example/Gemfile @@ -0,0 +1,3 @@ +source 'https://rubygems.org' + +gem 'embulk-parser-jsonpath' diff --git a/example/config.yml b/example/config.yml deleted file mode 100644 index 90198bf..0000000 --- a/example/config.yml +++ /dev/null @@ -1,32 +0,0 @@ -in: - type: file - path_prefix: example/data.csv - parser: - type: csv - charset: UTF-8 - newline: CRLF - null_string: 'NULL' - skip_header_lines: 1 - comment_line_marker: '#' - columns: - - {name: time, type: timestamp, format: "%Y-%m-%d"} - - {name: id, type: long} - - {name: name, type: string} - - {name: score, type: double} - -filters: - - type: join_file - base_column: {name: id, type: long} - counter_column: {name: id, type: long} - joined_column_prefix: _joined_by_embulk_ - file_path: example/master.json - file_format: json - columns: - - {name: id, type: long} - - {name: name, type: string} - - {name: created_at, type: timestamp, format: "%Y-%m-%d"} - - {name: point, type: double} - - {name: time_zone, type: string} - -out: - type: stdout diff --git a/example/json_array_of_hash/config.yml b/example/json_array_of_hash/config.yml new file mode 100644 index 0000000..2f0ffbf --- /dev/null +++ b/example/json_array_of_hash/config.yml @@ -0,0 +1,37 @@ +in: + type: file + path_prefix: example/data.csv + parser: + type: csv + charset: UTF-8 + newline: CRLF + null_string: 'NULL' + skip_header_lines: 1 + comment_line_marker: '#' + columns: + - {name: time, type: timestamp, format: "%Y-%m-%d"} + - {name: id, type: long} + - {name: name, type: string} + - {name: score, type: double} + +filters: + - type: join_file + on: + input_column: id + file_column: id + file: + path_prefix: ./example/json_array_of_hash/*.json + parser: + type: jsonpath + root: "$." 
+ columns: + - {name: id, type: long} + - {name: name, type: string} + - {name: created_at, type: timestamp, format: "%Y-%m-%d"} + - {name: point, type: double} + - {name: time_zone, type: string} + column_prefix: _join_by_embulk_ + + +out: + type: stdout diff --git a/example/master.json b/example/json_array_of_hash/master.json similarity index 100% rename from example/master.json rename to example/json_array_of_hash/master.json diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 51288f9..deedc7f 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 890aec9..722184f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,4 +1,4 @@ -#Wed Jun 21 00:12:09 JST 2017 +#Sun Jan 08 00:35:58 PST 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew b/gradlew index 4453cce..9aa616c 100755 --- a/gradlew +++ b/gradlew @@ -1,4 +1,4 @@ -#!/usr/bin/env sh +#!/usr/bin/env bash ############################################################################## ## @@ -154,19 +154,16 @@ if $cygwin ; then esac fi -# Escape application args -save ( ) { - for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done - echo " " +# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules +function splitJvmOpts() { + JVM_OPTS=("$@") } -APP_ARGS=$(save "$@") - -# Collect all arguments for the java command, following the shell quoting and substitution rules -eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" +eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS 
+JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" # by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong -if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then +if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then cd "$(dirname "$0")" fi -exec "$JAVACMD" "$@" +exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" diff --git a/lib/embulk/filter/join_file.rb b/lib/embulk/filter/join_file.rb index a81c46f..464b299 100644 --- a/lib/embulk/filter/join_file.rb +++ b/lib/embulk/filter/join_file.rb @@ -1,3 +1,7 @@ Embulk::JavaPlugin.register_filter( "join_file", "org.embulk.filter.join_file.JoinFileFilterPlugin", File.expand_path('../../../../classpath', __FILE__)) + +Embulk::JavaPlugin.register_output( + "internal_forward", "org.embulk.filter.join_file.plugin.InternalForwardOutputPlugin", + File.expand_path('../../../../classpath', __FILE__)) diff --git a/src/main/java/org/embulk/filter/join_file/FileLoader.java b/src/main/java/org/embulk/filter/join_file/FileLoader.java new file mode 100644 index 0000000..a5ddef6 --- /dev/null +++ b/src/main/java/org/embulk/filter/join_file/FileLoader.java @@ -0,0 +1,155 @@ +package org.embulk.filter.join_file; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Service; +import com.google.common.util.concurrent.ServiceManager; +import org.embulk.config.Config; +import org.embulk.config.ConfigException; +import org.embulk.config.ConfigSource; +import org.embulk.filter.join_file.JoinFileFilterPlugin.FileTask; +import org.embulk.filter.join_file.join.Joinee; +import org.embulk.filter.join_file.join.JoineeBuilder; +import org.embulk.filter.join_file.plugin.InternalForwardOutputPlugin; +import org.embulk.spi.DataException; +import org.embulk.spi.Exec; +import org.embulk.spi.Schema; +import org.embulk.standards.LocalFileInputPlugin; +import org.slf4j.Logger; +import 
pro.civitaspo.embulk.forward.InForwardEventReader; +import pro.civitaspo.embulk.forward.InForwardService; +import pro.civitaspo.embulk.runner.AsyncEmbulkRunnerService; +import pro.civitaspo.embulk.runner.EmbulkRunner; +import pro.civitaspo.embulk.spi.ElapsedTime; +import pro.civitaspo.embulk.spi.StandardColumnVisitor; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; + +import static org.embulk.filter.join_file.plugin.InternalForwardOutputPlugin.PLUGIN_NAME; + +public class FileLoader +{ + private static final String LOCK = "ForInForwardServerCallback"; + private static final Logger logger = Exec.getLogger(FileLoader.class); + + + public static Joinee loadFile(FileTask task) + { + FileTask.putParserColumnsIfAbsent(task); + + Schema fileSchema = task.getColumns().toSchema(); + JoineeBuilder builder = new JoineeBuilder(fileSchema); + InForwardEventReader reader = new InForwardEventReader(fileSchema); + StandardColumnVisitor visitor = new StandardColumnVisitor(reader, builder); + + Service inForwardService = InForwardService.builder() + .task(task) + .forEachEventCallback(es -> + { + synchronized (LOCK) { + reader.setEvent(es); + + while (reader.nextRecord()) { + fileSchema.visitColumns(visitor); + builder.addRecord(); + } + } + }) + .build(); + + Service embulkRunnerService = buildEmbulkService(task); + + runServices(ImmutableList.of(inForwardService, embulkRunnerService)); + + return builder.build(); + } + + private static Service buildEmbulkService(FileTask task) + { + EmbulkRunner.Builder builder = EmbulkRunner.builder(); + + ConfigSource inputConfig = Exec.newConfigSource(); + inputConfig.set("type", "file"); // TODO: p-r to define LocalFileInputPlugin.PLUGIN_NAME = "file" + for (Method m : LocalFileInputPlugin.PluginTask.class.getMethods()) { + Config a = m.getAnnotation(Config.class); + if (a == null) { + continue; + } + try { + inputConfig.set(a.value(), m.invoke(task)); + } + catch 
(IllegalAccessException | InvocationTargetException e) { + throw new ConfigException(e); + } + } + inputConfig.set("parser", task.getParser()); + inputConfig.set("decoders", task.getDecorders()); + builder.inputConfig(inputConfig); + + ConfigSource outputConfig = Exec.newConfigSource(); + outputConfig.set("type", PLUGIN_NAME); + for (Method m : InternalForwardOutputPlugin.PluginTask.class.getMethods()) { + Config a = m.getAnnotation(Config.class); + if (a == null) { + continue; + } + try { + outputConfig.set(a.value(), m.invoke(task)); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new ConfigException(e); + } + } + builder.outputConfig(outputConfig); + + EmbulkRunner runner = builder.build(); + return new AsyncEmbulkRunnerService(runner); + } + + private static void runServices(List services) + { + ServiceManager m = new ServiceManager(services); + m.addListener(new ServiceManager.Listener() { + @Override + public void failure(Service service) + { + throw new DataException(service.failureCause()); + } + }); + + ElapsedTime.measureWithPolling(new ElapsedTime.Pollable() { + @Override + public boolean poll() + { + return m.isHealthy(); + } + + @Override + public Void getResult() + { + return null; + } + + @Override + public void onStart() + { + logger.info("FileLoader: Start"); + m.startAsync().awaitHealthy(); + } + + @Override + public void onWaiting(long elapsedMillis) + { + logger.info("FileLoader: Running (Elapsed: {}ms)", elapsedMillis); + } + + @Override + public void onFinished(long elapsedMillis) + { + logger.info("FileLoader: Finished (Elapsed: {}ms)", elapsedMillis); + m.awaitStopped(); + } + }); + } +} diff --git a/src/main/java/org/embulk/filter/join_file/JoinFileFilterPlugin.java b/src/main/java/org/embulk/filter/join_file/JoinFileFilterPlugin.java index de61418..e4ecb6e 100644 --- a/src/main/java/org/embulk/filter/join_file/JoinFileFilterPlugin.java +++ b/src/main/java/org/embulk/filter/join_file/JoinFileFilterPlugin.java @@ 
-1,156 +1,127 @@ package org.embulk.filter.join_file; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; +import com.google.common.collect.Lists; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskSource; +import org.embulk.filter.join_file.join.Joinee; import org.embulk.spi.Column; -import org.embulk.spi.ColumnConfig; -import org.embulk.spi.Exec; import org.embulk.spi.FilterPlugin; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema; -import org.embulk.spi.time.TimestampParser; -import org.embulk.spi.type.Types; -import org.joda.time.DateTimeZone; -import org.jruby.embed.ScriptingContainer; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; +import org.embulk.spi.SchemaConfig; +import org.embulk.standards.LocalFileInputPlugin; +import pro.civitaspo.embulk.forward.InForwardService; +import pro.civitaspo.embulk.forward.OutForwardService; + import java.util.List; -import java.util.Map; public class JoinFileFilterPlugin implements FilterPlugin { - private static final Logger logger = Exec.getLogger(JoinFileFilterPlugin.class); - public interface PluginTask - extends Task, TimestampParser.Task + extends Task { - @Config("base_column") - ColumnConfig getBaseColumn(); - - @Config("counter_column") - @ConfigDefault("{name: id, type: long}") - ColumnConfig getCounterColumn(); + @Config("on") + OnTask getOnTask(); - @Config("joined_column_prefix") - @ConfigDefault("\"_joined_by_embulk_\"") - String getJoinedColumnPrefix(); + @Config("file") + FileTask getFileTask(); - @Config("file_path") - String getFilePath(); + Joinee getJoinee(); + void setJoinee(Joinee joinee); - @Config("file_format") - String getFileFormat(); + static Schema buildOutputSchema(PluginTask task, Schema inputSchema) + { + Schema.Builder builder = Schema.builder(); - 
@Config("columns") - List getColumns(); + for (Column c : inputSchema.getColumns()) { + builder.add(c.getName(), c.getType()); + } - @Config("time_zone") - @ConfigDefault("\"UTC\"") - String getTimeZone(); + FileTask fileTask = task.getFileTask(); + Schema joineeSchema = fileTask.getColumns().toSchema(); + for (Column c : joineeSchema.getColumns()) { + String joinedColumn = new StringBuilder() + // NOTE: `FileTask#getColumnPrefix()` is only used here, + // because this plugin manage columns only by index hereafter. + .append(fileTask.getColumnPrefix()) + .append(c.getName()) + .toString(); + builder.add(joinedColumn, c.getType()); + } - HashMap> getTable(); - void setTable(HashMap> jsonTable); + return builder.build(); + } } - @Override - public void transaction(ConfigSource config, Schema inputSchema, - FilterPlugin.Control control) + public interface OnTask + extends Task { - PluginTask task = config.loadConfig(PluginTask.class); - - try { - TableBuilder tableBuilder = new TableBuilder( - task.getFilePath(), - task.getFileFormat(), - task.getColumns(), - task.getCounterColumn().getName(), - task.getJoinedColumnPrefix()); - - task.setTable(tableBuilder.build()); - } - catch (IOException e) { - logger.error(e.getMessage()); - throw new RuntimeException(e); - } + @Config("input_column") + String getInputColumnName(); - Schema outputSchema = buildOutputSchema(inputSchema, task.getColumns(), task.getJoinedColumnPrefix()); - logger.info("output schema: {}", outputSchema); - - control.run(task.dump(), outputSchema); + @Config("file_column") + String getFileColumnName(); } - @Override - public PageOutput open(TaskSource taskSource, Schema inputSchema, - Schema outputSchema, PageOutput output) + public interface FileTask + extends Task, LocalFileInputPlugin.PluginTask, InForwardService.Task, OutForwardService.Task { - PluginTask task = taskSource.loadTask(PluginTask.class); - - // create joinColumns/baseColumn - final List outputColumns = outputSchema.getColumns(); - 
final List inputColumns = inputSchema.getColumns(); - - Map inputColumnMap = Maps.newHashMap(); - final List joinColumns = new ArrayList<>(); - for (Column column : outputColumns) { - if (!inputColumns.contains(column)) { - joinColumns.add(column); - } else { - inputColumnMap.put(column.getName(), column); - } - } + @Config("parser") + ConfigSource getParser(); + void setParser(ConfigSource parser); - final Column baseColumn = inputColumnMap.get(task.getBaseColumn().getName()); + @Config("decoders") + @ConfigDefault("[]") + List getDecorders(); - final HashMap timestampParserMap = buildTimestampParserMap( - task.getJRuby(), - task.getColumns(), - task.getJoinedColumnPrefix(), - task.getTimeZone()); + @Config("columns") + SchemaConfig getColumns(); - return new JoinFilePageOutput(inputSchema, outputSchema, baseColumn, task.getTable(), joinColumns, timestampParserMap, output); + @Config("column_prefix") + @ConfigDefault("\"_joined_by_embulk_\"") + String getColumnPrefix(); + + @Config("parser_plugin_columns_option") + @ConfigDefault("\"columns\"") + String getParserPluginColumnsOption(); + + // NOTE: This plugin doesn't worry about format and timezone, + // because the responsibility is transferred to Embulk Parser Plugin. 
+ static void putParserColumnsIfAbsent(FileTask task) + { + ConfigSource parserTask = task.getParser(); + List columnsOption = parserTask.get(List.class, task.getParserPluginColumnsOption(), null); + if (columnsOption == null || columnsOption.isEmpty()) { + parserTask.set(task.getParserPluginColumnsOption(), task.getColumns()); + task.setParser(parserTask); + } + } } - private Schema buildOutputSchema(Schema inputSchema, List columns, String joinedColumnPrefix) + @Override + public void transaction(ConfigSource config, Schema inputSchema, + FilterPlugin.Control control) { - ImmutableList.Builder builder = ImmutableList.builder(); + PluginTask task = config.loadConfig(PluginTask.class); + Schema outputSchema = PluginTask.buildOutputSchema(task, inputSchema); - int i = 0; // columns index - for (Column inputColumn: inputSchema.getColumns()) { - Column outputColumn = new Column(i++, inputColumn.getName(), inputColumn.getType()); - builder.add(outputColumn); - } - for (ColumnConfig columnConfig: columns) { - String columnName = joinedColumnPrefix + columnConfig.getName(); - builder.add(new Column(i++, columnName, columnConfig.getType())); - } + Joinee joinee = FileLoader.loadFile(task.getFileTask()); + task.setJoinee(joinee); - return new Schema(builder.build()); + task.getFileTask().setFiles(Lists.newArrayList()); // TODO: why is this required? 
+ + control.run(task.dump(), outputSchema); } - private HashMap buildTimestampParserMap(ScriptingContainer jruby, List columns, String joinedColumnPrefix, String timeZone) + @Override + public PageOutput open(TaskSource taskSource, Schema inputSchema, + Schema outputSchema, PageOutput output) { - final HashMap timestampParserMap = Maps.newHashMap(); - for (ColumnConfig columnConfig: columns) { - if (Types.TIMESTAMP.equals(columnConfig.getType())) { - String format = columnConfig.getOption().get(String.class, "format"); - DateTimeZone timezone = DateTimeZone.forID(timeZone); - TimestampParser parser = new TimestampParser(jruby, format, timezone); - - String columnName = joinedColumnPrefix + columnConfig.getName(); - - timestampParserMap.put(columnName, parser); - } - } - - return timestampParserMap; + PluginTask task = taskSource.loadTask(PluginTask.class); + return new JoinFilePageOutput(task, inputSchema, outputSchema, output); } } diff --git a/src/main/java/org/embulk/filter/join_file/JoinFilePageOutput.java b/src/main/java/org/embulk/filter/join_file/JoinFilePageOutput.java index 5769b57..b9759ed 100644 --- a/src/main/java/org/embulk/filter/join_file/JoinFilePageOutput.java +++ b/src/main/java/org/embulk/filter/join_file/JoinFilePageOutput.java @@ -1,57 +1,45 @@ package org.embulk.filter.join_file; -import com.google.common.base.Throwables; +import org.embulk.filter.join_file.JoinFileFilterPlugin.PluginTask; +import org.embulk.filter.join_file.join.JoinOnColumnVisitor; +import org.embulk.filter.join_file.join.Joinee; +import org.embulk.filter.join_file.join.JoineeOn; +import org.embulk.filter.join_file.join.JoineeReader; import org.embulk.spi.Column; import org.embulk.spi.Exec; import org.embulk.spi.Page; -import org.embulk.spi.PageBuilder; import org.embulk.spi.PageOutput; -import org.embulk.spi.PageReader; import org.embulk.spi.Schema; -import org.embulk.spi.time.TimestampParser; -import org.embulk.spi.type.Types; - -import java.util.HashMap; -import 
java.util.List; +import pro.civitaspo.embulk.spi.PageBuilder; +import pro.civitaspo.embulk.spi.PageReader; public class JoinFilePageOutput implements PageOutput { - private final org.slf4j.Logger logger = Exec.getLogger(JoinFilePageOutput.class); - private final PageReader pageReader; private final PageBuilder pageBuilder; - private final Column joinBaseColumn; - private final HashMap> table; - private final List joinColumns; - private final HashMap timestampParserMap; + private final PageReader pageReader; + private final Column onPageColumn; + private final JoineeReader joineeReader; + private final JoinOnColumnVisitor visitor; - JoinFilePageOutput( - Schema inputSchema, - Schema outputSchema, - Column joinBaseColumn, - HashMap> table, - List joinColumns, - HashMap timestampParserMap, - PageOutput pageOutput - ) + JoinFilePageOutput(PluginTask task, Schema inputSchema, Schema outputSchema, PageOutput output) { + this.pageBuilder = new PageBuilder(Exec.getBufferAllocator(), outputSchema, output); this.pageReader = new PageReader(inputSchema); - this.pageBuilder = new PageBuilder(Exec.getBufferAllocator(), outputSchema, pageOutput); - this.joinBaseColumn = joinBaseColumn; - this.table = table; - this.joinColumns = joinColumns; - this.timestampParserMap = timestampParserMap; + this.onPageColumn = inputSchema.lookupColumn(task.getOnTask().getInputColumnName()); + Joinee joinee = task.getJoinee(); + JoineeOn joineeOn = joinee.buildJoineeOn(task.getOnTask().getFileColumnName()); + this.joineeReader = new JoineeReader(joinee, joineeOn); + this.visitor = new JoinOnColumnVisitor(pageBuilder, pageReader, joineeReader); } - @Override public void add(Page page) { pageReader.setPage(page); - while (pageReader.nextRecord()) { - setInputValues(pageBuilder); - setJoinedValues(pageBuilder); + joineeReader.updateJoineeRowNumber(pageReader, onPageColumn); + pageBuilder.getSchema().visitColumns(visitor); pageBuilder.addRecord(); } } @@ -65,91 +53,6 @@ public void finish() @Override 
public void close() { - pageReader.close(); pageBuilder.close(); } - - private void setInputValues(PageBuilder pageBuilder) { - for (Column inputColumn: pageReader.getSchema().getColumns()) { - if (pageReader.isNull(inputColumn)) { - pageBuilder.setNull(inputColumn); - continue; - } - - if (Types.STRING.equals(inputColumn.getType())) { - pageBuilder.setString(inputColumn, pageReader.getString(inputColumn)); - } - else if (Types.BOOLEAN.equals(inputColumn.getType())) { - pageBuilder.setBoolean(inputColumn, pageReader.getBoolean(inputColumn)); - } - else if (Types.DOUBLE.equals(inputColumn.getType())) { - pageBuilder.setDouble(inputColumn, pageReader.getDouble(inputColumn)); - } - else if (Types.LONG.equals(inputColumn.getType())) { - pageBuilder.setLong(inputColumn, pageReader.getLong(inputColumn)); - } - else if (Types.TIMESTAMP.equals(inputColumn.getType())) { - pageBuilder.setTimestamp(inputColumn, pageReader.getTimestamp(inputColumn)); - } - } - } - - private void setJoinedValues(PageBuilder pageBuilder) { - for (Column column: joinColumns) { - // get value from Table - String rowKey = getCurrentJoinBaseColumnValue(pageReader, joinBaseColumn); - if (!table.containsKey(rowKey) || !table.get(rowKey).containsKey(column.getName())) { - pageBuilder.setNull(column); - continue; - } - - String value = table.get(rowKey).get(column.getName()); - if (value == null) { - pageBuilder.setNull(column); - continue; - } - - if (Types.STRING.equals(column.getType())) { - pageBuilder.setString(column, value); - } - else if (Types.BOOLEAN.equals(column.getType())) { - pageBuilder.setBoolean(column, Boolean.parseBoolean(value)); - } - else if (Types.DOUBLE.equals(column.getType())) { - pageBuilder.setDouble(column, Double.parseDouble(value)); - } - else if (Types.LONG.equals(column.getType())) { - pageBuilder.setLong(column, Long.parseLong(value)); - } - else if (Types.TIMESTAMP.equals(column.getType())) { - TimestampParser parser = timestampParserMap.get(column.getName()); - 
pageBuilder.setTimestamp(column, parser.parse(value)); - } - } - } - - private static String getCurrentJoinBaseColumnValue(PageReader pageReader, Column joinBaseColumn) - { - if (pageReader.isNull(joinBaseColumn)) { - return null; - } - - if (Types.STRING.equals(joinBaseColumn.getType())) { - return pageReader.getString(joinBaseColumn); - } - else if (Types.BOOLEAN.equals(joinBaseColumn.getType())) { - return String.valueOf(pageReader.getBoolean(joinBaseColumn)); - } - else if (Types.DOUBLE.equals(joinBaseColumn.getType())) { - return String.valueOf(pageReader.getDouble(joinBaseColumn)); - } - else if (Types.LONG.equals(joinBaseColumn.getType())) { - return String.valueOf(pageReader.getLong(joinBaseColumn)); - } - else if (Types.TIMESTAMP.equals(joinBaseColumn.getType())) { - return String.valueOf(pageReader.getTimestamp(joinBaseColumn)); - } - - throw Throwables.propagate(new Throwable("Unsupported Column Type: " + joinBaseColumn.getType())); - } } diff --git a/src/main/java/org/embulk/filter/join_file/TableBuilder.java b/src/main/java/org/embulk/filter/join_file/TableBuilder.java deleted file mode 100644 index bb52691..0000000 --- a/src/main/java/org/embulk/filter/join_file/TableBuilder.java +++ /dev/null @@ -1,83 +0,0 @@ -package org.embulk.filter.join_file; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; -import org.embulk.config.ConfigException; -import org.embulk.spi.ColumnConfig; -import org.embulk.spi.Exec; -import org.slf4j.Logger; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -public class TableBuilder -{ - private final Logger logger = Exec.getLogger(TableBuilder.class); - private final String filePath; - private final String fileFormat; - private final List columns; - private final String 
rowKeyName; - private final String columnPrefix; - - public TableBuilder(String filePath, String fileFormat, List columns, String rowKeyName, String columnPrefix) - { - this.filePath = filePath; - this.fileFormat = fileFormat; - this.columns = columns; - this.rowKeyName = rowKeyName; - this.columnPrefix = columnPrefix; - } - - public HashMap> build() - throws IOException - { - HashMap> table = Maps.newHashMap(); - - for (HashMap rawRecord: loadFile()) { - - HashMap record = Maps.newHashMap(); - - for (ColumnConfig column: columns) { - String columnKey = columnPrefix + column.getName(); - String value = rawRecord.get(column.getName()); - - record.put(columnKey, value); - } - - String rowKey = rawRecord.get(rowKeyName); - table.put(rowKey, record); - } - - return table; - } - - private List> loadFile() - throws IOException - { - List> rawData; - switch (fileFormat) { - case "csv": - logger.error("will support csv format, but not yet."); - throw new NotImplementedException(); // TODO: will support csv format, but not yet. - case "tsv": - logger.error("will support tsv format, but not yet."); - throw new NotImplementedException(); // TODO: will support tsv format, but not yet. - case "yaml": - logger.error("will support yaml format, but not yet."); - throw new NotImplementedException(); // TODO: will support yaml format, but not yet. 
- case "json": - ObjectMapper mapper = new ObjectMapper(); - rawData = mapper.readValue(new File(filePath), new TypeReference>>(){}); - break; - default: - throw new ConfigException("Unsupported File Format: " + fileFormat); - } - - return rawData; - } -} diff --git a/src/main/java/org/embulk/filter/join_file/join/JoinOnColumnVisitor.java b/src/main/java/org/embulk/filter/join_file/join/JoinOnColumnVisitor.java new file mode 100644 index 0000000..95a96da --- /dev/null +++ b/src/main/java/org/embulk/filter/join_file/join/JoinOnColumnVisitor.java @@ -0,0 +1,152 @@ +package org.embulk.filter.join_file.join; + +import org.embulk.spi.Column; +import org.embulk.spi.ColumnVisitor; +import pro.civitaspo.embulk.spi.DataBuilder; +import pro.civitaspo.embulk.spi.DataReader; + +public class JoinOnColumnVisitor + implements ColumnVisitor +{ + private final DataBuilder dataBuilder; + private final DataReader dataReader; + private final JoineeReader joineeReader; + + private final int joineeOffset; + + public JoinOnColumnVisitor(DataBuilder dataBuilder, DataReader dataReader, JoineeReader joineeReader) + { + this.dataBuilder = dataBuilder; + this.dataReader = dataReader; + this.joineeReader = joineeReader; + this.joineeOffset = dataReader.getSchema().getColumnCount(); + } + + /* + TODO: More accurate explanation. + NOTE: This visitor do not mind column names. + So, dataBuilder.getSchema() != dataReader.getSchema() + joineeReader.getSchema() + Actually, dataBuilder.getSchema() = dataReader.getSchema() + joineeReader.getSchema() with column_prefix. 
+ */ + + private boolean isJoinerColumn(Column column) + { + return column.getIndex() < joineeOffset; + } + + private int toJoineeIndex(Column column) + { + return column.getIndex() - joineeOffset; + } + + private void nullOr(Column column, Runnable runnable) + { + if (isJoinerColumn(column)) { + if (dataReader.isNull(column)) { + dataBuilder.setNull(column); + return; + } + } + else { + int joineeIndex = toJoineeIndex(column); + if (joineeReader.isNull(joineeIndex)) { + dataBuilder.setNull(column); + return; + } + } + + runnable.run(); + } + + @Override + public void booleanColumn(Column column) + { + nullOr(column, () -> + { + if (isJoinerColumn(column)) { + dataBuilder.setBoolean(column, dataReader.getBoolean(column)); + } + else { + int joineeIndex = toJoineeIndex(column); + dataBuilder.setBoolean(column, joineeReader.getBoolean(joineeIndex)); + } + }); + + } + + @Override + public void longColumn(Column column) + { + nullOr(column, () -> + { + if (isJoinerColumn(column)) { + dataBuilder.setLong(column, dataReader.getLong(column)); + } + else { + int joineeIndex = toJoineeIndex(column); + dataBuilder.setLong(column, joineeReader.getLong(joineeIndex)); + } + }); + } + + @Override + public void doubleColumn(Column column) + { + nullOr(column, () -> + { + if (isJoinerColumn(column)) { + dataBuilder.setDouble(column, dataReader.getDouble(column)); + } + else { + int joineeIndex = toJoineeIndex(column); + dataBuilder.setDouble(column, joineeReader.getDouble(joineeIndex)); + } + }); + + } + + @Override + public void stringColumn(Column column) + { + nullOr(column, () -> + { + if (isJoinerColumn(column)) { + dataBuilder.setString(column, dataReader.getString(column)); + } + else { + int joineeIndex = toJoineeIndex(column); + dataBuilder.setString(column, joineeReader.getString(joineeIndex)); + } + }); + } + + @Override + public void timestampColumn(Column column) + { + nullOr(column, () -> + { + if (isJoinerColumn(column)) { + dataBuilder.setTimestamp(column, 
dataReader.getTimestamp(column)); + } + else { + int joineeIndex = toJoineeIndex(column); + dataBuilder.setTimestamp(column, joineeReader.getTimestamp(joineeIndex)); + } + }); + } + + @Override + public void jsonColumn(Column column) + { + nullOr(column, () -> + { + if (isJoinerColumn(column)) { + dataBuilder.setJson(column, dataReader.getJson(column)); + } + else { + int joineeIndex = toJoineeIndex(column); + dataBuilder.setJson(column, joineeReader.getJson(joineeIndex)); + } + }); + } +} diff --git a/src/main/java/org/embulk/filter/join_file/join/Joinee.java b/src/main/java/org/embulk/filter/join_file/join/Joinee.java new file mode 100644 index 0000000..2e264fb --- /dev/null +++ b/src/main/java/org/embulk/filter/join_file/join/Joinee.java @@ -0,0 +1,153 @@ +package org.embulk.filter.join_file.join; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import org.embulk.config.ConfigException; +import org.embulk.spi.Column; +import org.embulk.spi.Schema; +import org.embulk.spi.json.JsonParser; +import org.embulk.spi.time.Timestamp; +import org.embulk.spi.type.Type; +import org.msgpack.value.Value; + +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.embulk.spi.type.Types.BOOLEAN; +import static org.embulk.spi.type.Types.DOUBLE; +import static org.embulk.spi.type.Types.JSON; +import static org.embulk.spi.type.Types.LONG; +import static org.embulk.spi.type.Types.STRING; +import static org.embulk.spi.type.Types.TIMESTAMP; + +public class Joinee +{ + private final Schema schema; + private final Map>> booleanData; + private final Map>> stringData; + private final Map>> longData; + private final Map>> doubleData; + private final Map>> timestampData; + private final Map>> jsonData; // Embulk cannot deserialize org.msgpack.value.Value. 
+ private final long maxRowNumber; + + @JsonCreator + public Joinee( + @JsonProperty("schema") Schema schema, + @JsonProperty("boolean_data") Map>> booleanData, + @JsonProperty("string_data") Map>> stringData, + @JsonProperty("long_data") Map>> longData, + @JsonProperty("double_data") Map>> doubleData, + @JsonProperty("timestamp_data") Map>> timestampData, + @JsonProperty("json_data") Map>> jsonData, + @JsonProperty("max_row_number") long maxRowNumber) + { + this.schema = schema; + this.booleanData = booleanData; + this.stringData = stringData; + this.longData = longData; + this.doubleData = doubleData; + this.timestampData = timestampData; + this.jsonData = jsonData; + this.maxRowNumber = maxRowNumber; + } + + @JsonProperty("schema") + public Schema getSchema() + { + return schema; + } + + @JsonIgnore + public JoineeOn buildJoineeOn(String onColumnName) + { + int onColumnIndex = schema.lookupColumn(onColumnName).getIndex(); + return buildJoineeOn(onColumnIndex); + } + + @JsonIgnore + public JoineeOn buildJoineeOn(int onColumnIndex) + { + JoineeOn.Builder builder = JoineeOn.builder(); + + Column c = schema.getColumn(onColumnIndex); + int i = c.getIndex(); + Type t = c.getType(); + + + if (t.equals(BOOLEAN)) { + booleanData.get(i).forEach((k, v) -> builder.put(v, k)); + } + else if (t.equals(STRING)) { + stringData.get(i).forEach((k, v) -> builder.put(v, k)); + } + else if (t.equals(LONG)) { + longData.get(i).forEach((k, v) -> builder.put(v, k)); + } + else if (t.equals(DOUBLE)) { + doubleData.get(i).forEach((k, v) -> builder.put(v, k)); + } + else if (t.equals(TIMESTAMP)) { + timestampData.get(i).forEach((k, v) -> builder.put(v, k)); + } + else if (t.equals(JSON)) { + JsonParser p = new JsonParser(); + jsonData.get(i).forEach((k, v) -> + { + String jsonString = v.or("null"); + Value json = p.parse(jsonString); + builder.put(Optional.of(json), k); + }); + } + else { + throw new ConfigException(String.format("Unsupported t: %s", t.getName())); + } + + return 
builder.build(); + } + + @JsonProperty("boolean_data") + public Map>> getBooleanData() + { + return booleanData; + } + + @JsonProperty("string_data") + public Map>> getStringData() + { + return stringData; + } + + @JsonProperty("long_data") + public Map>> getLongData() + { + return longData; + } + + @JsonProperty("double_data") + public Map>> getDoubleData() + { + return doubleData; + } + + @JsonProperty("timestamp_data") + public Map>> getTimestampData() + { + return timestampData; + } + + @JsonProperty("json_data") + public Map>> getJsonData() + { + return jsonData; + } + + @JsonProperty("max_row_number") + public long getMaxRowNumber() + { + return maxRowNumber; + } +} diff --git a/src/main/java/org/embulk/filter/join_file/join/JoineeBuilder.java b/src/main/java/org/embulk/filter/join_file/join/JoineeBuilder.java new file mode 100644 index 0000000..20a262a --- /dev/null +++ b/src/main/java/org/embulk/filter/join_file/join/JoineeBuilder.java @@ -0,0 +1,186 @@ +package org.embulk.filter.join_file.join; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import org.embulk.config.ConfigException; +import org.embulk.spi.Column; +import org.embulk.spi.Schema; +import org.embulk.spi.time.Timestamp; +import org.embulk.spi.type.Type; +import org.msgpack.value.Value; +import pro.civitaspo.embulk.spi.DataBuilder; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.embulk.spi.type.Types.BOOLEAN; +import static org.embulk.spi.type.Types.DOUBLE; +import static org.embulk.spi.type.Types.JSON; +import static org.embulk.spi.type.Types.LONG; +import static org.embulk.spi.type.Types.STRING; +import static org.embulk.spi.type.Types.TIMESTAMP; + +public class JoineeBuilder + implements DataBuilder +{ + private final Schema schema; + private final AtomicLong rowNumber = new AtomicLong(0); + private final Map>> booleanData = Maps.newConcurrentMap(); + private final Map>> stringData = Maps.newConcurrentMap(); + 
private final Map>> longData = Maps.newConcurrentMap(); + private final Map>> doubleData = Maps.newConcurrentMap(); + private final Map>> timestampData = Maps.newConcurrentMap(); + private final Map>> jsonData = Maps.newConcurrentMap(); // Embulk cannot deserialize org.msgpack.value.Value. + + public JoineeBuilder(Schema schema) + { + this.schema = schema; + } + + private void setValue(int columnIndex, Map>> data, Optional v) + { + if (!data.containsKey(columnIndex)) { + data.put(columnIndex, Maps.newConcurrentMap()); + } + data.get(columnIndex).put(rowNumber.get(), v); + } + + public Joinee build() + { + return new Joinee( + schema, + booleanData, + stringData, + longData, + doubleData, + timestampData, + jsonData, + rowNumber.get()); + } + + @Override + public Schema getSchema() + { + return schema; + } + + @Override + public void addRecord() + { + rowNumber.incrementAndGet(); + } + + @Override + public void setNull(Column column) + { + setNull(column.getIndex()); + } + + @Override + public void setNull(int columnIndex) + { + Column c = getSchema().getColumn(columnIndex); + Type t = c.getType(); + + if (t.equals(BOOLEAN)) { + setValue(columnIndex, booleanData, Optional.absent()); + } + else if (t.equals(STRING)) { + setValue(columnIndex, stringData, Optional.absent()); + } + else if (t.equals(LONG)) { + setValue(columnIndex, longData, Optional.absent()); + } + else if (t.equals(DOUBLE)) { + setValue(columnIndex, doubleData, Optional.absent()); + } + else if (t.equals(TIMESTAMP)) { + setValue(columnIndex, timestampData, Optional.absent()); + } + else if (t.equals(JSON)) { + setValue(columnIndex, jsonData, Optional.absent()); + } + else { + throw new ConfigException(String.format("Unsupported t: %s", t.getName())); + } + } + + @Override + public void setBoolean(Column column, boolean v) + { + setBoolean(column.getIndex(), v); + } + + @Override + public void setBoolean(int columnIndex, boolean v) + { + // TODO: check column type? 
+ setValue(columnIndex, booleanData, Optional.fromNullable(v)); + } + + @Override + public void setString(Column column, String v) + { + setString(column.getIndex(), v); + } + + @Override + public void setString(int columnIndex, String v) + { + // TODO: check column type? + setValue(columnIndex, stringData, Optional.fromNullable(v)); + } + + @Override + public void setLong(Column column, long v) + { + setLong(column.getIndex(), v); + } + + @Override + public void setLong(int columnIndex, long v) + { + // TODO: check column type? + setValue(columnIndex, longData, Optional.fromNullable(v)); + } + + @Override + public void setDouble(Column column, double v) + { + setDouble(column.getIndex(), v); + } + + @Override + public void setDouble(int columnIndex, double v) + { + // TODO: check column type? + setValue(columnIndex, doubleData, Optional.fromNullable(v)); + } + + @Override + public void setTimestamp(Column column, Timestamp v) + { + setTimestamp(column.getIndex(), v); + } + + @Override + public void setTimestamp(int columnIndex, Timestamp v) + { + // TODO: check column type? + setValue(columnIndex, timestampData, Optional.fromNullable(v)); + } + + @Override + public void setJson(Column column, Value v) + { + setJson(column.getIndex(), v); + } + + @Override + public void setJson(int columnIndex, Value v) + { + // TODO: check column type? 
+ // TODO: use org.msgpack.value.Value if jackson-databind supports + setValue(columnIndex, jsonData, Optional.fromNullable(v.toJson())); + } +} diff --git a/src/main/java/org/embulk/filter/join_file/join/JoineeOn.java b/src/main/java/org/embulk/filter/join_file/join/JoineeOn.java new file mode 100644 index 0000000..4d45c7f --- /dev/null +++ b/src/main/java/org/embulk/filter/join_file/join/JoineeOn.java @@ -0,0 +1,46 @@ +package org.embulk.filter.join_file.join; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +public class JoineeOn +{ + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private final ImmutableMap.Builder, Long> builder = ImmutableMap.builder(); + + public Builder() + { + } + + public Builder put(Optional key, long value) + { + builder.put(key, value); + return this; + } + + public JoineeOn build() + { + return new JoineeOn(builder.build()); + } + } + + private final Map, Long> indexMap; + + private JoineeOn(Map, Long> indexMap) + { + this.indexMap = indexMap; + } + + public long get(Optional joiner) + { + return indexMap.get(joiner); + } +} diff --git a/src/main/java/org/embulk/filter/join_file/join/JoineeReader.java b/src/main/java/org/embulk/filter/join_file/join/JoineeReader.java new file mode 100644 index 0000000..189a3a3 --- /dev/null +++ b/src/main/java/org/embulk/filter/join_file/join/JoineeReader.java @@ -0,0 +1,138 @@ +package org.embulk.filter.join_file.join; + +import com.google.common.base.Optional; +import org.embulk.config.ConfigException; +import org.embulk.spi.Column; +import org.embulk.spi.Schema; +import org.embulk.spi.json.JsonParseException; +import org.embulk.spi.json.JsonParser; +import org.embulk.spi.time.Timestamp; +import org.embulk.spi.type.Type; +import org.msgpack.value.Value; +import pro.civitaspo.embulk.spi.DataReader; + +import static org.embulk.spi.type.Types.BOOLEAN; +import static 
org.embulk.spi.type.Types.DOUBLE; +import static org.embulk.spi.type.Types.JSON; +import static org.embulk.spi.type.Types.LONG; +import static org.embulk.spi.type.Types.STRING; +import static org.embulk.spi.type.Types.TIMESTAMP; + +public class JoineeReader +{ + private final Joinee joinee; + private final JoineeOn on; + private long currentRowNumber = 0L; + + private final JsonParser jsonParser = new JsonParser(); + + public JoineeReader(Joinee joinee, JoineeOn joineeOn) + { + this.joinee = joinee; + this.on = joineeOn; + } + + public Schema getSchema() + { + return joinee.getSchema(); + } + + public void updateJoineeRowNumber(DataReader dataReader, Column onReaderColumn) + { + Type t = onReaderColumn.getType(); + + if (t.equals(BOOLEAN)) { + boolean v = dataReader.getBoolean(onReaderColumn); + this.currentRowNumber = on.get(Optional.fromNullable(v)); + } + else if (t.equals(STRING)) { + String v = dataReader.getString(onReaderColumn); + this.currentRowNumber = on.get(Optional.fromNullable(v)); + } + else if (t.equals(LONG)) { + long v = dataReader.getLong(onReaderColumn); + this.currentRowNumber = on.get(Optional.fromNullable(v)); + } + else if (t.equals(DOUBLE)) { + double v = dataReader.getDouble(onReaderColumn); + this.currentRowNumber = on.get(Optional.fromNullable(v)); + } + else if (t.equals(TIMESTAMP)) { + Timestamp v = dataReader.getTimestamp(onReaderColumn); + this.currentRowNumber = on.get(Optional.fromNullable(v)); + } + else if (t.equals(JSON)) { + Value v = dataReader.getJson(onReaderColumn); + this.currentRowNumber = on.get(Optional.fromNullable(v)); + } + else { + throw new ConfigException(String.format("Unsupported t: %s", t.getName())); + } + } + + public boolean isNull(int columnIndex) + { + Column c = getSchema().getColumn(columnIndex); + Type t = c.getType(); + + if (t.equals(BOOLEAN)) { + return !joinee.getBooleanData().get(columnIndex).get(currentRowNumber).isPresent(); + } + else if (t.equals(STRING)) { + return 
!joinee.getStringData().get(columnIndex).get(currentRowNumber).isPresent(); + } + else if (t.equals(LONG)) { + return !joinee.getLongData().get(columnIndex).get(currentRowNumber).isPresent(); + } + else if (t.equals(DOUBLE)) { + return !joinee.getDoubleData().get(columnIndex).get(currentRowNumber).isPresent(); + } + else if (t.equals(TIMESTAMP)) { + return !joinee.getTimestampData().get(columnIndex).get(currentRowNumber).isPresent(); + } + else if (t.equals(JSON)) { + return !joinee.getJsonData().get(columnIndex).get(currentRowNumber).isPresent(); + } + else { + throw new ConfigException(String.format("Unsupported t: %s", t.getName())); + } + } + + public boolean getBoolean(int columnIndex) + { + // TODO: check column type? + return joinee.getBooleanData().get(columnIndex).get(currentRowNumber).get(); + } + + public String getString(int columnIndex) + { + // TODO: check column type? + return joinee.getStringData().get(columnIndex).get(currentRowNumber).get(); + } + + public long getLong(int columnIndex) + { + // TODO: check column type? + return joinee.getLongData().get(columnIndex).get(currentRowNumber).get(); + } + + public double getDouble(int columnIndex) + { + // TODO: check column type? + return joinee.getDoubleData().get(columnIndex).get(currentRowNumber).get(); + } + + public Timestamp getTimestamp(int columnIndex) + { + // TODO: check column type? + return joinee.getTimestampData().get(columnIndex).get(currentRowNumber).get(); + } + + public Value getJson(int columnIndex) + { + // TODO: check column type? 
+ // TODO: use org.msgpack.value.Value if jackson-databind supports + String jsonString = joinee.getJsonData().get(columnIndex).get(currentRowNumber).get(); + return jsonParser.parse(jsonString); + } +} diff --git a/src/main/java/org/embulk/filter/join_file/plugin/InternalForwardOutputPlugin.java b/src/main/java/org/embulk/filter/join_file/plugin/InternalForwardOutputPlugin.java new file mode 100644 index 0000000..ca16910 --- /dev/null +++ b/src/main/java/org/embulk/filter/join_file/plugin/InternalForwardOutputPlugin.java @@ -0,0 +1,100 @@ +package org.embulk.filter.join_file.plugin; + +import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigException; +import org.embulk.config.ConfigSource; +import org.embulk.config.TaskReport; +import org.embulk.config.TaskSource; +import org.embulk.spi.ColumnVisitor; +import org.embulk.spi.Exec; +import org.embulk.spi.OutputPlugin; +import org.embulk.spi.Page; +import org.embulk.spi.Schema; +import org.embulk.spi.TransactionalPageOutput; +import org.slf4j.Logger; +import pro.civitaspo.embulk.forward.OutForwardEventBuilder; +import pro.civitaspo.embulk.forward.OutForwardService; +import pro.civitaspo.embulk.spi.PageReader; +import pro.civitaspo.embulk.spi.StandardColumnVisitor; + +import java.util.List; + +public class InternalForwardOutputPlugin + implements OutputPlugin +{ + private static final Logger logger = Exec.getLogger(InternalForwardOutputPlugin.class); + public static final String PLUGIN_NAME = "internal_forward"; + + public interface PluginTask + extends OutForwardService.Task + { + } + + @Override + public ConfigDiff transaction(ConfigSource config, Schema schema, int taskCount, Control control) + { + PluginTask task = config.loadConfig(PluginTask.class); + control.run(task.dump()); + OutForwardService.sendShutdownMessage(task); + return Exec.newConfigDiff(); + } + + @Override + public ConfigDiff resume(TaskSource taskSource, Schema schema, int taskCount, Control control) + { + throw new 
ConfigException("This plugin is not resumable."); + } + + @Override + public void cleanup(TaskSource taskSource, Schema schema, int taskCount, List successTaskReports) + { + } + + @Override + public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int taskIndex) + { + PluginTask task = taskSource.loadTask(PluginTask.class); + + return new TransactionalPageOutput() { + private PageReader reader = new PageReader(schema); + private OutForwardService service = new OutForwardService(task); + private OutForwardEventBuilder builder = new OutForwardEventBuilder(schema, service); + private ColumnVisitor visitor = new StandardColumnVisitor(reader, builder); + + @Override + public void add(Page page) + { + reader.setPage(page); + + while (reader.nextRecord()) { + schema.visitColumns(visitor); + builder.addRecord(); + } + } + + @Override + public void finish() + { + service.finish(); + } + + @Override + public void close() + { + service.close(); + } + + @Override + public void abort() + { + logger.warn("output plugin: {} does not support #abort", PLUGIN_NAME); + } + + @Override + public TaskReport commit() + { + return Exec.newTaskReport(); + } + }; + } +} diff --git a/src/main/java/org/embulk/filter/join_file/plugin/InternalPluginExtention.java b/src/main/java/org/embulk/filter/join_file/plugin/InternalPluginExtention.java new file mode 100644 index 0000000..183f9e2 --- /dev/null +++ b/src/main/java/org/embulk/filter/join_file/plugin/InternalPluginExtention.java @@ -0,0 +1,18 @@ +package org.embulk.filter.join_file.plugin; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Module; +import org.embulk.config.ConfigSource; +import org.embulk.spi.Extension; + +import java.util.List; + +public class InternalPluginExtention + implements Extension +{ + @Override + public List getModules(ConfigSource systemConfig) // TODO: need META-INF? 
+ { + return ImmutableList.of(new InternalPluginModule()); + } +} diff --git a/src/main/java/org/embulk/filter/join_file/plugin/InternalPluginModule.java b/src/main/java/org/embulk/filter/join_file/plugin/InternalPluginModule.java new file mode 100644 index 0000000..45df562 --- /dev/null +++ b/src/main/java/org/embulk/filter/join_file/plugin/InternalPluginModule.java @@ -0,0 +1,20 @@ +package org.embulk.filter.join_file.plugin; + +import com.google.common.base.Preconditions; +import com.google.inject.Binder; +import com.google.inject.Module; +import org.embulk.spi.OutputPlugin; + +import static org.embulk.filter.join_file.plugin.InternalForwardOutputPlugin.PLUGIN_NAME; +import static org.embulk.plugin.InjectedPluginSource.registerPluginTo; + +public class InternalPluginModule + implements Module +{ + @Override + public void configure(Binder binder) + { + Preconditions.checkNotNull(binder); + registerPluginTo(binder, OutputPlugin.class, PLUGIN_NAME, InternalForwardOutputPlugin.class); + } +} diff --git a/src/main/java/pro/civitaspo/embulk/forward/ForwardParentTask.java b/src/main/java/pro/civitaspo/embulk/forward/ForwardParentTask.java new file mode 100644 index 0000000..726f2f8 --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/forward/ForwardParentTask.java @@ -0,0 +1,17 @@ +package pro.civitaspo.embulk.forward; + +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.config.Task; + +public interface ForwardParentTask + extends Task +{ + @Config("shutdown_tag") + @ConfigDefault("\"shutdown\"") + String getShutdownTag(); + + @Config("message_tag") + @ConfigDefault("\"message\"") + String getMessageTag(); +} diff --git a/src/main/java/pro/civitaspo/embulk/forward/InForwardEventReader.java b/src/main/java/pro/civitaspo/embulk/forward/InForwardEventReader.java new file mode 100644 index 0000000..1c7688e --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/forward/InForwardEventReader.java @@ -0,0 +1,152 @@ +package 
pro.civitaspo.embulk.forward; + +import com.google.common.collect.Lists; +import influent.EventStream; +import org.embulk.spi.Column; +import org.embulk.spi.Schema; +import org.embulk.spi.time.Timestamp; +import org.msgpack.value.Value; +import pro.civitaspo.embulk.spi.DataReader; + +import java.util.List; +import java.util.Optional; + +public class InForwardEventReader + implements DataReader +{ + private final static String VALUES_KEY = "values"; + + private final Schema schema; + private EventStream event = null; + private int eventMessageCount = 0; + + private int readCount = 0; + private List values; + + public InForwardEventReader(Schema schema) + { + this.schema = schema; + } + + public void setEvent(EventStream event) + { + this.event = event; + this.eventMessageCount = event.getEntries().size(); + } + + @Override + public Schema getSchema() + { + return schema; + } + + @Override + public boolean nextRecord() + { + if (eventMessageCount <= readCount) { + return false; + } + + Optional> values = event.getEntries().get(readCount++).getRecord().entrySet() + .stream() + .filter(valueValueEntry -> valueValueEntry.getKey().asStringValue().asString().contentEquals(VALUES_KEY)) + .map(valueValueEntry -> valueValueEntry.getValue().asArrayValue().list()) + .findFirst(); + + this.values = values.orElse(Lists.newArrayListWithCapacity(schema.getColumnCount())); + return true; + } + + private Value getValue(int columnIndex) + { + return values.get(columnIndex); + } + + @Override + public boolean isNull(Column column) + { + return isNull(column.getIndex()); + } + + @Override + public boolean isNull(int columnIndex) + { + return getValue(columnIndex).isNilValue(); + } + + @Override + public boolean getBoolean(Column column) + { + return getBoolean(column.getIndex()); + } + + @Override + public boolean getBoolean(int columnIndex) + { + return getValue(columnIndex).asBooleanValue().getBoolean(); + } + + @Override + public String getString(Column column) + { + return 
getString(column.getIndex()); + } + + @Override + public String getString(int columnIndex) + { + return getValue(columnIndex).asStringValue().asString(); + } + + @Override + public long getLong(Column column) + { + return getLong(column.getIndex()); + } + + @Override + public long getLong(int columnIndex) + { + return getValue(columnIndex).asIntegerValue().asLong(); + } + + @Override + public double getDouble(Column column) + { + return getDouble(column.getIndex()); + } + + @Override + public double getDouble(int columnIndex) + { + return getValue(columnIndex).asFloatValue().toDouble(); + } + + @Override + public Timestamp getTimestamp(Column column) + { + return getTimestamp(column.getIndex()); + } + + @Override + public Timestamp getTimestamp(int columnIndex) + { + List seed = getValue(columnIndex).asArrayValue().list(); + long epochSecond = seed.get(0).asIntegerValue().asLong(); + long nanoAdjustment = seed.get(1).asIntegerValue().asLong(); + return Timestamp.ofEpochSecond(epochSecond, nanoAdjustment); + } + + @Override + public Value getJson(Column column) + { + return getJson(column.getIndex()); + } + + @Override + public Value getJson(int columnIndex) + { + return getValue(columnIndex); + } + +} diff --git a/src/main/java/pro/civitaspo/embulk/forward/InForwardService.java b/src/main/java/pro/civitaspo/embulk/forward/InForwardService.java new file mode 100644 index 0000000..29eaaca --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/forward/InForwardService.java @@ -0,0 +1,254 @@ +package pro.civitaspo.embulk.forward; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import influent.EventStream; +import influent.Tag; +import influent.forward.ForwardCallback; +import influent.forward.ForwardServer; +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.spi.DataException; +import org.embulk.spi.Exec; +import org.slf4j.Logger; +import 
pro.civitaspo.embulk.spi.ElapsedTime; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +public class InForwardService + extends AbstractExecutionThreadService +{ + private final static Logger logger = Exec.getLogger(InForwardService.class); + + public interface InForwardTask + extends org.embulk.config.Task + { + @Config("port") + @ConfigDefault("24224") + int getPort(); + + @Config("chunk_size_limit") + @ConfigDefault("null") + Optional getChunkSizeLimit(); + + @Config("backlog") + @ConfigDefault("null") + Optional getBacklog(); + + @Config("send_buffer_size") + @ConfigDefault("null") + Optional getSendBufferSize(); + + @Config("receive_buffer_size") + @ConfigDefault("null") + Optional getReceiveBufferSize(); + + @Config("keep_alive_enabled") + @ConfigDefault("null") + Optional getKeepAliveEnabled(); + + @Config("tcp_no_delay_enabled") + @ConfigDefault("null") + Optional getTcpNoDelayEnabled(); + } + + public interface Task + extends ForwardParentTask + { + @Config("in_forward") + @ConfigDefault("{}") + InForwardTask getInForwardTask(); + } + + public static class Builder + { + private Task task; + private Consumer eventConsumer; + + public Builder() + { + } + + public Builder task(Task task) + { + this.task = task; + return this; + } + + public Builder forEachEventCallback(Consumer eventConsumer) + { + this.eventConsumer = eventConsumer; + return this; + } + + public InForwardService build() + { + return new InForwardService(task, eventConsumer); + } + } + + public static Builder builder() + { + return new Builder(); + } + + private final Task task; + private final ForwardServer server; + private final AtomicBoolean shouldShutdown = new AtomicBoolean(false); + + private InForwardService(Task task, Consumer eventConsumer) + { + this.task = task; + this.server = buildServer(task.getInForwardTask(), eventConsumer); + } + + 
private ForwardServer buildServer(InForwardTask t, Consumer eventConsumer) + { + ForwardServer.Builder builder = new ForwardServer.Builder(wrapEventConsumer(eventConsumer)); + + builder.localAddress(t.getPort()); + + if (t.getChunkSizeLimit().isPresent()) { + builder.chunkSizeLimit(t.getChunkSizeLimit().get()); + } + if (t.getBacklog().isPresent()) { + builder.backlog(t.getBacklog().get()); + } + if (t.getSendBufferSize().isPresent()) { + builder.sendBufferSize(t.getSendBufferSize().get()); + } + if (t.getReceiveBufferSize().isPresent()) { + builder.receiveBufferSize(t.getReceiveBufferSize().get()); + } + if (t.getKeepAliveEnabled().isPresent()) { + builder.keepAliveEnabled(t.getKeepAliveEnabled().get()); + } + if (t.getTcpNoDelayEnabled().isPresent()) { + builder.tcpNoDelayEnabled(t.getTcpNoDelayEnabled().get()); + } + // TODO: builder.workerPoolSize(1); + + return builder.build(); + } + + private ForwardCallback wrapEventConsumer(Consumer eventConsumer) + { + return ForwardCallback.of(es -> + { + if (isShutdownTag(es.getTag())) { + logger.info("Receive shutdown tag: {}", es.getTag()); + shouldShutdown.set(true); + } + else if (isMessageTag(es.getTag())) { + eventConsumer.accept(es); + } + else { + throw new DataException(String.format("Unknown Tag received: %s", es.getTag().getName())); + } + return CompletableFuture.completedFuture(null); + }); + } + + private boolean isShutdownTag(Tag tag) + { + return tag.getName().contentEquals(task.getShutdownTag()); + } + + private boolean isMessageTag(Tag tag) + { + return tag.getName().contentEquals(task.getMessageTag()); + } + + @Override + protected void startUp() + throws Exception + { + server.start(); + } + + @Override + protected void shutDown() + throws Exception + { + ElapsedTime.measureWithPolling(new ElapsedTime.Pollable() { + private CompletableFuture future; + + @Override + public boolean poll() + { + return future.isCancelled() || future.isCompletedExceptionally() || future.isDone(); + } + + @Override + 
public Void getResult() + { + try { + return future.get(); + } + catch (InterruptedException | ExecutionException e) { + logger.warn("InForwardService: Server Shutdown is failed.", e); + return null; + } + } + + @Override + public void onStart() + { + logger.info("InForwardService: Server Shutdown Start."); + this.future = server.shutdown(); + } + + @Override + public void onWaiting(long elapsedMillis) + { + logger.info("InForwardService: Server Shutdown Running. (Elapsed: {} ms)", elapsedMillis); + } + + @Override + public void onFinished(long elapsedMillis) + { + logger.info("InForwardService: Server Shutdown Finish. (Elapsed: {} ms)", elapsedMillis); + } + }); + } + + @Override + protected void run() + throws Exception + { + ElapsedTime.measureWithPolling(new ElapsedTime.Pollable() { + @Override + public boolean poll() + { + return shouldShutdown.get(); + } + + @Override + public Void getResult() + { + return null; + } + + @Override + public void onStart() + { + logger.info("InForwardService: Server Start."); + } + + @Override + public void onWaiting(long elapsedMillis) + { + logger.info("InForwardService: Server Running. (Elapsed: {} ms)", elapsedMillis); + } + + @Override + public void onFinished(long elapsedMillis) + { + logger.info("InForwardService: Server Shutdown. 
(Elapsed: {} ms)", elapsedMillis); + } + }); + } +} diff --git a/src/main/java/pro/civitaspo/embulk/forward/OutForwardEventBuilder.java b/src/main/java/pro/civitaspo/embulk/forward/OutForwardEventBuilder.java new file mode 100644 index 0000000..637efea --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/forward/OutForwardEventBuilder.java @@ -0,0 +1,147 @@ +package pro.civitaspo.embulk.forward; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.embulk.spi.Column; +import org.embulk.spi.Schema; +import org.embulk.spi.time.Timestamp; +import org.msgpack.value.Value; +import pro.civitaspo.embulk.spi.DataBuilder; + +import java.util.List; +import java.util.Map; + +public class OutForwardEventBuilder + implements DataBuilder +{ + private final static String VALUES_KEY = "values"; + + private final Schema schema; + private final OutForwardService outForward; + + private List values; + + public OutForwardEventBuilder( + Schema schema, + OutForwardService outForward) + { + this.schema = schema; + this.outForward = outForward; + + setNewMessage(); + } + + private void setNewMessage() + { + this.values = Lists.newArrayListWithCapacity(schema.getColumnCount()); + } + + @Override + public Schema getSchema() + { + return schema; + } + + @Override + public void addRecord() + { + Map message = Maps.newHashMap(); + message.put(VALUES_KEY, values); + outForward.emit(message); + setNewMessage(); + } + + private void setValue(int columnIndex, Object v) + { + values.add(columnIndex, v); + } + + @Override + public void setNull(Column column) + { + setNull(column.getIndex()); + } + + @Override + public void setNull(int columnIndex) + { + setValue(columnIndex, null); + } + + @Override + public void setBoolean(Column column, boolean v) + { + setBoolean(column.getIndex(), v); + } + + @Override + public void setBoolean(int columnIndex, boolean v) + { + setValue(columnIndex, v); + } + + @Override + public void setString(Column column, String v) 
+ { + setString(column.getIndex(), v); + } + + @Override + public void setString(int columnIndex, String v) + { + setValue(columnIndex, v); + } + + @Override + public void setLong(Column column, long v) + { + setLong(column.getIndex(), v); + } + + @Override + public void setLong(int columnIndex, long v) + { + setValue(columnIndex, v); + } + + @Override + public void setDouble(Column column, double v) + { + setDouble(column.getIndex(), v); + } + + @Override + public void setDouble(int columnIndex, double v) + { + setValue(columnIndex, v); + } + + @Override + public void setTimestamp(Column column, Timestamp v) + { + + setTimestamp(column.getIndex(), v); + } + + @Override + public void setTimestamp(int columnIndex, Timestamp v) + { + // TODO: confirm correct value is stored + long epochSecond = v.getEpochSecond(); + long nanoAdjustment = v.getNano(); + setValue(columnIndex, new long[]{epochSecond, nanoAdjustment}); + } + + @Override + public void setJson(Column column, Value v) + { + setJson(column.getIndex(), v); + } + + @Override + public void setJson(int columnIndex, Value v) + { + // TODO: confirm correct value is stored + setValue(columnIndex, v); + } +} diff --git a/src/main/java/pro/civitaspo/embulk/forward/OutForwardService.java b/src/main/java/pro/civitaspo/embulk/forward/OutForwardService.java new file mode 100644 index 0000000..174f878 --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/forward/OutForwardService.java @@ -0,0 +1,170 @@ +package pro.civitaspo.embulk.forward; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.spi.Exec; +import org.komamitsu.fluency.Fluency; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.Map; + +public class OutForwardService +{ + private final static Logger logger = Exec.getLogger(OutForwardService.class); + + public interface OutForwardTask + extends 
org.embulk.config.Task + { + @Config("host") + @ConfigDefault("\"localhost\"") + String getHost(); + + @Config("port") + @ConfigDefault("24224") + int getPort(); + + @Config("max_buffer_size") + @ConfigDefault("null") + Optional getMaxBufferSize(); + + @Config("buffer_chunk_initial_size") + @ConfigDefault("null") + Optional getBufferChunkInitialSize(); + + @Config("buffer_chunk_retention_size") + @ConfigDefault("null") + Optional getBufferChunkRetentionSize(); + + @Config("flush_interval_millis") + @ConfigDefault("null") + Optional getFlushIntervalMillis(); + + @Config("sender_max_retry_count") + @ConfigDefault("null") + Optional getSenderMaxRetryCount(); + + @Config("ack_response_mode") + @ConfigDefault("null") + Optional getAckResponseMode(); + + @Config("file_backup_dir") + @ConfigDefault("null") + Optional getFileBackupDir(); + + @Config("wait_until_buffer_flushed") + @ConfigDefault("null") + Optional getWaitUntilBufferFlushed(); + + @Config("wait_until_flusher_terminated") + @ConfigDefault("null") + Optional getWaitUntilFlusherTerminated(); + } + + public interface Task + extends ForwardParentTask + { + @Config("out_forward") + @ConfigDefault("{}") + OutForwardTask getOutForwardTask(); + } + + public static void sendShutdownMessage(Task task) + { + logger.info("OutForwardService: Send a Shutdown Message."); + OutForwardService outForward = new OutForwardService(task); + outForward.emit(task.getShutdownTag(), Maps.newHashMap()); + outForward.finish(); + outForward.close(); + } + + private final Task task; + private final Fluency client; + + public OutForwardService(Task task) + { + this.task = task; + this.client = newFluency(task.getOutForwardTask()); + } + + private Fluency.Config configureFluencyConfig(OutForwardTask t) + { + Fluency.Config c = new Fluency.Config(); + if (t.getMaxBufferSize().isPresent()) { + c.setMaxBufferSize(t.getMaxBufferSize().get()); + } + if (t.getBufferChunkInitialSize().isPresent()) { + 
c.setBufferChunkInitialSize(t.getBufferChunkInitialSize().get()); + } + if (t.getBufferChunkRetentionSize().isPresent()) { + c.setBufferChunkRetentionSize(t.getBufferChunkRetentionSize().get()); + } + if (t.getFlushIntervalMillis().isPresent()) { + c.setFlushIntervalMillis(t.getFlushIntervalMillis().get()); + } + if (t.getSenderMaxRetryCount().isPresent()) { + c.setSenderMaxRetryCount(t.getSenderMaxRetryCount().get()); + } + if (t.getAckResponseMode().isPresent()) { + c.setAckResponseMode(t.getAckResponseMode().get()); + } + if (t.getFileBackupDir().isPresent()) { + c.setFileBackupDir(t.getFileBackupDir().get()); + } + if (t.getWaitUntilBufferFlushed().isPresent()) { + c.setWaitUntilBufferFlushed(t.getWaitUntilBufferFlushed().get()); + } + if (t.getWaitUntilFlusherTerminated().isPresent()) { + c.setWaitUntilFlusherTerminated(t.getWaitUntilFlusherTerminated().get()); + } + return c; + } + + private Fluency newFluency(OutForwardTask t) + { + Fluency.Config c = configureFluencyConfig(t); + try { + return Fluency.defaultFluency(t.getHost(), t.getPort(), c); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void emit(String tag, Map message) + { + try { + client.emit(tag, message); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void emit(Map message) + { + emit(task.getMessageTag(), message); + } + + public void finish() + { + try { + client.flush(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void close() + { + try { + client.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/pro/civitaspo/embulk/runner/AsyncEmbulkRunnerService.java b/src/main/java/pro/civitaspo/embulk/runner/AsyncEmbulkRunnerService.java new file mode 100644 index 0000000..18fb129 --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/runner/AsyncEmbulkRunnerService.java @@ -0,0 +1,32 @@ +package pro.civitaspo.embulk.runner; + +import 
com.google.common.util.concurrent.AbstractExecutionThreadService; +import org.embulk.exec.ExecutionResult; +import org.embulk.spi.Exec; +import org.slf4j.Logger; + +public class AsyncEmbulkRunnerService + extends AbstractExecutionThreadService +{ + private static final Logger logger = Exec.getLogger(AsyncEmbulkRunnerService.class); + private final EmbulkRunner runner; + + public AsyncEmbulkRunnerService(EmbulkRunner runner) + { + this.runner = runner; + } + + @Override + protected void run() + throws Exception + { + ExecutionResult result = runner.run(); + if (result.isSkipped()) { + logger.warn("ExecutionResult: EmbulkRunner.run is skipped"); + } + if (!result.getIgnoredExceptions().isEmpty()) { + result.getIgnoredExceptions().forEach(e -> logger.warn("Ignored Error is found", e)); + } + logger.debug("Execution Result: {}", result); + } +} diff --git a/src/main/java/pro/civitaspo/embulk/runner/EmbulkRunner.java b/src/main/java/pro/civitaspo/embulk/runner/EmbulkRunner.java new file mode 100644 index 0000000..17cb042 --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/runner/EmbulkRunner.java @@ -0,0 +1,125 @@ +package pro.civitaspo.embulk.runner; + +import com.google.common.collect.Lists; +import org.embulk.EmbulkEmbed; +import org.embulk.config.ConfigException; +import org.embulk.config.ConfigSource; +import org.embulk.exec.ExecutionResult; +import org.embulk.guice.LifeCycleInjector; +import org.embulk.spi.Exec; +import org.slf4j.Logger; +import pro.civitaspo.embulk.spi.ElapsedTime; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.Optional; + +public class EmbulkRunner +{ + private static final Logger logger = Exec.getLogger(EmbulkRunner.class); + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private ConfigSource execConfig; + private ConfigSource inputConfig; + private List filtersConfig; + private ConfigSource 
outputConfig; + + public Builder() + { + } + + public Builder execConfig(ConfigSource execConfig) + { + this.execConfig = execConfig; + return this; + } + + public Builder inputConfig(ConfigSource inputConfig) + { + this.inputConfig = inputConfig; + return this; + } + + public Builder filterConfig(List filtersConfig) + { + this.filtersConfig = filtersConfig; + return this; + } + + public Builder outputConfig(ConfigSource outputConfig) + { + this.outputConfig = outputConfig; + return this; + } + + public EmbulkRunner build() + { + return new EmbulkRunner(buildConfig()); + } + + protected ConfigSource buildConfig() + { + ConfigSource config = Exec.newConfigSource(); + + config.set("exec", Optional.ofNullable(execConfig).orElse(Exec.newConfigSource())); + config.set("in", Optional.ofNullable(inputConfig).orElseThrow(() -> new ConfigException("in: is null."))); + config.set("filters", Optional.ofNullable(filtersConfig).orElse(Lists.newArrayList())); + config.set("out", Optional.ofNullable(outputConfig).orElseThrow(() -> new ConfigException("out: is null."))); + + return config; + } + } + + private final ConfigSource config; + + EmbulkRunner(ConfigSource config) + { + this.config = config; + } + + private EmbulkEmbed newEmbulkEmbed() + { + try { + Constructor constructor = EmbulkEmbed.class + .getDeclaredConstructor(ConfigSource.class, LifeCycleInjector.class); + constructor.setAccessible(true); + return constructor.newInstance(null, Exec.getInjector()); + } + catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + throw new ConfigException(e); + } + } + + public ExecutionResult run() + { + // TODO: expose config without secret configurations. 
+ + return ElapsedTime.measure(new ElapsedTime.Measurable() + { + @Override + public void onStart() + { + logger.info("Start: Embulk Run"); + } + + @Override + public void onFinished(long elapsedMillis) + { + logger.info("Finished: Embulk Run (Elapsed: {} ms)", elapsedMillis); + } + + @Override + public ExecutionResult run() + { + return newEmbulkEmbed().run(config); + } + }); + } +} diff --git a/src/main/java/pro/civitaspo/embulk/spi/DataBuilder.java b/src/main/java/pro/civitaspo/embulk/spi/DataBuilder.java new file mode 100644 index 0000000..8daaaa9 --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/spi/DataBuilder.java @@ -0,0 +1,41 @@ +package pro.civitaspo.embulk.spi; + +import org.embulk.spi.Column; +import org.embulk.spi.Schema; +import org.embulk.spi.time.Timestamp; +import org.msgpack.value.Value; + +public interface DataBuilder +{ + Schema getSchema(); + + void addRecord(); + + void setNull(Column column); + + void setNull(int columnIndex); + + void setBoolean(Column column, boolean v); + + void setBoolean(int columnIndex, boolean v); + + void setString(Column column, String v); + + void setString(int columnIndex, String v); + + void setLong(Column column, long v); + + void setLong(int columnIndex, long v); + + void setDouble(Column column, double v); + + void setDouble(int columnIndex, double v); + + void setTimestamp(Column column, Timestamp v); + + void setTimestamp(int columnIndex, Timestamp v); + + void setJson(Column column, Value v); + + void setJson(int columnIndex, Value v); +} diff --git a/src/main/java/pro/civitaspo/embulk/spi/DataReader.java b/src/main/java/pro/civitaspo/embulk/spi/DataReader.java new file mode 100644 index 0000000..4f87b34 --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/spi/DataReader.java @@ -0,0 +1,41 @@ +package pro.civitaspo.embulk.spi; + +import org.embulk.spi.Column; +import org.embulk.spi.Schema; +import org.embulk.spi.time.Timestamp; +import org.msgpack.value.Value; + +public interface DataReader +{ + 
Schema getSchema(); + + boolean nextRecord(); + + boolean isNull(Column column); + + boolean isNull(int columnIndex); + + boolean getBoolean(Column column); + + boolean getBoolean(int columnIndex); + + String getString(Column column); + + String getString(int columnIndex); + + long getLong(Column column); + + long getLong(int columnIndex); + + double getDouble(Column column); + + double getDouble(int columnIndex); + + Timestamp getTimestamp(Column column); + + Timestamp getTimestamp(int columnIndex); + + Value getJson(Column column); + + Value getJson(int columnIndex); +} diff --git a/src/main/java/pro/civitaspo/embulk/spi/ElapsedTime.java b/src/main/java/pro/civitaspo/embulk/spi/ElapsedTime.java new file mode 100644 index 0000000..7c51d40 --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/spi/ElapsedTime.java @@ -0,0 +1,81 @@ +package pro.civitaspo.embulk.spi; + +public class ElapsedTime +{ + private final static long DEFAULT_POLLING_INTERVAL = 1_000; // ms + + private static long getNow() + { + return System.currentTimeMillis(); + } + + private static long getElapsed(long start) + { + return System.currentTimeMillis() - start; + } + + private static void waitUntilNextPolling(long pollingInterval) + { + try { + Thread.sleep(pollingInterval); + } + catch (InterruptedException e) { + // Do Nothing + } + } + + public interface Measurable + { + T run(); + + void onStart(); + + void onFinished(long elapsedMillis); + } + + public interface Pollable + { + boolean poll(); + + T getResult(); + + void onStart(); + + void onWaiting(long elapsedMillis); + + void onFinished(long elapsedMillis); + } + + public static T measure(Measurable measurable) + { + long start = getNow(); + measurable.onStart(); + try { + return measurable.run(); + } + finally { + measurable.onFinished(getElapsed(start)); + } + } + + public static T measureWithPolling(long pollingInterval, Pollable pollable) + { + long start = getNow(); + pollable.onStart(); + while (!pollable.poll()) { + 
pollable.onWaiting(getElapsed(start)); + waitUntilNextPolling(pollingInterval); + } + try { + return pollable.getResult(); + } + finally { + pollable.onFinished(getElapsed(start)); + } + } + + public static T measureWithPolling(Pollable pollable) + { + return measureWithPolling(DEFAULT_POLLING_INTERVAL, pollable); + } +} diff --git a/src/main/java/pro/civitaspo/embulk/spi/PageBuilder.java b/src/main/java/pro/civitaspo/embulk/spi/PageBuilder.java new file mode 100644 index 0000000..40ac47d --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/spi/PageBuilder.java @@ -0,0 +1,15 @@ +package pro.civitaspo.embulk.spi; + +import org.embulk.spi.BufferAllocator; +import org.embulk.spi.PageOutput; +import org.embulk.spi.Schema; + +public class PageBuilder + extends org.embulk.spi.PageBuilder + implements DataBuilder +{ + public PageBuilder(BufferAllocator allocator, Schema schema, PageOutput output) + { + super(allocator, schema, output); + } +} diff --git a/src/main/java/pro/civitaspo/embulk/spi/PageReader.java b/src/main/java/pro/civitaspo/embulk/spi/PageReader.java new file mode 100644 index 0000000..8853125 --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/spi/PageReader.java @@ -0,0 +1,13 @@ +package pro.civitaspo.embulk.spi; + +import org.embulk.spi.Schema; + +public class PageReader + extends org.embulk.spi.PageReader + implements DataReader +{ + public PageReader(Schema schema) + { + super(schema); + } +} diff --git a/src/main/java/pro/civitaspo/embulk/spi/StandardColumnVisitor.java b/src/main/java/pro/civitaspo/embulk/spi/StandardColumnVisitor.java new file mode 100644 index 0000000..64a14bb --- /dev/null +++ b/src/main/java/pro/civitaspo/embulk/spi/StandardColumnVisitor.java @@ -0,0 +1,66 @@ +package org.embulk.filter.copy.util; + +import org.embulk.spi.Column; +import org.embulk.spi.ColumnVisitor; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.PageReader; +import pro.civitaspo.embulk.spi.DataBuilder; +import pro.civitaspo.embulk.spi.DataReader; 
+ +public class StandardColumnVisitor + implements ColumnVisitor +{ + private final DataReader reader; + private final DataBuilder builder; + + public StandardColumnVisitor(DataReader reader, DataBuilder builder) + { + this.reader = reader; + this.builder = builder; + } + + private void nullOr(Column column, Runnable runnable) + { + if (reader.isNull(column)) { + builder.setNull(column); + return; + } + runnable.run(); + } + + @Override + public void booleanColumn(Column column) + { + nullOr(column, () -> builder.setBoolean(column, reader.getBoolean(column))); + } + + @Override + public void longColumn(Column column) + { + nullOr(column, () -> builder.setLong(column, reader.getLong(column))); + } + + @Override + public void doubleColumn(Column column) + { + nullOr(column, () -> builder.setDouble(column, reader.getDouble(column))); + } + + @Override + public void stringColumn(Column column) + { + nullOr(column, () -> builder.setString(column, reader.getString(column))); + } + + @Override + public void timestampColumn(Column column) + { + nullOr(column, () -> builder.setTimestamp(column, reader.getTimestamp(column))); + } + + @Override + public void jsonColumn(Column column) + { + nullOr(column, () -> builder.setJson(column, reader.getJson(column))); + } +} diff --git a/src/main/resources/META-INF/services/org.embulk.spi.Extension b/src/main/resources/META-INF/services/org.embulk.spi.Extension new file mode 100644 index 0000000..b167923 --- /dev/null +++ b/src/main/resources/META-INF/services/org.embulk.spi.Extension @@ -0,0 +1 @@ +org.embulk.filter.join_file.plugin.InternalPluginExtention