@ShelbyZ
Contributor

@ShelbyZ ShelbyZ commented Dec 12, 2025

  • Bring over simple_aggregation from golang plugin versions
  • Add unit/runtime tests

firehose

          [OUTPUT]
            Name                kinesis_firehose
            Match               application.*
            region              ${AWS_REGION}
            delivery_stream     {{ .Values.firehoseStreamName }}
            retry_limit         3
            workers             1
            simple_aggregation  On

kinesis

          [OUTPUT]
            Name                kinesis_streams
            Match               application.*
            region              ${AWS_REGION}
            stream              {{ .Values.dataStreamName }}
            auto_retry_requests On
            simple_aggregation  On

Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • [WIP] Debug log output from testing the change
  • [WIP] Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

fluent/fluent-bit-docs#2299

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features
    • Optional "simple_aggregation" mode for Kinesis Streams and Firehose outputs (default: off) to combine multiple records into single API requests, with newline/time/log-key handling and automatic flush on buffer-full.
    • Shared AWS in-memory aggregation support added for consistent aggregation across AWS outputs.
  • Tests
    • New unit and runtime tests covering aggregation behavior, edge cases, compression, time/log key variants, and large/boundary workloads.

@coderabbitai

coderabbitai bot commented Dec 12, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a new AWS in-memory aggregation API and implementation, and integrates an optional simple_aggregation path into Kinesis Streams and Firehose plugins with buffer management, event processing, finalization, send-on-full retry, and accompanying unit and runtime tests.

Changes

Cohort / File(s) Summary
New public aggregation API
include/fluent-bit/aws/flb_aws_aggregation.h
Adds struct flb_aws_agg_buffer and public APIs: flb_aws_aggregation_init, destroy, reset, add, finalize, and process_event with documented return codes and buffer semantics.
Aggregation implementation & build
src/aws/flb_aws_aggregation.c, src/aws/CMakeLists.txt
Implements in-memory aggregation (msgpack→JSON conversion, size checks, time key injection, newline handling), exportable APIs, and adds flb_aws_aggregation.c to flb-aws build.
Firehose plugin integration
plugins/out_kinesis_firehose/firehose.h, plugins/out_kinesis_firehose/firehose.c, plugins/out_kinesis_firehose/firehose_api.c
Adds agg_buf and agg_buf_initialized to flush struct, simple_aggregation flag to context/config, threads ctx into new_flush_buffer, initializes/destroys aggregation buffer, integrates simple aggregation processing, finalize/send/reset and buffer-full retry logic.
Kinesis Streams plugin integration
plugins/out_kinesis_streams/kinesis.h, plugins/out_kinesis_streams/kinesis.c, plugins/out_kinesis_streams/kinesis_api.c
Mirrors Firehose changes: include aggregation header, add agg fields to flush, simple_aggregation flag to ctx, update new_flush_buffer signature, add aggregation processing, finalize/send/reset and cleanup.
Unit tests
tests/internal/CMakeLists.txt, tests/internal/aws_aggregation.c
Adds unit tests for aggregation API covering init/destroy, add/finalize behavior, boundary conditions, resets, large-scale additions, and NULL/edge-case handling; adds test source to CMake.
Runtime tests
tests/runtime/out_firehose.c, tests/runtime/out_kinesis.c
Adds multiple runtime tests for Firehose (9) and Kinesis (4) exercising simple aggregation with time/log keys, compression, many records, error cases, and formatting (note: some duplicated test definitions in Kinesis tests).
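
To make the buffer semantics summarized above concrete, here is a minimal sketch of an in-memory aggregation buffer with init, add, finalize, reset, and destroy operations. Every name in it (struct agg_buffer, agg_init, agg_add, agg_finalize, agg_reset, agg_destroy, and the AGG_* codes) is hypothetical and chosen for illustration; the PR's real declarations are in include/fluent-bit/aws/flb_aws_aggregation.h and may differ in signatures and return codes.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical return codes; not the PR's FLB_AWS_AGG_* values. */
enum { AGG_OK = 0, AGG_FULL = 1, AGG_ERROR = -1 };

struct agg_buffer {
    char  *data;      /* backing storage */
    size_t capacity;  /* maximum aggregated payload size in bytes */
    size_t used;      /* bytes currently buffered */
};

int agg_init(struct agg_buffer *b, size_t capacity)
{
    if (b == NULL || capacity == 0) {
        return AGG_ERROR;
    }
    b->data = malloc(capacity);
    if (b->data == NULL) {
        return AGG_ERROR;
    }
    b->capacity = capacity;
    b->used = 0;
    return AGG_OK;
}

/* Append one record; report AGG_FULL instead of writing past the end. */
int agg_add(struct agg_buffer *b, const char *rec, size_t len)
{
    if (b == NULL || b->data == NULL || rec == NULL) {
        return AGG_ERROR;
    }
    if (len > b->capacity - b->used) {
        return AGG_FULL;
    }
    memcpy(b->data + b->used, rec, len);
    b->used += len;
    return AGG_OK;
}

/* Report the payload length; optionally append a final newline if it fits. */
int agg_finalize(struct agg_buffer *b, int add_final_newline, size_t *out_len)
{
    if (b == NULL || b->data == NULL || out_len == NULL || b->used == 0) {
        return AGG_ERROR;
    }
    if (add_final_newline && b->used < b->capacity) {
        b->data[b->used++] = '\n';
    }
    *out_len = b->used;
    return AGG_OK;
}

/* Keep the allocation, forget the contents. */
void agg_reset(struct agg_buffer *b)
{
    if (b != NULL) {
        b->used = 0;
    }
}

void agg_destroy(struct agg_buffer *b)
{
    if (b != NULL) {
        free(b->data);
        b->data = NULL;
        b->capacity = 0;
        b->used = 0;
    }
}
```

In this sketch the caller owns the retry decision: agg_add never flushes on its own, it only signals AGG_FULL so the plugin can finalize, send, reset, and try the same record again.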

Sequence Diagram(s)

sequenceDiagram
    actor Input
    participant Plugin as Kinesis/Firehose Plugin
    participant AggBuf as Aggregation Buffer
    participant AWS as AWS Service

    Input->>Plugin: deliver event(s)
    alt simple_aggregation disabled
        Plugin->>AWS: send batched records immediately
    else simple_aggregation enabled
        loop per event
            Plugin->>AggBuf: flb_aws_aggregation_process_event(...)
            alt FLB_AWS_AGG_OK
                AggBuf->>AggBuf: append JSON + newline
            else FLB_AWS_AGG_FULL
                AggBuf-->>Plugin: buffer full (signal)
                Plugin->>AggBuf: flb_aws_aggregation_finalize(add_final_newline?)
                Plugin->>AWS: send aggregated record (encode/compress as configured)
                Plugin->>AggBuf: flb_aws_aggregation_reset()
                Plugin->>AggBuf: retry current event
            else FLB_AWS_AGG_DISCARD/ERROR
                Plugin->>Plugin: drop/log and continue
            end
        end
        Plugin->>AggBuf: flb_aws_aggregation_finalize(end of batch)
        Plugin->>AWS: send final aggregated record
        Plugin->>AggBuf: flb_aws_aggregation_reset()
    end
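
The send-on-full flow in the diagram above can be sketched in plain C as follows. This is an illustration under stated assumptions, not the plugin code: the 16-byte capacity, the struct agg type, and the functions agg_try_add, flush_payload, and process_batch are all hypothetical, and the "send" step only counts payloads instead of calling AWS.

```c
#include <stddef.h>
#include <string.h>

#define CAP 16  /* hypothetical tiny capacity, chosen to keep the example small */

/* Hypothetical codes mirroring the OK / FULL / DISCARD branches of the diagram. */
enum { AGG_OK = 0, AGG_FULL = 1, AGG_DISCARD = 2 };

struct agg {
    char   data[CAP];
    size_t used;
};

static int agg_try_add(struct agg *a, const char *rec, size_t len)
{
    if (len > CAP) {
        return AGG_DISCARD;            /* can never fit, even in an empty buffer */
    }
    if (len > CAP - a->used) {
        return AGG_FULL;               /* would fit after a flush */
    }
    memcpy(a->data + a->used, rec, len);
    a->used += len;
    return AGG_OK;
}

/* Stand-in for finalize + send + reset: counts one payload per non-empty flush. */
static void flush_payload(struct agg *a, int *sends)
{
    if (a->used > 0) {
        (*sends)++;
        a->used = 0;
    }
}

/* Returns the number of aggregated payloads "sent" for the batch. */
int process_batch(const char **recs, size_t n, int *discarded)
{
    struct agg a = { .used = 0 };
    int sends = 0;
    size_t i;

    *discarded = 0;
    for (i = 0; i < n; i++) {
        size_t len = strlen(recs[i]);
        int ret = agg_try_add(&a, recs[i], len);

        if (ret == AGG_FULL) {
            flush_payload(&a, &sends);
            /* Retry once: the buffer is empty now and len <= CAP, so this succeeds. */
            ret = agg_try_add(&a, recs[i], len);
        }
        if (ret == AGG_DISCARD) {
            (*discarded)++;
        }
    }
    flush_payload(&a, &sends);         /* end of batch: send whatever remains */
    return sends;
}
```

Because oversized records are classified as discards before the fullness check, the retry after a flush cannot loop: a record either fits in an empty buffer or was already rejected.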

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Focus review on:
    • src/aws/flb_aws_aggregation.c — buffer allocation, bounds checks, return-code semantics, and thread-safety assumptions.
    • Integration in plugins/*_api.c — retry-on-full flow, finalize/send/reset semantics, compression/base64 paths.
    • Changes to new_flush_buffer signatures and flush teardown for resource leaks.
    • Tests: duplicated entries and correctness in tests/runtime/out_kinesis.c.

Suggested reviewers

  • edsiper
  • koleini
  • fujimotos

Poem

🐰 I nibble bytes and stitch them tight,
In a burrowed buffer snug by night.
When full I shout, "Finalize — then go!"
Pack records, send, and reset — hop, hop, ho! 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 53.33% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Title check | ✅ Passed | The title 'aws: Implement simple_aggregation operation' accurately summarizes the main change: adding simple_aggregation functionality to AWS-related plugins.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
plugins/out_kinesis_firehose/firehose_api.c (1)

386-480: Consider adding a guard for events array.

The aggregated record finalization and sending logic is well-structured. The compression and base64 encoding paths are handled correctly, and memory management appears sound.

One minor consideration: at line 461, the code assumes buf->events is allocated. While this should always be true when called via process_and_send_records, a defensive check could prevent potential crashes in edge cases.

Optional defensive check:

+    /* Ensure events array is allocated */
+    if (buf->events == NULL) {
+        flb_plg_error(ctx->ins, "Events buffer not initialized");
+        flb_aws_aggregation_reset(&buf->agg_buf);
+        return -1;
+    }
+
     /* Create event record */
     event = &buf->events[0];
src/aws/flb_aws_aggregation.c (1)

113-233: LGTM: Event processing logic is correct.

The flb_aws_aggregation_process_event function properly handles:

  • msgpack-to-JSON conversion
  • log_key extraction (removing enclosing quotes)
  • Size validation at multiple stages
  • time_key injection with custom formatting
  • Newline appending before aggregation

The function correctly returns different codes for different scenarios (buffer full, record too large, error), enabling proper caller handling.
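
As an illustration of the log_key step listed above (removing the enclosing quotes from a JSON string value), here is a small sketch. The helper name strip_enclosing_quotes and its signature are hypothetical and are not taken from flb_aws_aggregation.c; the sketch only shows the pointer-and-length adjustment, without copying the data.

```c
#include <stddef.h>

/*
 * Hypothetical helper: if the JSON text [*start, *start + len) is wrapped in
 * double quotes, advance *start past the opening quote and return the inner
 * length. Otherwise leave *start untouched and return len.
 */
size_t strip_enclosing_quotes(const char **start, size_t len)
{
    if (len >= 2 && (*start)[0] == '"' && (*start)[len - 1] == '"') {
        (*start)++;
        return len - 2;
    }
    return len;
}
```

Note that this only trims the outer quotes. Escape sequences inside the string (for example \" or \n) stay escaped, so the result is the raw JSON string body rather than a decoded value.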

Optional: Consider future refactoring opportunity.

The event processing logic in flb_aws_aggregation_process_event shares significant code with the non-aggregation process_event function in kinesis_api.c (msgpack conversion, log_key handling, time_key injection). While the current duplication is acceptable for clarity and the aggregation/non-aggregation paths diverge at the end, a future refactor could extract common logic into shared helper functions.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c06c124 and 158fb26.

📒 Files selected for processing (13)
  • include/fluent-bit/aws/flb_aws_aggregation.h (1 hunks)
  • plugins/out_kinesis_firehose/firehose.c (4 hunks)
  • plugins/out_kinesis_firehose/firehose.h (3 hunks)
  • plugins/out_kinesis_firehose/firehose_api.c (7 hunks)
  • plugins/out_kinesis_streams/kinesis.c (4 hunks)
  • plugins/out_kinesis_streams/kinesis.h (3 hunks)
  • plugins/out_kinesis_streams/kinesis_api.c (9 hunks)
  • src/aws/CMakeLists.txt (1 hunks)
  • src/aws/flb_aws_aggregation.c (1 hunks)
  • tests/internal/CMakeLists.txt (1 hunks)
  • tests/internal/aws_aggregation.c (1 hunks)
  • tests/runtime/out_firehose.c (1 hunks)
  • tests/runtime/out_kinesis.c (2 hunks)
🧰 Additional context used
🧠 Learnings (8)
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.

Applied to files:

  • include/fluent-bit/aws/flb_aws_aggregation.h
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
🧬 Code graph analysis (7)
tests/internal/aws_aggregation.c (1)
src/aws/flb_aws_aggregation.c (5)
  • flb_aws_aggregation_init (30-46)
  • flb_aws_aggregation_destroy (48-56)
  • flb_aws_aggregation_add (58-77)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
include/fluent-bit/aws/flb_aws_aggregation.h (1)
src/aws/flb_aws_aggregation.c (6)
  • flb_aws_aggregation_init (30-46)
  • flb_aws_aggregation_destroy (48-56)
  • flb_aws_aggregation_add (58-77)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_process_event (113-233)
plugins/out_kinesis_firehose/firehose.c (3)
plugins/out_kinesis_streams/kinesis.c (1)
  • new_flush_buffer (311-354)
src/aws/flb_aws_aggregation.c (1)
  • flb_aws_aggregation_init (30-46)
plugins/out_kinesis_firehose/firehose_api.c (1)
  • flush_destroy (1114-1126)
tests/runtime/out_firehose.c (2)
src/flb_lib.c (9)
  • flb_create (143-225)
  • flb_input (266-276)
  • flb_input_set (305-335)
  • flb_output (279-289)
  • flb_output_set (520-551)
  • flb_start (983-994)
  • flb_lib_push (843-870)
  • flb_stop (1011-1055)
  • flb_destroy (228-263)
tests/runtime/out_kinesis.c (1)
  • flb_test_firehose_success (13-45)
src/aws/flb_aws_aggregation.c (4)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
src/flb_pack.c (1)
  • flb_msgpack_to_json (1011-1024)
include/fluent-bit/flb_compat.h (1)
  • gmtime_r (75-81)
src/aws/flb_aws_util.c (1)
  • flb_aws_strftime_precision (1240-1326)
plugins/out_kinesis_streams/kinesis_api.c (2)
src/aws/flb_aws_aggregation.c (4)
  • flb_aws_aggregation_process_event (113-233)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_destroy (48-56)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
plugins/out_kinesis_firehose/firehose_api.c (3)
plugins/out_kinesis_streams/kinesis_api.c (4)
  • send_log_events (486-555)
  • process_event_simple_aggregation (213-230)
  • send_aggregated_record (422-483)
  • reset_flush_buf (414-419)
src/aws/flb_aws_aggregation.c (4)
  • flb_aws_aggregation_process_event (113-233)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_destroy (48-56)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
🔇 Additional comments (35)
src/aws/CMakeLists.txt (1)

18-18: LGTM!

The aggregation source file is correctly added to the AWS library build. The unconditional inclusion is appropriate since aggregation is a core feature of the kinesis/firehose plugins.

tests/internal/CMakeLists.txt (1)

141-141: LGTM!

The aggregation unit test is correctly added under the FLB_AWS conditional, following the established pattern for AWS-related tests.

plugins/out_kinesis_firehose/firehose.h (3)

29-29: LGTM!

The aggregation header include is correctly placed with other Fluent Bit headers.


60-62: LGTM!

The aggregation buffer and initialization flag are appropriately added to the flush structure. The agg_buf_initialized flag properly tracks whether cleanup is needed in flush_destroy.


102-102: LGTM!

The simple_aggregation configuration option is correctly added to the context structure.

plugins/out_kinesis_streams/kinesis.h (3)

29-29: LGTM!

The aggregation header include is correctly placed, consistent with the firehose plugin.


60-62: LGTM!

The aggregation buffer fields are consistent with the firehose plugin structure.


100-100: LGTM!

The simple_aggregation configuration option is correctly added to the Kinesis context structure.

tests/internal/aws_aggregation.c (10)

1-27: LGTM!

Well-structured test file with appropriate includes and constant definition. The MAX_RECORD_SIZE matches the expected usage in production code.


29-47: LGTM!

Good lifecycle test verifying init sets expected values and destroy properly cleans up state.


49-107: LGTM!

Comprehensive tests for single and multiple record additions with proper content verification via memcmp.


109-163: LGTM!

Good boundary testing for buffer-full scenarios, including both oversized single records and gradual fill to capacity.


165-200: LGTM!

Properly tests both Kinesis Streams mode (no newline) and Firehose mode (with newline) finalization paths.


202-254: LGTM!

Good coverage of edge cases: empty buffer finalization and reset/reuse cycles.


256-322: LGTM!

Excellent coverage of large-scale additions and NULL parameter handling. The NULL tests ensure robustness against invalid inputs.


324-404: LGTM!

Good boundary condition tests including exact capacity fill and multiple reset cycles.


406-522: LGTM!

Useful edge case tests covering tiny buffers, double finalize, add-after-finalize, and alternating patterns. These tests verify the API behaves correctly in non-standard usage scenarios.


524-544: LGTM!

Comprehensive test list covering all aggregation API functions and edge cases.

plugins/out_kinesis_firehose/firehose_api.c (6)

41-41: LGTM!

The aggregation header is correctly included alongside other AWS headers.


57-58: LGTM!

Forward declaration is necessary since send_aggregated_record calls send_log_events.


148-169: LGTM!

Clean wrapper function that delegates to the shared aggregation implementation. Parameters are correctly passed including MAX_EVENT_SIZE as the limit.


571-600: LGTM!

The simple aggregation path is correctly integrated into add_event. The retry logic properly handles buffer-full conditions by sending the aggregated record and retrying.


761-767: LGTM!

The final send logic correctly branches between aggregation mode and normal mode for any remaining data.


1114-1125: LGTM!

Proper cleanup of the aggregation buffer when agg_buf_initialized flag is set. This prevents memory leaks and double-free issues.

plugins/out_kinesis_streams/kinesis_api.c (5)

55-56: LGTM: Forward declaration added for function ordering.

The forward declaration is necessary since send_aggregated_record (defined at line 422) calls send_log_events before its definition.


209-230: LGTM: Clean delegation to shared aggregation API.

The wrapper appropriately delegates to the shared flb_aws_aggregation_process_event implementation, passing all required context from the Kinesis-specific structures.


573-601: LGTM: Aggregation path correctly handles buffer-full scenario.

The retry logic properly handles the buffer-full case (ret == 1) by sending the accumulated aggregated record and retrying the current event. The mechanism prevents infinite loops since oversized individual records return 2 (discard), which breaks the retry cycle.


751-756: LGTM: Proper end-of-batch handling and resource cleanup.

The end-of-batch logic correctly routes through the aggregation path when enabled, and the cleanup path properly destroys the aggregation buffer when initialized.

Also applies to: 1104-1106


421-483: Buffer sizing concern does not reflect actual constant values.

The buffer size constants are appropriately defined with sufficient margin. PUT_RECORDS_PAYLOAD_SIZE is 5,242,880 bytes (~5.24 MB), while the maximum base64-encoded aggregated record is approximately 1.4 MB (MAX_EVENT_SIZE of 1,048,556 bytes with 33% base64 overhead). The check at line 456 is defensive programming but will not realistically trigger under normal conditions, as the buffer is about 3.75x larger than the maximum possible encoded output.
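
The arithmetic behind this estimate can be checked with a few lines of C. The two constants below are copied from the figures quoted in this comment, not verified against the plugin headers, and the function names base64_encoded_len and headroom_x100 are hypothetical. Padded base64 emits 4 output bytes for every 3 input bytes, rounded up to a whole group.

```c
#include <stddef.h>

/* Values as quoted in the review comment above (assumed, not re-verified). */
#define QUOTED_PUT_RECORDS_PAYLOAD_SIZE 5242880u
#define QUOTED_MAX_EVENT_SIZE           1048556u

/* Length of the padded base64 encoding of n input bytes: 4 * ceil(n / 3). */
size_t base64_encoded_len(size_t n)
{
    return 4 * ((n + 2) / 3);
}

/* Ratio of payload buffer to largest encoded record, scaled by 100 (integer math). */
unsigned long headroom_x100(void)
{
    unsigned long encoded = (unsigned long) base64_encoded_len(QUOTED_MAX_EVENT_SIZE);
    return (QUOTED_PUT_RECORDS_PAYLOAD_SIZE * 100ul) / encoded;
}
```

With these inputs the largest encoded record is 1,398,076 bytes, and the payload buffer is roughly 3.75 times that size.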

Likely an incorrect or invalid review comment.

plugins/out_kinesis_firehose/firehose.c (1)

317-354: LGTM: Firehose aggregation initialization consistent with Kinesis.

The changes properly integrate aggregation support:

  • Function signature updated to accept context for conditional initialization
  • Aggregation buffer initialized when simple_aggregation is enabled
  • Proper error handling with buffer destruction on initialization failure
  • Configuration option added with appropriate defaults and description

The implementation mirrors the Kinesis Streams changes, ensuring consistency across both AWS output plugins.

Also applies to: 371-371, 524-529

tests/runtime/out_firehose.c (1)

192-524: LGTM: Comprehensive test coverage for Firehose aggregation.

The nine new tests cover a wide range of scenarios:

  • Basic aggregation functionality
  • Integration with time_key and log_key features
  • Volume testing with many records
  • Interaction with compression
  • Combined parameter configurations
  • Edge cases (empty records, error handling)
  • Custom time formats

All tests follow the established testing patterns and are properly registered.

Also applies to: 533-541

plugins/out_kinesis_streams/kinesis.c (1)

311-351: LGTM: Kinesis Streams aggregation initialization.

The implementation is consistent with the Firehose changes:

  • Context-aware buffer initialization
  • Proper error handling with cleanup on failure
  • Configuration option matching Firehose conventions

Also applies to: 368-368, 518-523

include/fluent-bit/aws/flb_aws_aggregation.h (1)

1-90: LGTM: Well-designed aggregation API.

The header defines a clean, focused API:

  • Simple buffer structure with clear fields
  • Lifecycle management functions (init/destroy/reset)
  • Data manipulation functions (add/finalize)
  • High-level event processing function
  • Return value semantics are clearly documented

Based on learnings, function descriptions are not required for Fluent Bit headers.

tests/runtime/out_kinesis.c (1)

298-443: LGTM: Adequate test coverage for Kinesis Streams aggregation.

The four new tests cover key scenarios:

  • Basic aggregation with multiple small records
  • Integration with time_key and log_key features
  • Volume testing with 50 records to validate aggregation efficiency

Tests follow established patterns and are properly registered. Coverage aligns with Firehose testing approach.

Also applies to: 455-458

src/aws/flb_aws_aggregation.c (1)

30-107: LGTM: Core aggregation functions are well-implemented.

The buffer lifecycle and manipulation functions demonstrate:

  • Proper null pointer checks
  • Correct memory management with flb_malloc/flb_free
  • Appropriate return codes matching the API documentation
  • Buffer overflow prevention in flb_aws_aggregation_add


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Contributor

@cosmo0920 cosmo0920 left a comment

The implementation concept is good, but our commit message linter is reporting errors. For this type of PR we usually split the work into separate commits, like:

  • aws: aws_aggregation: Implement aggregation operation
  • out_kinesis_firehose: Add simple_aggregation operation
  • out_kinesis_stream: Add simple_aggregation operation
  • tests: internal: aws_aggregation
  • out_kinesis_firehose: tests: Add a test case of simple_aggregation operation
  • out_kinesis_stream: tests: Add a test case of simple_aggregation operation

Plus, compilation errors are also happening. Could you take a look at this?

@ShelbyZ
Contributor Author

ShelbyZ commented Dec 18, 2025

The implementation concept is good but our commit message linter complains error and we usually separate commits for this type of PR like:

  • aws: aws_aggregation: Implement aggregation operation
  • out_kinesis_firehose: Add simple_aggregation operation
  • out_kinesis_stream: Add simple_aggregation operation
  • tests: internal: aws_aggregation
  • out_kinesis_firehose: tests: Add a test case of simple_aggregation operation
  • out_kinesis_stream: tests: Add a test case of Add simple_aggregation operation

Plus, compilation errors are also happening. Could you take a look on this?

This should be updated in the last force push. The build failure was caused by a weird refactoring bug that removed the struct value, which was not needed.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (3)
tests/runtime/out_kinesis.c (2)

298-443: Consider adding assertions to verify aggregation behavior.

The new aggregation tests successfully set up the plugin configuration and push records, but they rely solely on the test mode mocking and don't include explicit assertions to verify that:

  • Records were actually aggregated into fewer API calls
  • The aggregated payload format is correct
  • Buffer boundaries are respected

While the sleep-and-succeed pattern is consistent with existing tests in this file, consider whether additional test infrastructure could validate the aggregation behavior more thoroughly.


435-438: Verify buffer size for formatted records.

The loop formats each record into a 100-byte buffer with snprintf. The call already passes sizeof(record), so it cannot overflow, and the buffer is large enough for the test pattern [1, {"id":%d,"msg":"test"}]. To make truncation detectable, consider checking the snprintf return value:

🔎 Suggested improvement:
     for (i = 0; i < 50; i++) {
-        snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"test\"}]", i);
+        ret = snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"test\"}]", i);
+        /* snprintf returns the untruncated length; flag errors and truncation */
+        TEST_CHECK(ret > 0 && (size_t) ret < sizeof(record));
         flb_lib_push(ctx, in_ffd, record, strlen(record));
     }
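
For reference, the truncation check relies on snprintf returning the length the full output would have had, regardless of the buffer size. A standalone sketch of that check is below; format_record is a hypothetical helper written for this example and is not part of the test file.

```c
#include <stdio.h>
#include <stddef.h>

/*
 * Hypothetical helper: format one test record into buf.
 * Returns 0 when the record was written completely, -1 on an encoding
 * error or when the output had to be truncated to fit in size bytes.
 */
int format_record(char *buf, size_t size, int id)
{
    int ret = snprintf(buf, size, "[1, {\"id\":%d,\"msg\":\"test\"}]", id);

    if (ret < 0 || (size_t) ret >= size) {
        return -1;
    }
    return 0;
}
```

Casting ret to size_t only after ruling out negative values avoids the signed/unsigned comparison that a bare ret >= sizeof(record) would introduce.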
src/aws/flb_aws_aggregation.c (1)

179-183: Handle early return after time formatting failure.

At lines 179-182, when flb_aws_strftime_precision returns 0 (formatting failed), the code logs an error and frees out_buf. The else block at line 184 does not execute in that case, so control continues to the size check at line 204:

if (len == 0) {
    flb_plg_error(ins, "Failed to add time_key %s to record, %s",
                  time_key, stream_name);
    flb_free(out_buf);
}
else {
    // time_key_ptr manipulation
    ...
}

This is correct - when len == 0, it frees out_buf and falls through. When len > 0, it goes into the else block. The code then continues to line 204 for the size check. This is safe because written hasn't been modified when len == 0, so the size check still works.

However, there's no explicit return or discard after time formatting failure. The record will be added without the time_key, which may not be the intended behavior.
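
If dropping the record is the preferred policy, one option is to turn the formatting failure into an explicit return code that the caller can act on. The sketch below shows that idea with the standard strftime rather than flb_aws_strftime_precision; the function format_time_key and the FMT_* codes are hypothetical, and this is not what the PR currently does.

```c
#define _POSIX_C_SOURCE 200809L  /* for gmtime_r */

#include <stddef.h>
#include <time.h>

/* Hypothetical codes: FMT_DISCARD tells the caller to skip the record. */
enum { FMT_OK = 0, FMT_DISCARD = 2 };

/*
 * Format ts (UTC) into out using fmt. Any failure is reported to the caller
 * instead of silently continuing without a time value.
 */
int format_time_key(time_t ts, const char *fmt, char *out, size_t out_size)
{
    struct tm tm;

    if (gmtime_r(&ts, &tm) == NULL) {
        return FMT_DISCARD;
    }
    /* strftime returns 0 when the result (plus NUL) does not fit in out_size. */
    if (strftime(out, out_size, fmt, &tm) == 0) {
        return FMT_DISCARD;
    }
    return FMT_OK;
}
```

One caveat with this approach: strftime also returns 0 for a format that legitimately produces an empty string, so a stricter implementation would need to distinguish that case.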

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 158fb26 and dc1eb6b.

📒 Files selected for processing (13)
  • include/fluent-bit/aws/flb_aws_aggregation.h (1 hunks)
  • plugins/out_kinesis_firehose/firehose.c (4 hunks)
  • plugins/out_kinesis_firehose/firehose.h (3 hunks)
  • plugins/out_kinesis_firehose/firehose_api.c (7 hunks)
  • plugins/out_kinesis_streams/kinesis.c (4 hunks)
  • plugins/out_kinesis_streams/kinesis.h (3 hunks)
  • plugins/out_kinesis_streams/kinesis_api.c (9 hunks)
  • src/aws/CMakeLists.txt (1 hunks)
  • src/aws/flb_aws_aggregation.c (1 hunks)
  • tests/internal/CMakeLists.txt (1 hunks)
  • tests/internal/aws_aggregation.c (1 hunks)
  • tests/runtime/out_firehose.c (1 hunks)
  • tests/runtime/out_kinesis.c (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • plugins/out_kinesis_streams/kinesis.h
🧰 Additional context used
🧠 Learnings (9)
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-09-04T12:35:36.904Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10825
File: plugins/out_s3/s3.c:3275-3282
Timestamp: 2025-09-04T12:35:36.904Z
Learning: The out_s3 plugin intentionally uses a simple numeric comparison for retry_limit (chunk->failures >= ctx->ins->retry_limit) rather than the standard Fluent Bit pattern that checks for FLB_OUT_RETRY_UNLIMITED (-1). The maintainer wants to keep this current behavior for consistency within the plugin.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.

Applied to files:

  • plugins/out_kinesis_firehose/firehose.h
🧬 Code graph analysis (8)
plugins/out_kinesis_firehose/firehose_api.c (1)
src/aws/flb_aws_aggregation.c (4)
  • flb_aws_aggregation_process_event (113-233)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_destroy (48-56)
plugins/out_kinesis_streams/kinesis_api.c (2)
src/aws/flb_aws_aggregation.c (4)
  • flb_aws_aggregation_process_event (113-233)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_destroy (48-56)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
tests/runtime/out_kinesis.c (1)
src/flb_lib.c (9)
  • flb_create (143-225)
  • flb_input (266-276)
  • flb_input_set (305-335)
  • flb_output (279-289)
  • flb_output_set (520-551)
  • flb_start (983-994)
  • flb_lib_push (843-870)
  • flb_stop (1011-1055)
  • flb_destroy (228-263)
plugins/out_kinesis_streams/kinesis.c (2)
plugins/out_kinesis_firehose/firehose.c (1)
  • new_flush_buffer (317-357)
src/aws/flb_aws_aggregation.c (1)
  • flb_aws_aggregation_init (30-46)
tests/internal/aws_aggregation.c (1)
src/aws/flb_aws_aggregation.c (5)
  • flb_aws_aggregation_init (30-46)
  • flb_aws_aggregation_destroy (48-56)
  • flb_aws_aggregation_add (58-77)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
include/fluent-bit/aws/flb_aws_aggregation.h (1)
src/aws/flb_aws_aggregation.c (6)
  • flb_aws_aggregation_init (30-46)
  • flb_aws_aggregation_destroy (48-56)
  • flb_aws_aggregation_add (58-77)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_process_event (113-233)
src/aws/flb_aws_aggregation.c (4)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
src/flb_pack.c (1)
  • flb_msgpack_to_json (1011-1024)
include/fluent-bit/flb_compat.h (1)
  • gmtime_r (75-81)
src/aws/flb_aws_util.c (1)
  • flb_aws_strftime_precision (1240-1326)
plugins/out_kinesis_firehose/firehose.c (3)
plugins/out_kinesis_streams/kinesis.c (1)
  • new_flush_buffer (311-354)
src/aws/flb_aws_aggregation.c (1)
  • flb_aws_aggregation_init (30-46)
plugins/out_kinesis_firehose/firehose_api.c (1)
  • flush_destroy (1121-1133)
🔇 Additional comments (23)
src/aws/CMakeLists.txt (1)

18-18: LGTM!

The new aggregation source file is correctly added to the AWS library build list, following the existing pattern for unconditional AWS components.

tests/internal/CMakeLists.txt (1)

141-141: LGTM!

The aggregation unit test is correctly added within the FLB_AWS conditional block, following the established pattern for AWS test files.

plugins/out_kinesis_firehose/firehose.h (1)

29-29: LGTM!

The header changes are well-structured:

  • New include for the aggregation API.
  • agg_buf and agg_buf_initialized fields in struct flush for per-flush aggregation state.
  • simple_aggregation flag in struct flb_firehose for runtime configuration.

The agg_buf_initialized flag pattern correctly mirrors the Kinesis Streams implementation.

Also applies to: 60-63, 102-102

tests/internal/aws_aggregation.c (1)

1-544: Excellent test coverage!

The test suite comprehensively covers the AWS aggregation API:

  • Lifecycle management (init/destroy/reset)
  • Single and multiple record additions
  • Buffer full detection and boundary conditions
  • Both Firehose (with newline) and Kinesis (without newline) finalization modes
  • Edge cases (NULL params, empty buffer, tiny buffer, exact boundaries)
  • Reuse patterns (reset cycles, add after finalize, double finalize)

This thorough coverage will help catch regressions in the aggregation module.

plugins/out_kinesis_firehose/firehose_api.c (3)

148-169: LGTM!

The process_event_simple_aggregation function is a clean wrapper around the shared flb_aws_aggregation_process_event implementation, correctly passing the Firehose-specific parameters including MAX_EVENT_SIZE and the delivery stream name.


386-480: Aggregation send logic is well-structured.

The send_aggregated_record function correctly handles:

  • Finalization with add_final_newline=1 for Firehose
  • Compression and base64 encoding paths
  • Proper memory management for both paths
  • Creating a single event record for the aggregated payload
  • Buffer reset after sending

768-774: LGTM!

Correct finalization and cleanup:

  • Lines 768-774: Properly routes to send_aggregated_record or send_log_events based on simple_aggregation flag.
  • Lines 1124-1126: Correctly checks agg_buf_initialized before calling destroy, preventing double-free or use-after-free issues.

Also applies to: 1124-1126

plugins/out_kinesis_streams/kinesis.c (2)

311-354: LGTM!

The aggregation initialization in new_flush_buffer is correctly implemented:

  • Accepts context parameter for config access
  • Initializes agg_buf_initialized to FLB_FALSE before attempting init
  • Properly handles init failure with cleanup
  • Sets agg_buf_initialized to FLB_TRUE only on success
  • Mirrors the Firehose implementation pattern for consistency
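
The flag-guarded pattern described above can be sketched in isolation. This is a minimal stand-in, not the plugin code: `sketch_flush`, `sketch_flush_new`, and `sketch_flush_destroy` are hypothetical names, and plain `malloc`/`free` replace Fluent Bit's allocators.

```c
#include <stdlib.h>

#define SKETCH_FALSE 0
#define SKETCH_TRUE  1

/* Hypothetical stand-in for the aggregation buffer */
struct sketch_agg {
    char *data;
    size_t size;
    size_t offset;
};

/* Hypothetical stand-in for the per-flush state */
struct sketch_flush {
    char *tmp_buf;
    struct sketch_agg agg;
    int agg_initialized;
};

static int sketch_agg_init(struct sketch_agg *a, size_t size)
{
    if (a == NULL || size == 0) {
        return -1;
    }
    a->data = malloc(size);
    if (a->data == NULL) {
        return -1;
    }
    a->size = size;
    a->offset = 0;
    return 0;
}

struct sketch_flush *sketch_flush_new(int simple_aggregation, size_t agg_size)
{
    struct sketch_flush *f = calloc(1, sizeof(*f));

    if (f == NULL) {
        return NULL;
    }
    f->tmp_buf = malloc(64);
    if (f->tmp_buf == NULL) {
        free(f);
        return NULL;
    }

    /* The flag starts false and flips only after a successful init */
    f->agg_initialized = SKETCH_FALSE;
    if (simple_aggregation) {
        if (sketch_agg_init(&f->agg, agg_size) != 0) {
            free(f->tmp_buf);
            free(f);
            return NULL;
        }
        f->agg_initialized = SKETCH_TRUE;
    }
    return f;
}

void sketch_flush_destroy(struct sketch_flush *f)
{
    if (f == NULL) {
        return;
    }
    /* Touch the aggregation buffer only if it was really set up */
    if (f->agg_initialized) {
        free(f->agg.data);
        f->agg.data = NULL;
        f->agg_initialized = SKETCH_FALSE;
    }
    free(f->tmp_buf);
    free(f);
}
```

Because the flag is set only after init succeeds, the destroy path never has to guess whether the buffer fields hold valid pointers.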

518-523: LGTM!

The simple_aggregation config map entry is correctly defined with:

  • Boolean type with "false" default
  • Proper offset binding to struct flb_kinesis
  • Clear description of the feature's purpose

plugins/out_kinesis_firehose/firehose.c (2)

317-357: LGTM: Aggregation buffer initialization is well-guarded.

The changes properly integrate aggregation support:

  • The ctx parameter enables access to the simple_aggregation flag
  • Initialization failure correctly cleans up and returns NULL
  • The agg_buf_initialized flag tracks state for proper cleanup

The integration follows the same pattern used in the Kinesis Streams plugin (see plugins/out_kinesis_streams/kinesis.c:311-354), ensuring consistency across both plugins.


524-529: Config option well-documented.

The simple_aggregation configuration properly:

  • Defaults to false (opt-in behavior)
  • Includes clear description of benefits
  • Uses boolean type for simple on/off control
  • Aligns with the similar option in kinesis_streams

tests/runtime/out_firehose.c (1)

192-524: Good coverage of edge cases with aggregation.

The Firehose aggregation tests are more comprehensive than the Kinesis tests, covering:

  • Compression combined with aggregation (line 339)
  • Empty/minimal records (line 415)
  • Error handling during aggregation (line 450)
  • Custom time formats (line 489)

This broader test coverage helps validate the aggregation feature works correctly across various configuration combinations.

plugins/out_kinesis_streams/kinesis_api.c (5)

55-56: Forward declaration enables clean separation.

The forward declaration of send_log_events allows the new send_aggregated_record function to call it without reordering the entire file.


209-230: Effective wrapper delegates to shared implementation.

The process_event_simple_aggregation wrapper correctly forwards all parameters to the shared flb_aws_aggregation_process_event API, avoiding code duplication between Kinesis Streams and Firehose plugins.


573-608: Aggregation retry logic addresses previous concern.

The guard at lines 584-590 checks if the aggregation buffer is empty after attempting to send, preventing the infinite loop flagged in the previous review. When ret == 0 && buf->agg_buf.agg_buf_offset == 0, the code discards the unprocessable record.

However, as noted in the previous comment, there may be an issue with checking agg_buf_offset after flb_aws_aggregation_reset has already been called. This needs verification to ensure the guard works as intended.
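
To illustrate the ordering concern, here is a toy model of the retry loop. It is a sketch under assumptions, not the plugin's control flow: `sk_buf`, `sk_add`, `sk_send`, and `sk_process_all` are hypothetical, and the send step simply counts flushes and resets the offset. The point is that the pending byte count is captured before the send, since the reset would otherwise make the emptiness check always true.

```c
#include <stddef.h>

enum { SK_OK = 0, SK_FULL = 1 };

/* Hypothetical toy buffer: tracks sizes only, no payload */
struct sk_buf {
    size_t cap;
    size_t offset;
    int sends;
};

static int sk_add(struct sk_buf *b, size_t len)
{
    if (b->offset + len > b->cap) {
        return SK_FULL;
    }
    b->offset += len;
    return SK_OK;
}

/* Models "send then reset": the offset is zero after every call */
static void sk_send(struct sk_buf *b)
{
    if (b->offset > 0) {
        b->sends++;
    }
    b->offset = 0;
}

/* Returns the number of records discarded as unprocessable */
int sk_process_all(struct sk_buf *b, const size_t *lens, size_t n)
{
    size_t i = 0;
    int discarded = 0;

    while (i < n) {
        size_t pending;

        if (sk_add(b, lens[i]) == SK_OK) {
            i++;
            continue;
        }

        /* Capture the fill level BEFORE the send resets it */
        pending = b->offset;
        sk_send(b);

        if (pending == 0) {
            /* Buffer was already empty: this record can never fit */
            discarded++;
            i++;
        }
        /* Otherwise loop again and retry the same record */
    }
    return discarded;
}
```

In this model a record larger than the whole buffer is dropped after exactly one failed retry, so the loop always terminates.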


758-763: End-of-batch logic correctly handles both modes.

The conditional at lines 758-763 properly routes the final send through send_aggregated_record when aggregation is enabled, otherwise using the standard send_log_events path. This ensures any buffered aggregated records are flushed at the end of processing.


1111-1113: Aggregation buffer cleanup is properly guarded.

The destruction logic at lines 1111-1113 checks agg_buf_initialized before calling flb_aws_aggregation_destroy, preventing double-free or use of uninitialized buffers.

include/fluent-bit/aws/flb_aws_aggregation.h (1)

20-90: Well-designed aggregation API with clear documentation.

The header provides a clean public API for AWS aggregation:

  • Clear structure definition with descriptive field names
  • All functions documented with return value semantics
  • Return codes are consistent and well-explained (0=success, -1=error, 1=buffer full, 2=discard)
  • Include guards properly defined

The API design allows both Kinesis Streams and Firehose plugins to share the same aggregation logic, reducing code duplication.
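
The return-code contract above (0=success, -1=error, 1=buffer full, 2=discard) could be consumed by a caller roughly as follows. The enum and function names are hypothetical and exist only for this sketch; only the numeric values come from the review text.

```c
/* Numeric values follow the documented contract; names are hypothetical */
enum sketch_agg_result {
    SKETCH_AGG_ERROR   = -1,
    SKETCH_AGG_OK      = 0,
    SKETCH_AGG_FULL    = 1,
    SKETCH_AGG_DISCARD = 2
};

enum sketch_action {
    SKETCH_CONTINUE,        /* record buffered, move to the next one */
    SKETCH_FLUSH_AND_RETRY, /* send the buffer, then retry this record */
    SKETCH_SKIP_RECORD,     /* drop this record, move to the next one */
    SKETCH_ABORT            /* stop processing the batch */
};

enum sketch_action sketch_next_action(int ret)
{
    switch (ret) {
    case SKETCH_AGG_OK:
        return SKETCH_CONTINUE;
    case SKETCH_AGG_FULL:
        return SKETCH_FLUSH_AND_RETRY;
    case SKETCH_AGG_DISCARD:
        return SKETCH_SKIP_RECORD;
    default:
        /* -1 and any unexpected value are treated as errors */
        return SKETCH_ABORT;
    }
}
```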

src/aws/flb_aws_aggregation.c (5)

30-46: LGTM: Initialization properly validates and allocates.

The flb_aws_aggregation_init function:

  • Validates the buffer pointer
  • Allocates the requested size
  • Initializes all fields (size and offset)
  • Returns appropriate error codes

48-56: LGTM: Destruction safely handles all cases.

The flb_aws_aggregation_destroy function:

  • Checks both buf and buf->agg_buf before freeing
  • Nulls the pointer after freeing
  • Resets size and offset to maintain consistency
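
The init and destroy behavior summarized in the two comments above can be sketched as below. The field names `agg_buf` and `agg_buf_offset` appear in this review; `agg_buf_size`, the `sketch_` function names, the signatures, and the use of plain `malloc`/`free` are assumptions for illustration.

```c
#include <stdlib.h>

/* Hypothetical mirror of the aggregation buffer structure */
struct sketch_agg_buf {
    char *agg_buf;
    size_t agg_buf_size;   /* assumed field name */
    size_t agg_buf_offset;
};

int sketch_agg_init(struct sketch_agg_buf *buf, size_t size)
{
    if (buf == NULL || size == 0) {
        return -1;
    }
    buf->agg_buf = malloc(size);
    if (buf->agg_buf == NULL) {
        return -1;
    }
    buf->agg_buf_size = size;
    buf->agg_buf_offset = 0;
    return 0;
}

void sketch_agg_destroy(struct sketch_agg_buf *buf)
{
    /* Safe on NULL and on an already-destroyed buffer */
    if (buf != NULL && buf->agg_buf != NULL) {
        free(buf->agg_buf);
        buf->agg_buf = NULL;
        buf->agg_buf_size = 0;
        buf->agg_buf_offset = 0;
    }
}
```

Nulling the pointer after the free is what makes a second destroy call a no-op rather than a double free.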

58-77: LGTM: Bounds checking prevents buffer overflow.

The flb_aws_aggregation_add function correctly:

  • Validates all input parameters
  • Checks if adding data would exceed max_record_size before copying
  • Returns 1 (buffer full) when limit would be exceeded, allowing caller to finalize and retry
  • Uses safe memcpy after validation
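
A minimal sketch of the check-then-copy step described above, assuming a hypothetical `sketch_agg_add` with an explicit `max_record_size` argument; the real signature may differ.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical mirror of the aggregation buffer structure */
struct sketch_agg_buf {
    char *agg_buf;
    size_t agg_buf_size;   /* assumed field name */
    size_t agg_buf_offset;
};

/*
 * Returns 0 when the data was appended, 1 when appending would exceed
 * max_record_size (buffer left unchanged), -1 on invalid arguments.
 */
int sketch_agg_add(struct sketch_agg_buf *buf, const char *data,
                   size_t data_len, size_t max_record_size)
{
    if (buf == NULL || buf->agg_buf == NULL ||
        data == NULL || data_len == 0) {
        return -1;
    }

    /* Bounds check happens before any byte is copied */
    if (buf->agg_buf_offset + data_len > max_record_size ||
        buf->agg_buf_offset + data_len > buf->agg_buf_size) {
        return 1;
    }

    memcpy(buf->agg_buf + buf->agg_buf_offset, data, data_len);
    buf->agg_buf_offset += data_len;
    return 0;
}
```

Returning 1 without modifying the buffer lets the caller finalize what is already aggregated and then retry the same record.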

79-100: LGTM: Finalization handles both Kinesis and Firehose cases.

The flb_aws_aggregation_finalize function:

  • Returns -1 when there's no data to finalize (empty buffer)
  • The add_final_newline parameter allows Firehose (which needs it) and Kinesis Streams (which doesn't) to use the same API
  • Checks buffer size before adding the final newline
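
The finalize behavior can be sketched as follows. The function name, signature, and the choice to skip the newline when the buffer has no room are assumptions made for this example; the review text only states that the buffer size is checked first.

```c
#include <stddef.h>

/* Hypothetical mirror of the aggregation buffer structure */
struct sketch_agg_buf {
    char *agg_buf;
    size_t agg_buf_size;   /* assumed field name */
    size_t agg_buf_offset;
};

/*
 * Returns 0 and stores the payload size in *out_size,
 * or -1 when arguments are invalid or there is nothing to send.
 */
int sketch_agg_finalize(struct sketch_agg_buf *buf, int add_final_newline,
                        size_t *out_size)
{
    if (buf == NULL || buf->agg_buf == NULL || out_size == NULL) {
        return -1;
    }
    if (buf->agg_buf_offset == 0) {
        return -1; /* empty buffer: nothing to finalize */
    }

    /* Assumption: the newline is added only if one more byte fits */
    if (add_final_newline && buf->agg_buf_offset < buf->agg_buf_size) {
        buf->agg_buf[buf->agg_buf_offset] = '\n';
        buf->agg_buf_offset++;
    }

    *out_size = buf->agg_buf_offset;
    return 0;
}
```

A Firehose-style caller would pass add_final_newline=1 and a Kinesis Streams-style caller 0, matching the split described in the bullets.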

113-233: Shared event processing logic eliminates duplication.

The flb_aws_aggregation_process_event function consolidates the msgpack-to-JSON conversion, log_key extraction, time_key injection, and aggregation buffer management that was previously duplicated between Kinesis and Firehose plugins.

The function correctly:

  • Converts msgpack to JSON
  • Discards empty messages (return 2)
  • Handles log_key by stripping enclosing quotes
  • Checks size limits before and after time_key addition
  • Appends newline
  • Attempts to add to aggregation buffer
  • Returns appropriate codes: 0=success, 1=buffer full, 2=discard, -1=error
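
As a rough illustration of the log_key and newline steps in that list, the sketch below strips one pair of enclosing quotes from an already-serialized JSON value, applies a size limit, and appends a newline. `sketch_prepare_log_value` is hypothetical, and treating an oversized record as a discard (2) rather than an error is an assumption of this example.

```c
#include <stddef.h>
#include <string.h>

enum { SKETCH_ERROR = -1, SKETCH_OK = 0, SKETCH_DISCARD = 2 };

/*
 * json_val/len: serialized JSON value, e.g. "\"hello\"" (7 bytes).
 * out/out_cap:  destination for the newline-terminated record.
 * On SKETCH_OK, *out_len holds the bytes written, newline included.
 */
int sketch_prepare_log_value(const char *json_val, size_t len,
                             char *out, size_t out_cap,
                             size_t max_event_size, size_t *out_len)
{
    if (json_val == NULL || out == NULL || out_len == NULL) {
        return SKETCH_ERROR;
    }

    /* Strip one pair of enclosing quotes from a JSON string value */
    if (len >= 2 && json_val[0] == '"' && json_val[len - 1] == '"') {
        json_val++;
        len -= 2;
    }

    if (len == 0) {
        return SKETCH_DISCARD; /* empty message */
    }
    if (len + 1 > max_event_size) {
        return SKETCH_DISCARD; /* too large even before aggregation */
    }
    if (len + 1 > out_cap) {
        return SKETCH_ERROR;
    }

    memcpy(out, json_val, len);
    out[len] = '\n';
    *out_len = len + 1;
    return SKETCH_OK;
}
```

The prepared bytes would then be handed to the aggregation add step, which is where a buffer-full result (1) can arise.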

- Add helper methods to support aggregating records for aws outputs

Signed-off-by: Shelby Hagman <[email protected]>
- Adds testing for aws_aggregation

Signed-off-by: Shelby Hagman <[email protected]>
- Add simple_aggregation config parameter and implementation to plugin

Signed-off-by: Shelby Hagman <[email protected]>
- Adds tests to out_firehose

Signed-off-by: Shelby Hagman <[email protected]>
- Add simple_aggregation config parameter and implementation to plugin

Signed-off-by: Shelby Hagman <[email protected]>
- Adds tests to out_kinesis

Signed-off-by: Shelby Hagman <[email protected]>
@edsiper
Member

edsiper commented Dec 22, 2025

@codex review

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Keep it up!


@ShelbyZ ShelbyZ requested a review from cosmo0920 December 22, 2025 22:27
@ShelbyZ
Contributor Author

ShelbyZ commented Dec 22, 2025

@cosmo0920 the Commit Prefix Lint issue looks like a possible flexibility issue with the linter :)

❌ Commit c44d008482 failed:
Subject prefix 'aws:' does not match files changed.
Expected one of: aws:, aws_aggregation:, build:

aws: Implement simple_aggregation operation

@edsiper
Member

edsiper commented Dec 23, 2025

we need to fix the linter, all good

@edsiper edsiper merged commit 0572b9c into fluent:master Dec 23, 2025
56 of 61 checks passed
@ShelbyZ ShelbyZ deleted the simple-aggregation branch December 23, 2025 16:18

3 participants