32 commits
1df7215
removed "just"
bradhanks Apr 1, 2025
f99824c
removed "simply"
bradhanks Apr 1, 2025
e3800d9
removed "kind of/sort of"
bradhanks Apr 1, 2025
5b56d1f
removed "actually"
bradhanks Apr 1, 2025
40676e4
remove "quite"
bradhanks Apr 1, 2025
4edfce3
Edit for consistent header title case
bradhanks Apr 1, 2025
859c9cf
Used the text from the "Official Producers" page to remove passive vo…
bradhanks Apr 1, 2025
90e6a4e
removed "you", added active voice
bradhanks Apr 1, 2025
0f6fb7a
Removing passive voice wasn't as cut and dry here as it was for the p…
bradhanks Apr 1, 2025
4cd058d
remove "you"
bradhanks Apr 1, 2025
1895286
Refactor handling of failed messages for clarity and conciseness
bradhanks Apr 1, 2025
0101f1d
removed passive "you"
bradhanks Apr 1, 2025
d3027ea
removed weak/passive "you"
bradhanks Apr 1, 2025
d4265ee
See 7c706121a5693df75fab914d481c843525afbff7.
bradhanks Apr 1, 2025
f2ffc5f
remove passive/weak "you"
bradhanks Apr 1, 2025
787bbc7
passive/weak "you"
bradhanks Apr 1, 2025
3a2869c
*made edit to make the distinction between function and callback
bradhanks Apr 1, 2025
6c4a016
See 7c706121a5693df75fab914d481c843525afbff7.
bradhanks Apr 1, 2025
3f6bb4d
remove weak/passive "you"
bradhanks Apr 1, 2025
b37937a
edits to remove "you can"
bradhanks Apr 2, 2025
cf2497d
remove weak "you"
bradhanks Apr 2, 2025
267e5ca
* removed weak "you"
bradhanks Apr 2, 2025
859351f
Broadway.NoopAcknowledger edits
bradhanks Apr 2, 2025
18dde83
edited so the subject was doing the action
bradhanks Apr 2, 2025
3480c25
removed weak "you" + verb
bradhanks Apr 2, 2025
0b74efc
* parameter format edits
bradhanks Apr 2, 2025
5d838fe
removed "vague" might/could
bradhanks Apr 2, 2025
b81b2c0
edited out "you need to" (passive voice)
bradhanks Apr 2, 2025
812f5f0
removed "we need to" (passive)
bradhanks Apr 2, 2025
b7e0859
removed "you should" (passive)
bradhanks Apr 2, 2025
88ed9cf
removed foms of "need to" (passive/weak)
bradhanks Apr 2, 2025
d7f4e00
Passive constructions
bradhanks Apr 2, 2025
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -14,7 +14,7 @@

### Bug fix

* No longer set demand to `:accumulate` when draining, for compatibility with GenStage v1.2+. This means that any polling implementation must implement the `prepare_for_draining` callback and stop polling messages. You can check how [BroadwaySQS](https://github.com/dashbitco/broadway_sqs/commit/5b8f18a78e4760b5fcc839ad576be8c63345add0) tackles this problem as an example
* No longer set demand to `:accumulate` when draining, for compatibility with GenStage v1.2+. This means that any polling implementation must implement the `prepare_for_draining` callback and stop polling messages. Check how [BroadwaySQS](https://github.com/dashbitco/broadway_sqs/commit/5b8f18a78e4760b5fcc839ad576be8c63345add0) tackles this problem as an example

### Enhancements

6 changes: 3 additions & 3 deletions README.md
@@ -27,7 +27,7 @@ Broadway takes the burden of defining concurrent GenStage topologies and provide

### Producers

There are several producers that you can use to integrate with existing services and technologies. [See the docs for detailed how-tos and supported producers](https://hexdocs.pm/broadway/introduction.html#official-producers).
Currently we officially support four Broadway producers that integrate with existing services and technologies. [See the docs for detailed how-tos and supported producers](https://hexdocs.pm/broadway/introduction.html#official-producers).

## Installation

@@ -43,7 +43,7 @@ end

## A quick example: SQS integration

Assuming you have added [`broadway_sqs`](https://github.com/dashbitco/broadway_sqs) as a dependency and configured your SQS credentials accordingly, you can consume Amazon SQS events in only 20 LOCs:
Assuming you have added [`broadway_sqs`](https://github.com/dashbitco/broadway_sqs) as a dependency and configured your SQS credentials accordingly, consume Amazon SQS events in only 20 LOCs:
Review comment (Collaborator):
I think this is significantly worse without "you can"?


```elixir
defmodule MyBroadway do
@@ -82,7 +82,7 @@ defmodule MyBroadway do
end
```

Once your Broadway module is defined, you just need to add it as a child of your application supervision tree as `{MyBroadway, []}`.
Once your Broadway module is defined, add it as a child of your application supervision tree as `{MyBroadway, []}`.

## Comparison to Flow

32 changes: 15 additions & 17 deletions guides/examples/amazon-sqs.md
@@ -18,9 +18,9 @@ queues:

Broadway can work seamlessly with both, Standard and FIFO queues.

## Getting Started
## Getting started

In order to use Broadway with SQS, we need to:
To use Broadway with SQS:

1. Create a SQS queue (or use an existing one)
1. Configure our Elixir project to use Broadway
@@ -43,7 +43,7 @@ which is a Broadway SQS Connector provided by [Dashbit](https://dashbit.co/).

### Starting a new project

If you plan to start a new project, just run:
If you plan to start a new project, run:

$ mix new my_app --sup

@@ -66,12 +66,12 @@ Don't forget to check for the latest version of dependencies.

## Define the pipeline configuration

Broadway is a process-based behaviour and to define a Broadway
pipeline, we need to define three functions: `start_link/1`,
A Broadway pipeline is an implementation of a process-based behaviour and
is defined by three functions: `start_link/1`,
`handle_message/3` and `handle_batch/4`. We will cover `start_link/1`
in this section and the `handle_` callbacks in the next one.

Similar to other process-based behaviour, `start_link/1` simply
Similar to other process-based behaviour, `start_link/1`
delegates to `Broadway.start_link/2`, which should define the
producers, processors, and batchers in the Broadway pipeline.
Assuming we want to consume messages from a queue called
@@ -107,8 +107,8 @@ Assuming we want to consume messages from a queue called
The above configuration also assumes that you have the AWS credentials
set up in your environment, for instance, by having the `AWS_ACCESS_KEY_ID`
and `AWS_SECRET_ACCESS_KEY` environment variables set. If that's
not the case, you will need to pass that information to the client so it
can properly connect to the AWS servers. Here is how you can do it:
not the case, pass that information to the client so it
can properly connect to the AWS servers. Here's how to do it:

...
producer: [
@@ -135,9 +135,8 @@ module docs as well as `Broadway.start_link/2`.

## Implement Broadway callbacks

In order to process incoming messages, we need to implement the
required callbacks. For the sake of simplicity, we're considering that
all messages received from the queue are just numbers:
Implement the required callbacks to process incoming messages.
In this example, all messages received from the queue are numbers:

defmodule MyBroadway do
use Broadway
@@ -169,9 +168,9 @@ For more information, see `c:Broadway.handle_message/3` and

## Run the Broadway pipeline

To run your `Broadway` pipeline, you just need to add as a child in
To run your `Broadway` pipeline, add it as a child in
a supervision tree. Most applications have a supervision tree defined
at `lib/my_app/application.ex`. You can add Broadway as a child to a
at `lib/my_app/application.ex`. Add Broadway as a child to a
supervisor as follows:

children = [
@@ -190,7 +189,7 @@ in the supervision tree.
Some of the configuration options available for Broadway come already with a
"reasonable" default value. However those values might not suit your
requirements. Depending on the number of messages you get, how much processing
they need and how much IO work is going to take place, you might need completely
they need and how much IO work is going to take place, you need completely
different values to optimize the flow of your pipeline. The `concurrency` option
available for every set of producers, processors and batchers, among with
`max_demand`, `batch_size`, and `batch_timeout` can give you a great deal
@@ -202,7 +201,7 @@ See the notes on [`Producer concurrency`](https://hexdocs.pm/broadway/Broadway.h
and [`Batcher concurrency`](https://hexdocs.pm/broadway/Broadway.html#module-batcher-concurrency)
for details.

Here's an example on how you could tune them according to
Here's an example on how you tune them according to
Review comment (Collaborator):
Suggested change
Here's an example on how you tune them according to
Here's an example on how to tune them according to

Think this reads better?

your needs.

defmodule MyBroadway do
@@ -236,5 +235,4 @@ your needs.
In order to get a good set of configurations for your pipeline, it's
important to respect the limitations of the servers you're running,
as well as the limitations of the services you're providing/consuming
data to/from. Broadway comes with telemetry, so you can measure your
pipeline and help ensure your changes are effective.
data to/from. Measure your pipeline with [telemetry](https://hexdocs.pm/telemetry/readme.html) to ensure your changes are effective. (It comes standard.)
Review comment (Collaborator):
I really don't think the parentheses there work well. What about

Suggested change
data to/from. Measure your pipeline with [telemetry](https://hexdocs.pm/telemetry/readme.html) to ensure your changes are effective. (It comes standard.)
data to/from. To measure your pipeline and ensure your changes are effective, use the [telemetry](https://hexdocs.pm/telemetry/readme.html) integration that comes with Broadway.
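
As a rough illustration of what "measuring your pipeline" with telemetry could look like, here is a minimal sketch rather than part of the proposed change. It assumes the `[:broadway, :processor, :message, :stop]` event and its `duration` measurement (in native time units) as listed in Broadway's telemetry documentation. The module name `MyApp.BroadwayMetrics` and the handler id are hypothetical, and the `Mix.install/1` line is only there so the snippet can run as a standalone script.

```elixir
# Only needed when running this sketch as a standalone script.
Mix.install([{:telemetry, "~> 1.0"}])

defmodule MyApp.BroadwayMetrics do
  # Hypothetical module: logs how long each handle_message/3 call took.
  require Logger

  # Event name assumed from Broadway's telemetry documentation.
  @event [:broadway, :processor, :message, :stop]

  # Attaches the handler; returns :ok, or {:error, :already_exists}
  # if a handler with the same id is already attached.
  def attach do
    :telemetry.attach(
      "my-app-broadway-message-stop",
      @event,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Telemetry calls this synchronously in the process that emitted the event.
  def handle_event(@event, %{duration: duration}, _metadata, _config) do
    ms = System.convert_time_unit(duration, :native, :millisecond)
    Logger.info("handle_message/3 took #{ms}ms")
  end
end
```

Calling `MyApp.BroadwayMetrics.attach()` once at application start (before the pipeline starts) would be enough; comparing the logged durations before and after a change to `concurrency` or `batch_size` gives a first indication of whether the change helped.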

55 changes: 22 additions & 33 deletions guides/examples/apache-kafka.md
@@ -2,13 +2,13 @@

Kafka is a distributed streaming platform that has three key capabilities:

* Publish and subscribe to streams of records
* Store streams of records in a fault-tolerant durable way
* Process streams of records as they occur
* Publish and subscribe to record streams
* Store record streams with fault-tolerant durability
* Process record streams in real-time

## Getting Started
## Getting started

In order to use Broadway with Kafka, we need to:
To use Broadway with Kafka:

1. Create a stream of records (or use an existing one)
1. Configure your Elixir project to use Broadway
@@ -20,15 +20,13 @@ In order to use Broadway with Kafka, we need to:

In case you don't have Kafka installed yet, please follow the instructions on Kafka's
[Quickstart](https://kafka.apache.org/quickstart) for a clean installation. After
initializing Kafka, you can create a new stream by running:
initializing Kafka, create a new stream by running:

$ kafka-topics --create --zookeeper localhost:2181 --partitions 3 --topic test

## Configure your Elixir project to use Broadway

This guide describes the steps necessary to integrate Broadway with Kafka using
[BroadwayKafka](https://github.com/dashbitco/broadway_kafka),
which is a Broadway Kafka Connector provided by [Dashbit](https://dashbit.co/).
This guide uses [BroadwayKafka](https://github.com/dashbitco/broadway_kafka) from Dashbit to integrate Broadway with Kafka.

BroadwayKafka can subscribe to one or more topics and process streams of records
using Kafka's [Consumer API](https://kafka.apache.org/documentation.html#consumerapi).
@@ -125,9 +123,9 @@ module docs as well as `Broadway.start_link/2`.

## Implement Broadway callbacks

In order to process incoming messages, we need to implement the
required callbacks. For the sake of simplicity, we're considering that
all messages received from the topic are just numbers:
Implement callbacks to process incoming messages.
In this example,
all messages received from the topic are numbers:

defmodule MyBroadway do
use Broadway
@@ -157,16 +155,12 @@ purpose. First, we update the message's data individually inside
For more information, see `c:Broadway.handle_message/3` and
`c:Broadway.handle_batch/4`.

> Note: Since Broadway v0.2, batching is optional. In case you don't need to
> group messages as batches for further processing/publishing, you can remove
> the `:batchers` configuration along with the `handle_batch/4` callback.
> Note: Broadway v0.2 makes batching optional. Remove the `:batchers` configuration along with the `c:handle_batch/4` callback if unneeded.

## Run the Broadway pipeline

To run your `Broadway` pipeline, you just need to add as a child in
a supervision tree. Most applications have a supervision tree defined
at `lib/my_app/application.ex`. You can add Broadway as a child to a
supervisor as follows:
Add your `Broadway` pipeline as a child in a supervision tree to run it. Most applications have a supervision tree defined at `lib/my_app/application.ex`.
Add the child process `{MyBroadway, []}` to a supervisor as follows:

children = [
{MyBroadway, []}
@@ -179,7 +173,7 @@ Also, if your Broadway has any dependency (for example, it needs to talk
to the database), make sure that Broadway is listed *after* its dependencies
in the supervision tree.

You can now test your pipeline by entering an `iex` session:
Test your pipeline by entering an `iex` session:

$ iex -S mix

@@ -193,7 +187,7 @@ under the hood to communicate with Kafka.

### Sending messages to Kafka

Finally, we can send some sample messages to Kafka using using `:brod` with the following snippet:
Use `:brod` to send sample messages to Kafka:

topic = "test"
client_id = :my_client
@@ -207,7 +201,7 @@ Finally, we can send some sample messages to Kafka using using `:brod` with the
:ok = :brod.produce_sync(client_id, topic, partition, _key="", "#{i}")
end)

You should see the output showing the generated batches:
See the output showing the generated batches:
Review comment (Collaborator):
Nope I don't think this works well?


Got batch: [
{"2", 4},
@@ -231,7 +225,7 @@ You should see the output showing the generated batches:
Some of the configuration options available for Broadway come already with a
"reasonable" default value. However, those values might not suit your
requirements. Depending on the number of records you get, how much processing
they need and how much IO work is going to take place, you might need completely
they need and how much IO work is going to take place, you need completely
different values to optimize the flow of your pipeline. The `concurrency` option
available for every set of producers, processors and batchers, along with
`batch_size` and `batch_timeout` can give you a great deal of flexibility.
@@ -256,11 +250,11 @@ can still receive more assignments than planned. For instance, if another consum
the server will reassign all its topic/partition to other available consumers, including
any Broadway producer subscribed to the same topic.

There are other options that you may want to take a closer look when tuning your configuration.
The `:max_bytes` option, for instance, belongs to the `:fetch_config` group and defines the
maximum amount of data to be fetched at a time from a single partition. The default is
Other options require attention during configuration tuning.
The `:max_bytes` option (part of `:fetch_config`) defines the
maximum data fetched at a time from a single partition. The default is
1048576 (1 MiB). Setting greater values can improve throughput at the cost of more
memory consumption. For more information and other fetch options, please refer to the
memory consumption. For more fetch options, please refer to the
"Fetch config options" in the official [BroadwayKafka](https://hexdocs.pm/broadway_kafka/)
documentation.

@@ -287,9 +281,4 @@ tuning `:offset_commit_interval_seconds` and `:offset_commit_on_ack`.

## Handling failed messages

`broadway_kafka` never stops the flow of the stream, i.e. it will **always ack** the messages
even when they fail. Unlike queue-based connectors, where you can mark a single message as failed.
In Kafka that's not possible due to its single offset per topic/partition ack strategy. If you
want to reprocess failed messages, you need to roll your own strategy. A possible way to do that
is to implement `handle_failed/2` and send failed messages to a separated stream or queue for
later processing.
`broadway_kafka` **always acknowledges** (yes, also failed) messages, so the stream flow is never stopped. Unlike queue-based connectors, Kafka’s single offset-per-topic/partition strategy prevents marking individual messages as failed. To reprocess failures, implement a custom strategy (e.g., using `handle_failed/2` to redirect failed messages to a separate stream or queue).
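
The `handle_failed/2` strategy mentioned above could be sketched roughly as follows. This is an illustration under stated assumptions, not code from BroadwayKafka or from this change: `MyApp.DeadLetter.publish/2` is a hypothetical stand-in for whatever publishes to a separate stream or queue, the "not a number" rule is only an example failure condition, and the `Mix.install/1` line is there so the snippet can run as a standalone script.

```elixir
# Only needed when running this sketch as a standalone script.
Mix.install([{:broadway, "~> 1.0"}])

defmodule MyApp.DeadLetter do
  # Hypothetical stand-in: a real implementation would publish the payload
  # to a separate topic or queue for later reprocessing.
  require Logger

  def publish(data, status) do
    Logger.warning("dead-lettering #{inspect(data)}: #{inspect(status)}")
  end
end

defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  @impl true
  def handle_message(_processor, %Message{} = message, _context) do
    # Example rule: payloads that are not integers are marked as failed.
    case Integer.parse(message.data) do
      {_number, ""} -> message
      _other -> Message.failed(message, "not a number")
    end
  end

  @impl true
  def handle_failed(messages, _context) do
    # Failed messages are still acknowledged by the Kafka connector,
    # so they are forwarded elsewhere before being returned.
    Enum.each(messages, fn %Message{data: data, status: status} ->
      MyApp.DeadLetter.publish(data, status)
    end)

    messages
  end
end
```

`handle_failed/2` must return the messages it received; the sketch returns them unchanged so that acknowledgement proceeds as usual.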
14 changes: 6 additions & 8 deletions guides/examples/custom-producers.md
@@ -1,15 +1,13 @@
# Custom Producers

If you want to use Broadway but there is no existing Broadway producer
for the technology of your choice, you can integrate any existing GenStage
producer into the pipeline with relative ease.
If there is no existing Broadway producer for your technology of choice, integrate any existing GenStage producer into the pipeline.

## Example

In general, producers must generate `%Broadway.Message{}` structs in order
to be processed by Broadway. In case you need to use an existing GenStage
producer and you don't want to change its original implementation,
you'll have to set the producer's `:transformer` option to translate the
to be processed by Broadway. To use an existing GenStage
producer without changing its original implementation,
set the producer's `:transformer` option to translate the
generated events into Broadway messages.

In the following example the producer is a regular GenStage, i.e., it
@@ -32,7 +30,7 @@ produces plain events that cannot be processed by Broadway directly:
end
end

By using a transformer, you can tell Broadway to transform all events
By using a transformer, Broadway knows to transform all events
generated by the producer into proper Broadway messages:

defmodule MyBroadway do
@@ -70,7 +68,7 @@ generated by the producer into proper Broadway messages:
end
end

Notice that you need to pass two options to the producer:
Pass two options to the producer:

* `:module` - a tuple representing the GenStage producer as `{mod, arg}`.
Where `mod` is module that implements the GenStage behaviour and `arg`