diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index bd3a443..6110f4b 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -15,7 +15,7 @@ jobs: - ubuntu-latest name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: ruby/setup-ruby@v1 with: ruby-version: ${{ matrix.ruby }} @@ -23,6 +23,6 @@ jobs: env: CI: true run: | - gem install bundler rake + gem install rake bundle install --jobs 4 --retry 3 bundle exec rake test diff --git a/fluent-plugin-s3.gemspec b/fluent-plugin-s3.gemspec index 8088a41..10ec13b 100644 --- a/fluent-plugin-s3.gemspec +++ b/fluent-plugin-s3.gemspec @@ -19,6 +19,7 @@ Gem::Specification.new do |gem| gem.add_dependency "fluentd", [">= 0.14.22", "< 2"] gem.add_dependency "aws-sdk-s3", "~> 1.60" gem.add_dependency "aws-sdk-sqs", "~> 1.23" + gem.add_dependency 'zstd-ruby' gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" gem.add_development_dependency "test-unit-rr", ">= 1.0.3" diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb index dc63f07..001bb4f 100644 --- a/lib/fluent/plugin/out_s3.rb +++ b/lib/fluent/plugin/out_s3.rb @@ -6,6 +6,7 @@ require 'time' require 'tempfile' require 'securerandom' +require 'zstd-ruby' module Fluent::Plugin class S3Output < Output @@ -630,6 +631,28 @@ def compress(chunk, tmp) end end + class ZstdCompressor < Compressor + def ext + 'zst'.freeze + end + + def content_type + 'application/x-zst'.freeze + end + + def compress(chunk, tmp) + uncompressed_data = '' + chunk.open do |io| + uncompressed_data = io.read + end + compressed_data = Zstd.compress(uncompressed_data, level: @level) + tmp.write(compressed_data) + rescue => e + log.warn "zstd compression failed: #{e.message}" + raise e + end + end + class TextCompressor < Compressor def ext 'txt'.freeze @@ -658,7 +681,8 @@ def content_type { 'gzip' => GzipCompressor, 'json' => JsonCompressor, - 'text' => TextCompressor + 'text' => TextCompressor, + 'zstd' => ZstdCompressor }.each { |name, compressor| COMPRESSOR_REGISTRY.register(name, compressor) } diff --git a/test/test_out_s3.rb b/test/test_out_s3.rb index cbf7860..1f62b10 100644 --- a/test/test_out_s3.rb +++ b/test/test_out_s3.rb @@ -109,6 +109,14 @@ def test_configure_with_mime_type_lzo assert(e.is_a?(Fluent::ConfigError)) end + def test_configure_with_mime_type_zstd + conf = CONFIG.clone + conf << "\nstore_as zstd\n" + d = create_driver(conf) + assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext + assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type + end + def test_configure_with_path_style conf = CONFIG.clone conf << "\nforce_path_style true\n" @@ -456,6 +464,33 @@ def test_write_with_custom_s3_object_key_format_containing_hex_random_placeholde FileUtils.rm_f(s3_local_file_path) end + def test_write_with_zstd + setup_mocks(true) + s3_local_file_path = "/tmp/s3-test.zst" + + expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst" + + setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: expected_s3path) + + config = CONFIG_TIME_SLICE + "\nstore_as zstd\n" + d = create_time_sliced_driver(config) + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, { "a" => 1 }) + d.feed(time, { "a" => 2 }) + end + + File.open(s3_local_file_path, 'rb') do |file| + compressed_data = file.read + uncompressed_data = Zstd.decompress(compressed_data) + expected_data = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + assert_equal expected_data, uncompressed_data + end + FileUtils.rm_f(s3_local_file_path) + end + class MockResponse attr_reader :data