From 1df7215124fc28412b7736a304386c7d0efc5fe6 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 10:21:44 -0600 Subject: [PATCH 01/32] removed "just" --- README.md | 2 +- guides/examples/amazon-sqs.md | 6 +++--- guides/examples/apache-kafka.md | 4 ++-- guides/examples/google-cloud-pubsub.md | 6 +++--- guides/examples/rabbitmq.md | 6 +++--- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index b404b592..300db641 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ defmodule MyBroadway do end ``` -Once your Broadway module is defined, you just need to add it as a child of your application supervision tree as `{MyBroadway, []}`. +Once your Broadway module is defined, you need to add it as a child of your application supervision tree as `{MyBroadway, []}`. ## Comparison to Flow diff --git a/guides/examples/amazon-sqs.md b/guides/examples/amazon-sqs.md index 67479f6d..2d081ba3 100644 --- a/guides/examples/amazon-sqs.md +++ b/guides/examples/amazon-sqs.md @@ -43,7 +43,7 @@ which is a Broadway SQS Connector provided by [Dashbit](https://dashbit.co/). ### Starting a new project -If you plan to start a new project, just run: +If you plan to start a new project, run: $ mix new my_app --sup @@ -137,7 +137,7 @@ module docs as well as `Broadway.start_link/2`. In order to process incoming messages, we need to implement the required callbacks. For the sake of simplicity, we're considering that -all messages received from the queue are just numbers: +all messages received from the queue are numbers: defmodule MyBroadway do use Broadway @@ -169,7 +169,7 @@ For more information, see `c:Broadway.handle_message/3` and ## Run the Broadway pipeline -To run your `Broadway` pipeline, you just need to add as a child in +To run your `Broadway` pipeline, you need to add as a child in a supervision tree. Most applications have a supervision tree defined at `lib/my_app/application.ex`. You can add Broadway as a child to a supervisor as follows: diff --git a/guides/examples/apache-kafka.md b/guides/examples/apache-kafka.md index 97f9477b..eb712730 100644 --- a/guides/examples/apache-kafka.md +++ b/guides/examples/apache-kafka.md @@ -127,7 +127,7 @@ module docs as well as `Broadway.start_link/2`. In order to process incoming messages, we need to implement the required callbacks. For the sake of simplicity, we're considering that -all messages received from the topic are just numbers: +all messages received from the topic are numbers: defmodule MyBroadway do use Broadway @@ -163,7 +163,7 @@ For more information, see `c:Broadway.handle_message/3` and ## Run the Broadway pipeline -To run your `Broadway` pipeline, you just need to add as a child in +To run your `Broadway` pipeline, you need to add as a child in a supervision tree. Most applications have a supervision tree defined at `lib/my_app/application.ex`. You can add Broadway as a child to a supervisor as follows: diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index 2dbbc594..f6870c55 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -13,7 +13,7 @@ In order to use Broadway with Cloud Pub/Sub you need to: 1. Run the Broadway pipeline 1. Tune the configuration (Optional) -If you are just getting familiar with Google Pub/Sub, refer to [the documentation](https://cloud.google.com/pubsub/docs/) +If you are getting familiar with Google Pub/Sub, refer to [the documentation](https://cloud.google.com/pubsub/docs/) to get started. Instead of testing against a live environment, you may also consider using the [emulator](https://cloud.google.com/pubsub/docs/emulator) to simulate integrating with Cloud Pub/Sub. @@ -46,7 +46,7 @@ A new topic: $ gcloud pubsub topics create test-topic --project test-pubsub Created topic [projects/test-pubsub/topics/test-topic]. -> Note: If you run this command immediately after creating a new Google Cloud project, you may receive an error indicating that your project's organization policy is still being provisioned. Just wait a couple minutes and try again. +> Note: If you run this command immediately after creating a new Google Cloud project, you may receive an error indicating that your project's organization policy is still being provisioned. wait a couple minutes and try again. And a new subscription: @@ -91,7 +91,7 @@ which is a Broadway Cloud Pub/Sub Connector provided by [Dashbit](https://dashbi ### Starting a new project -If you plan to start a new project, just run: +If you plan to start a new project, run: $ mix new my_app --sup diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index f9ee8656..93254275 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -35,7 +35,7 @@ following command: $ rabbitmqadmin declare queue name=my_queue durable=true -You can list all declared queues to see our the one we've just created: +You can list all declared queues to see the one we've created: $ rabbitmqctl list_queues Timeout: 60.0 seconds ... @@ -143,7 +143,7 @@ module docs as well as `Broadway.start_link/2`. In order to process incoming messages, we need to implement the required callbacks. For the sake of simplicity, we're considering that -all messages received from the queue are just numbers: +all messages received from the queue are numbers: defmodule MyBroadway do use Broadway @@ -181,7 +181,7 @@ For more information, see `c:Broadway.handle_message/3` and ## Run the Broadway pipeline -To run your `Broadway` pipeline, you just need to add as a child in +To run your `Broadway` pipeline, you need to add as a child in a supervision tree. Most applications have a supervision tree defined at `lib/my_app/application.ex`. You can add Broadway as a child to a supervisor as follows: From f99824c3681121d3d3eb00aa40e629ee619d3784 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 10:23:22 -0600 Subject: [PATCH 02/32] removed "simply" --- guides/examples/google-cloud-pubsub.md | 2 +- lib/broadway.ex | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index f6870c55..74656f9f 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -118,7 +118,7 @@ Broadway is a process-based behaviour and to define a Broadway pipeline, we need functions: `start_link/1`, `handle_message/3` and `handle_batch/4`. We will cover `start_link/1` in this section and the `handle_` callbacks in the next one. -Similar to other process-based behaviour, `start_link/1` simply delegates to +Similar to other process-based behaviour, `start_link/1` delegates to `Broadway.start_link/2`, which should define the producers, processors, and batchers in the Broadway pipeline. Assuming we want to consume messages from the `test-subscription`, the minimal configuration would be: diff --git a/lib/broadway.ex b/lib/broadway.ex index 7c573e1b..f22c3aee 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -863,7 +863,7 @@ defmodule Broadway do `min_demand` value. Producers which are push-based, rather than pull-based, such as `BroadwayRabbitMQ.Producer`, are more likely to send messages as they arrive (which may skip batching altogether and always be single element lists). - In other words, this callback is simply a convenience for preparing messages, + In other words, this callback is a convenience for preparing messages, it does not guarantee the messages will be accumulated to a certain length. For effective batch processing, see `c:handle_batch/4`. From e3800d946589786b805cbc1c10a3d844e01721df Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 10:26:05 -0600 Subject: [PATCH 03/32] removed "kind of/sort of" --- lib/broadway.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/broadway.ex b/lib/broadway.ex index f22c3aee..8c4a3611 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -329,7 +329,7 @@ defmodule Broadway do `c:handle_failed/2` callback, that callback will be invoked with all the failed messages before they get acknowledged. - Note however, that `Broadway` does not provide any sort of retries + Note however, that `Broadway` does not provide retries out of the box. This is left completely as a responsibility of the producer. For instance, if you are using Amazon SQS, the default behaviour is to retry unacknowledged messages after a user-defined @@ -891,7 +891,7 @@ defmodule Broadway do And it must return the (potentially) updated `Broadway.Message` struct. - This is the place to do any kind of processing with the incoming message, + This is the place to process the incoming message, e.g., transform the data into another data structure, call specific business logic to do calculations. Basically, any CPU bounded task that runs against a single message should be processed here. From 5b56d1f5347ddc08ad76a86d2ea79ab648260dfa Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 10:26:47 -0600 Subject: [PATCH 04/32] removed "actually" --- lib/broadway.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/broadway.ex b/lib/broadway.ex index 8c4a3611..f24eb01b 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -583,7 +583,7 @@ defmodule Broadway do > all order partitions. This implies two things: > > * Using `:partition_by` with a high level of concurrency can - > actually be detrimental to performance. For example, if + > be detrimental to performance. For example, if > concurrency is set to 100, you need all 100 processors to > make progress at the same time. > From 40676e4c2b5ca0cc49c5023d6215ed24737e405a Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 10:28:09 -0600 Subject: [PATCH 05/32] remove "quite" --- lib/broadway/options.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/broadway/options.ex b/lib/broadway/options.ex index 6b0e680a..3f3051a9 100644 --- a/lib/broadway/options.ex +++ b/lib/broadway/options.ex @@ -146,7 +146,7 @@ defmodule Broadway.Options do > transformations, each with a different concern, then you must > have several processors. > - > However, that's not quite true. Separation of concerns is modeled + > However, that's not true. Separation of concerns is modeled > by defining several modules and functions, not processors. Processors > are ultimately about moving data around and you should only do it > when necessary. Using processors for code organization purposes would From 4edfce349eca3078204ae87028961a0c80347fea Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 10:39:52 -0600 Subject: [PATCH 06/32] Edit for consistent header title case --- guides/examples/amazon-sqs.md | 2 +- guides/examples/apache-kafka.md | 2 +- guides/examples/google-cloud-pubsub.md | 2 +- guides/examples/rabbitmq.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/guides/examples/amazon-sqs.md b/guides/examples/amazon-sqs.md index 2d081ba3..6e7c7bdd 100644 --- a/guides/examples/amazon-sqs.md +++ b/guides/examples/amazon-sqs.md @@ -18,7 +18,7 @@ queues: Broadway can work seamlessly with both, Standard and FIFO queues. -## Getting Started +## Getting started In order to use Broadway with SQS, we need to: diff --git a/guides/examples/apache-kafka.md b/guides/examples/apache-kafka.md index eb712730..715efaa2 100644 --- a/guides/examples/apache-kafka.md +++ b/guides/examples/apache-kafka.md @@ -6,7 +6,7 @@ Kafka is a distributed streaming platform that has three key capabilities: * Store streams of records in a fault-tolerant durable way * Process streams of records as they occur -## Getting Started +## Getting started In order to use Broadway with Kafka, we need to: diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index 74656f9f..3d37f169 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -2,7 +2,7 @@ Cloud Pub/Sub is a fully-managed real-time messaging service provided by Google. -## Getting Started +## Getting started In order to use Broadway with Cloud Pub/Sub you need to: diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index 93254275..0496d65a 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -4,7 +4,7 @@ RabbitMQ is an open source message broker designed to be highly scalable and distributed. It supports multiple protocols including the Advanced Message Queuing Protocol (AMQP). -## Getting Started +## Getting started In order to use Broadway with RabbitMQ, we need to: From 859c9cf1f6846437e6734ccf91185b87d8569d7d Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 16:43:05 -0600 Subject: [PATCH 07/32] Used the text from the "Official Producers" page to remove passive voice and add specificity --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 300db641..22665b90 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Broadway takes the burden of defining concurrent GenStage topologies and provide ### Producers -There are several producers that you can use to integrate with existing services and technologies. [See the docs for detailed how-tos and supported producers](https://hexdocs.pm/broadway/introduction.html#official-producers). +Currently we officially support four Broadway producers that integrate with existing services and technologies. [See the docs for detailed how-tos and supported producers](https://hexdocs.pm/broadway/introduction.html#official-producers). ## Installation From 90e6a4ec45bfabbd39bd49fc429cf6b6d11e980a Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 16:44:46 -0600 Subject: [PATCH 08/32] removed "you", added active voice --- guides/examples/amazon-sqs.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/guides/examples/amazon-sqs.md b/guides/examples/amazon-sqs.md index 6e7c7bdd..0a93d04f 100644 --- a/guides/examples/amazon-sqs.md +++ b/guides/examples/amazon-sqs.md @@ -108,7 +108,7 @@ The above configuration also assumes that you have the AWS credentials set up in your environment, for instance, by having the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables set. If that's not the case, you will need to pass that information to the client so it -can properly connect to the AWS servers. Here is how you can do it: +can properly connect to the AWS servers. Here's how to do it: ... producer: [ @@ -171,7 +171,7 @@ For more information, see `c:Broadway.handle_message/3` and To run your `Broadway` pipeline, you need to add as a child in a supervision tree. Most applications have a supervision tree defined -at `lib/my_app/application.ex`. You can add Broadway as a child to a +at `lib/my_app/application.ex`. Add Broadway as a child to a supervisor as follows: children = [ From 0f6fb7a143bd6a05f06a2379fda0b6049cdfacaa Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 16:45:55 -0600 Subject: [PATCH 09/32] Removing passive voice wasn't as cut and dry here as it was for the previous commit. --- guides/examples/amazon-sqs.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/guides/examples/amazon-sqs.md b/guides/examples/amazon-sqs.md index 0a93d04f..d6fc75ff 100644 --- a/guides/examples/amazon-sqs.md +++ b/guides/examples/amazon-sqs.md @@ -236,5 +236,4 @@ your needs. In order to get a good set of configurations for your pipeline, it's important to respect the limitations of the servers you're running, as well as the limitations of the services you're providing/consuming -data to/from. Broadway comes with telemetry, so you can measure your -pipeline and help ensure your changes are effective. +data to/from. Measure your pipeline with [telemetry](https://hexdocs.pm/telemetry/readme.html) to ensure your changes are effective. (It comes standard.) From 4cd058d81b6f009bf8a6468ccb77c28800de40ba Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 16:48:23 -0600 Subject: [PATCH 10/32] remove "you" --- guides/examples/apache-kafka.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/guides/examples/apache-kafka.md b/guides/examples/apache-kafka.md index 715efaa2..2a3b7542 100644 --- a/guides/examples/apache-kafka.md +++ b/guides/examples/apache-kafka.md @@ -20,7 +20,7 @@ In order to use Broadway with Kafka, we need to: In case you don't have Kafka installed yet, please follow the instructions on Kafka's [Quickstart](https://kafka.apache.org/quickstart) for a clean installation. After -initializing Kafka, you can create a new stream by running: +initializing Kafka, create a new stream by running: $ kafka-topics --create --zookeeper localhost:2181 --partitions 3 --topic test @@ -158,14 +158,14 @@ For more information, see `c:Broadway.handle_message/3` and `c:Broadway.handle_batch/4`. > Note: Since Broadway v0.2, batching is optional. In case you don't need to -> group messages as batches for further processing/publishing, you can remove +> group messages as batches for further processing/publishing, remove > the `:batchers` configuration along with the `handle_batch/4` callback. ## Run the Broadway pipeline To run your `Broadway` pipeline, you need to add as a child in a supervision tree. Most applications have a supervision tree defined -at `lib/my_app/application.ex`. You can add Broadway as a child to a +at `lib/my_app/application.ex`. Add Broadway as a child to a supervisor as follows: children = [ @@ -179,7 +179,7 @@ Also, if your Broadway has any dependency (for example, it needs to talk to the database), make sure that Broadway is listed *after* its dependencies in the supervision tree. -You can now test your pipeline by entering an `iex` session: +Test your pipeline by entering an `iex` session: $ iex -S mix From 1895286f01fceb86a85225ef5b5f0137496e3815 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 16:54:58 -0600 Subject: [PATCH 11/32] Refactor handling of failed messages for clarity and conciseness * added "messages" inside the bold after "the" was no longer breaking up an active phrase * Multiple "you" + verb instances in paragraph form required reworking to make active * I tried to get from active to passive with the minimum amount of edits and no change in the information being presented --- guides/examples/apache-kafka.md | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/guides/examples/apache-kafka.md b/guides/examples/apache-kafka.md index 2a3b7542..c3724c5e 100644 --- a/guides/examples/apache-kafka.md +++ b/guides/examples/apache-kafka.md @@ -287,9 +287,4 @@ tuning `:offset_commit_interval_seconds` and `:offset_commit_on_ack`. ## Handling failed messages -`broadway_kafka` never stops the flow of the stream, i.e. it will **always ack** the messages -even when they fail. Unlike queue-based connectors, where you can mark a single message as failed. -In Kafka that's not possible due to its single offset per topic/partition ack strategy. If you -want to reprocess failed messages, you need to roll your own strategy. A possible way to do that -is to implement `handle_failed/2` and send failed messages to a separated stream or queue for -later processing. +`broadway_kafka` never stops the flow of the stream and will **always ack messages**, even when they fail. Unlike queue-based connectors, which allow marking individual messages as failed, Kafka's single offset per topic/partition strategy prevents that. To reprocess failed messages, implement your own strategy, such as using `handle_failed/2` to send failed messages to a separate stream or queue for later processing. From 0101f1df4747c1cfada6d4f42beacd057133f3b0 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 16:58:56 -0600 Subject: [PATCH 12/32] removed passive "you" --- guides/examples/custom-producers.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/guides/examples/custom-producers.md b/guides/examples/custom-producers.md index efda4b37..11b950f9 100644 --- a/guides/examples/custom-producers.md +++ b/guides/examples/custom-producers.md @@ -1,8 +1,6 @@ # Custom Producers -If you want to use Broadway but there is no existing Broadway producer -for the technology of your choice, you can integrate any existing GenStage -producer into the pipeline with relative ease. +If there is no existing Broadway producer for your technology of choice, integrate any existing GenStage producer into the pipeline. ## Example @@ -32,7 +30,7 @@ produces plain events that cannot be processed by Broadway directly: end end -By using a transformer, you can tell Broadway to transform all events +By using a transformer, Broadway knows to transform all events generated by the producer into proper Broadway messages: defmodule MyBroadway do From d3027ea29d6350943f70447c7bade150c33eac62 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 17:15:01 -0600 Subject: [PATCH 13/32] removed weak/passive "you" --- guides/examples/google-cloud-pubsub.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index 3d37f169..52f9d766 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -18,14 +18,14 @@ to get started. Instead of testing against a live environment, you may also cons [emulator](https://cloud.google.com/pubsub/docs/emulator) to simulate integrating with Cloud Pub/Sub. -If you have an existing project, topic, subscription, and credentials, you can skip [step +Existing projects, topics, subscriptions, and credentials can skip [step 1](#setup-cloud-pub-sub-project) and jump to [Configure the project](#configure-the-project) section. ## Setup Cloud Pub/Sub project In this tutorial we'll use the [`gcloud`](https://cloud.google.com/sdk/gcloud/) command-line tool -to set everything up in Google Cloud. Alternatively, you can roughly follow this guide by using +to set everything up in Google Cloud. Alternatively, follow this [Cloud Console](https://console.cloud.google.com). To install `gcloud` follow the [documentation](https://cloud.google.com/sdk/gcloud/). If you are @@ -46,7 +46,7 @@ A new topic: $ gcloud pubsub topics create test-topic --project test-pubsub Created topic [projects/test-pubsub/topics/test-topic]. -> Note: If you run this command immediately after creating a new Google Cloud project, you may receive an error indicating that your project's organization policy is still being provisioned. wait a couple minutes and try again. +> Note: If an error indicates the organization policy is still provisioning after creating a new Google Cloud project, wait a couple minutes and try again. And a new subscription: @@ -196,7 +196,7 @@ For more information, see `c:Broadway.handle_message/3` and `c:Broadway.handle_b ## Run the Broadway pipeline To run your `Broadway` pipeline, you need to add it as a child in a supervision tree. Most -applications have a supervision tree defined at `lib/my_app/application.ex`. You can add Broadway +applications have a supervision tree defined at `lib/my_app/application.ex`. Add Broadway as a child to a supervisor as follows: children = [ @@ -205,7 +205,7 @@ as a child to a supervisor as follows: Supervisor.start_link(children, strategy: :one_for_one) -The final step is to configure credentials. You can set the following environment variable: +The final step is to configure credentials. Set the following environment variable: export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json @@ -216,7 +216,7 @@ Now the Broadway pipeline should be started when your application starts. Also, pipeline has any dependency (for example, it needs to talk to the database), make sure that it is listed *after* its dependencies in the supervision tree. -If you followed the previous section about setting the project with `gcloud`, you can now test the +In the previous section `gcloud` set up the project. Now test the the pipeline. In one terminal tab start the application: $ iex -S mix From d4265ee09c74fe92ddfe5c499c1bd3e496b511ee Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 17:16:18 -0600 Subject: [PATCH 14/32] See 7c706121a5693df75fab914d481c843525afbff7. --- guides/examples/google-cloud-pubsub.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index 52f9d766..986b8abf 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -288,5 +288,4 @@ your needs. In order to get a good set of configurations for your pipeline, it's important to respect the limitations of the servers you're running, as well as the limitations of the services you're providing/consuming -data to/from. Broadway comes with telemetry, so you can measure your -pipeline and help ensure your changes are effective. +data to/from. Measure your pipeline with [telemetry](https://hexdocs.pm/telemetry/readme.html) to ensure your changes are effective. (It comes standard.) From f2ffc5f9db813a99effabb96e12db0811c617533 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 17:19:21 -0600 Subject: [PATCH 15/32] remove passive/weak "you" --- guides/examples/introduction.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/guides/examples/introduction.md b/guides/examples/introduction.md index f0f677ce..fd071b22 100644 --- a/guides/examples/introduction.md +++ b/guides/examples/introduction.md @@ -15,18 +15,21 @@ ## Official Producers -Currently we officially support four Broadway producers: +Currently we officially support four Broadway producers. + +Start with the guide link for your adapter: * Amazon SQS: [Source](https://github.com/dashbitco/broadway_sqs) - [Guide](amazon-sqs.md) * Apache Kafka: [Source](https://github.com/dashbitco/broadway_kafka) - [Guide](apache-kafka.md) * Google Cloud Pub/Sub: [Source](https://github.com/dashbitco/broadway_cloud_pub_sub) - [Guide](google-cloud-pubsub.md) * RabbitMQ: [Source](https://github.com/dashbitco/broadway_rabbitmq) - [Guide](rabbitmq.md) -The guides links above will help you get started with your adapter of choice. For API reference, you can check out the `Broadway` module. +For API reference, check out the `Broadway` module. ## Non-official (Off-Broadway) Producers -For those interested in rolling their own Broadway Producers (which we actively encourage!), we recommend using the `OffBroadway` namespace, mirroring the [Off-Broadway theaters](https://en.wikipedia.org/wiki/Off-Broadway). For example, if you want to publish your own integration with Amazon SQS, you can package it as `off_broadway_sqs`, which uses the `OffBroadway.SQS` namespace. +For those interested in rolling their own Broadway Producers (which we actively encourage!), we recommend using the `OffBroadway` namespace, mirroring the [Off-Broadway theaters](https://en.wikipedia.org/wiki/Off-Broadway). For example, package +your own integration with Amazon SQS as `off_broadway_sqs`, which uses the `OffBroadway.SQS` namespace. The following Off-Broadway libraries are available (feel free to send a PR adding your own in alphabetical order): From 787bbc796364a86d5fe62cb144b9e50b8dfef3a6 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 17:23:25 -0600 Subject: [PATCH 16/32] passive/weak "you" --- guides/examples/rabbitmq.md | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index 0496d65a..6d2147f3 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -15,12 +15,11 @@ In order to use Broadway with RabbitMQ, we need to: 1. [Run the Broadway pipeline](#run-the-broadway-pipeline) 1. [Tuning the configuration](#tuning-the-configuration) (Optional) -In case you want to work with an existing queue, you can skip [step 1](#create-a-queue) +To work with an existing queue, skip [step 1](#create-a-queue) and jump to [Configure the project](#configure-the-project). -> Note: `BroadwayRabbitMQ` does not automatically create any queue. If you -configure a pipeline with a non-existent queue, the producers will crash, -bringing down the pipeline. +> Note: `BroadwayRabbitMQ` does not automatically create queues. +> Without a queue the producers will crash, bringing down the pipeline. ## Create a queue @@ -30,12 +29,12 @@ further information. Also, make sure you have the [Management](https://www.rabbitmq.com/management.html) plugin enabled, which ships with the command line tool, `rabbitmqadmin`. -After successfully installing RabbitMQ, you can declare a new queue with the +After successfully installing RabbitMQ, declare a new queue with the following command: $ rabbitmqadmin declare queue name=my_queue durable=true -You can list all declared queues to see the one we've created: +List all declared queues to see the one we've created: $ rabbitmqctl list_queues Timeout: 60.0 seconds ... @@ -173,17 +172,16 @@ purpose. First, we update the message's data individually inside For more information, see `c:Broadway.handle_message/3` and `c:Broadway.handle_batch/4`. -> Note: Since Broadway v0.2, batching is optional. In case you don't need to -> group messages as batches for further processing/publishing, you can remove -> the `:batchers` configuration along with the `handle_batch/4` callback. This -> is perfectly fine for RabbitMQ, where messages are acknowledged individually -> and never as a batch. +> Note: Since Broadway v0.2, batching is optional. Remove +> the `:batchers` configuration along with the `c:handle_batch/4` callback +> to send single messages for further processing/publishing. This +> works because RabbitMQ messages acknowledges messages individually. ## Run the Broadway pipeline To run your `Broadway` pipeline, you need to add as a child in a supervision tree. Most applications have a supervision tree defined -at `lib/my_app/application.ex`. You can add Broadway as a child to a +at `lib/my_app/application.ex`. Add Broadway as a child to a supervisor as follows: children = [ @@ -197,7 +195,7 @@ Also, if your Broadway has any dependency (for example, it needs to talk to the database), make sure that Broadway is listed *after* its dependencies in the supervision tree. -You can now test your pipeline by entering an `iex` session: +Test your pipeline by entering an `iex` session: $ iex -S mix From 3a2869c3769cbece6582b75e711558ca61358a3e Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 17:25:30 -0600 Subject: [PATCH 17/32] *made edit to make the distinction between function and callback * removed weak words would/should --- guides/examples/rabbitmq.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index 6d2147f3..7641b54b 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -71,15 +71,14 @@ Don't forget to check for the latest version of dependencies. ## Define the pipeline configuration Broadway is a process-based behaviour and to define a Broadway pipeline, -we need to define three functions: `start_link/1`, `handle_message/3` -and optionally `handle_batch/4`. We will cover `start_link/1` in this -section and the `handle_` callbacks in the next one. +we need to define the `start_link/1` function, the `c:handle_message/3` +callback, and optionally, the `c:handle_batch/4` callback. We will cover `start_link/1` in this section and the `handle_` callbacks in the next one. Similar to other process-based behaviours, `start_link/1` simply -delegates to `Broadway.start_link/2`, which should define the +delegates to `Broadway.start_link/2`, which defines the producers, processors, and batchers in the Broadway pipeline. Assuming we want to consume messages from a queue called -`my_queue`, one possible configuration would be: +`my_queue` with the following configuration: defmodule MyBroadway do use Broadway From 6c4a016c461e14b7596b6f31702c86b7c6ab864d Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 17:26:13 -0600 Subject: [PATCH 18/32] See 7c706121a5693df75fab914d481c843525afbff7. --- guides/examples/rabbitmq.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index 7641b54b..10093c0f 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -260,5 +260,4 @@ section of the `BroadwayRabbitMQ` documentation for details. In order to get a good set of configurations for your pipeline, it's important to respect the limitations of the servers you're running, as well as the limitations of the services you're providing/consuming -data to/from. Broadway comes with telemetry, so you can measure your -pipeline and help ensure your changes are effective. +data to/from. Measure your pipeline with [telemetry](https://hexdocs.pm/telemetry/readme.html) to ensure your changes are effective. (It comes standard.) From 3f6bb4da04a9ef162673106413c8687cd7601a9a Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 17:54:31 -0600 Subject: [PATCH 19/32] remove weak/passive "you" --- lib/broadway.ex | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/lib/broadway.ex b/lib/broadway.ex index f24eb01b..6ec21e9a 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -38,7 +38,7 @@ defmodule Broadway do event was properly processed. * Custom failure handling - Broadway provides a `c:handle_failed/2` callback - where developers can outline custom code to handle errors. For example, + to outline custom code to handle errors. For example, if they want to move messages to another queue for further processing. * Dynamic batching - Broadway allows developers to batch messages based on @@ -48,9 +48,9 @@ defmodule Broadway do * Ordering and Partitioning - Broadway allows developers to partition messages across workers, guaranteeing messages within the same partition - are processed in order. For example, if you want to guarantee all events - tied to a given `user_id` are processed in order and not concurrently, - you can set the `:partition_by` option. See ["Ordering and partitioning"](#module-ordering-and-partitioning). + are processed in order. For example, set the `:partition_by` option to + guarantee all events tied to a given `user_id` are processed in order + and not concurrently. See ["Ordering and partitioning"](#module-ordering-and-partitioning). * Rate limiting - Broadway allows developers to rate limit all producers in a single node by a given number of messages in a time period, allowing @@ -104,12 +104,12 @@ defmodule Broadway do Supervisor.start_link(children, strategy: :one_for_one) - Adding your pipeline to your supervision tree in this way + Adding your pipeline to your supervision tree this way calls the default `child_spec/1` function that is generated - when `use Broadway` is invoked. If you would like to customize - the child spec passed to the supervisor, you can override the - `child_spec/1` function in your module or explicitly pass a - child spec to the supervisor when adding it to your supervision tree. + when `use Broadway` is invoked. Customize child specifications + by overriding the `child_spec/1` function in your module or + by explicitly passing the `:child_spec` option to the supervisor + when adding it to your supervision tree. The configuration above defines a pipeline with: @@ -198,8 +198,8 @@ defmodule Broadway do [batch_sqs_1] [batch_sqs_2] [batch_s3_1] <- process each batch ``` - Additionally, you have to define the `c:handle_batch/4` callback, - which batch processors invoke for each batch. You can then + Next, define the `c:handle_batch/4` callback, + which batch processors invoke with each batch. Then call `Broadway.Message.put_batcher/2` inside `c:handle_message/3` to control which batcher the message should go to. @@ -265,7 +265,7 @@ defmodule Broadway do to raise an error. For example, imagine you want to batch "special" messages and handle them differently - then all other messages. You can configure your pipeline like this: + then all other messages. Configure your pipeline like this: defmodule MyBroadway do use Broadway @@ -386,8 +386,8 @@ defmodule Broadway do `test_message/3` and `test_batch/3` functions should be used to publish messages. - With `test_message/3`, you can push a message into the pipeline and receive - a process message when the pipeline acknowledges the data you have pushed + Call `test_message/3` to push a message into the pipeline and receive + back a process message when the pipeline acknowledges the message data has been processed. Let's see an example. Imagine the following `Broadway` module: @@ -445,7 +445,7 @@ defmodule Broadway do {:ack, ^ref, successful_messages, failure_messages} - You can use the acknowledgment to guarantee the message has been + Use the acknowledgement to guarantee the message has been processed and therefore any side-effect from the pipeline should be visible. @@ -572,8 +572,8 @@ defmodule Broadway do across partitions. So some partitions may be more overloaded than others, slowing down the whole pipeline. - In the example above, we have set the same partition for all - processors and batchers. You can also specify the `:partition_by` + In the example above, we set the same partition for all + processors and batchers. Alternatively, specify the `:partition_by` function for each "processor" and "batcher" individually. > #### Even partitions {: .warning} @@ -852,9 +852,8 @@ defmodule Broadway do * `context` is the user defined data structure passed to `start_link/2`. This is the place to prepare and preload any information that will be used - by `c:handle_message/3`. For example, if you need to query the database, - instead of doing it once per message, you can do it on this callback as - a best-effort optimization. + by `c:handle_message/3`. For example, query the database on this callback as + a best-effort optimization instead of doing it once per message. The length of the list of messages received by this callback is often based on the `min_demand`/`max_demand` configuration in the processor but ultimately @@ -908,7 +907,7 @@ defmodule Broadway do In case more than one batcher have been defined in the configuration, you need to specify which of them the resulting message will be forwarded - to. You can do this by calling `put_batcher/2` and returning the new + to. Do this by calling `put_batcher/2` and returning the new updated message: @impl true From b37937a1c5bc88b8c8c85bb77b288655533aaed3 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 19:18:39 -0600 Subject: [PATCH 20/32] edits to remove "you can" --- lib/broadway/message.ex | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/broadway/message.ex b/lib/broadway/message.ex index bf682d7f..ebef2365 100644 --- a/lib/broadway/message.ex +++ b/lib/broadway/message.ex @@ -36,11 +36,12 @@ defmodule Broadway.Message do @typedoc """ The Broadway message struct. - Most of these fields are manipulated by Broadway itself. You can - *read* the `:metadata` field, and you can use the functions in this - module to update most of the other fields. If you are implementing - your own producer, see the `Broadway.Producer` documentation - for more information on how to create and manipulate message structs. + `%Broadway.Message{}` is manipulated by the main Broadway process. + Use this module's functions to update message fields. The `:metadata` + is **read-only**. + + See the `Broadway.Producer` documentation + on how custom producers create and manipulate message structs. """ @type t :: %Message{ data: term, From cf2497d099cf77b89557fac3027a9d3fc20ba536 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 19:42:49 -0600 Subject: [PATCH 21/32] remove weak "you" --- lib/broadway/caller_acknowledger.ex | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/broadway/caller_acknowledger.ex b/lib/broadway/caller_acknowledger.ex index 2388a852..7727b682 100644 --- a/lib/broadway/caller_acknowledger.ex +++ b/lib/broadway/caller_acknowledger.ex @@ -2,9 +2,9 @@ defmodule Broadway.CallerAcknowledger do @moduledoc """ A simple acknowledger that sends a message back to a caller. - If you want to use this acknowledger in messages produced by your - `Broadway.Producer`, you can get its configuration by calling - the `init/0` function. For example, you can use it in + Use this acknowledger in messages produced by your + `Broadway.Producer` to get its configuration by calling + the `init/0` function. For example, use it in `Broadway.test_message/3`: some_ref = make_ref() @@ -15,9 +15,9 @@ defmodule Broadway.CallerAcknowledger do acknowledger: Broadway.CallerAcknowledger.init({self(), some_ref}, :ignored) ) - The first parameter is a tuple with the PID to receive the messages - and a unique identifier (usually a reference). Such unique identifier - is then included in the messages sent to the PID. The second parameter, + The first parameter is a 2-tuple, `{pid, ref}`. The PID is the process that + receives acknowledgment messages, and `ref` is a unique identifier (prefer `make_ref()`). + That unique identifier is then included in the messages sent to the PID. The second parameter, which is per message, is ignored. It sends a message in the format: From 267e5cae5114fcda3aee5368fdd66f6f7ce6aa65 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 19:53:51 -0600 Subject: [PATCH 22/32] * removed weak "you" * removed "should" * updated list layout --- lib/broadway/acknowledger.ex | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/lib/broadway/acknowledger.ex b/lib/broadway/acknowledger.ex index 3a945da6..14f66fe0 100644 --- a/lib/broadway/acknowledger.ex +++ b/lib/broadway/acknowledger.ex @@ -22,20 +22,16 @@ defmodule Broadway.Acknowledger do @doc """ Invoked to acknowledge successful and failed messages. - * `ack_ref` is a term that uniquely identifies how messages - should be grouped and sent for acknowledgement. Imagine - you have a scenario where messages are coming from - different producers. Broadway will use this information - to correctly identify the acknowledger and pass it among - with the messages so you can properly communicate with - the source of the data for acknowledgement. `ack_ref` is + * `ack_ref` - a unique identifier used to determine how messages + are to be grouped and sent for acknowledgement. For example, + if messages come from different producers, Broadway uses its + value to identify the acknowledger and pass it along with messages + to coordinate acknowledgements with the data source . `ack_ref` is part of `t:Broadway.Message.acknowledger/0`. - * `successful` is the list of messages that were - successfully processed and published. + * `successful` - the list of messages both processed and published. - * `failed` is the list of messages that, for some reason, - could not be processed or published. + * `failed` - the list of messages that failed processing or publishing. """ @callback ack(ack_ref :: term, successful :: [Message.t()], failed :: [Message.t()]) :: From 859351ff73fb86d812dfe333bc88b484ef1307b5 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 20:15:43 -0600 Subject: [PATCH 23/32] Broadway.NoopAcknowledger edits * removed "you want", "you can"s * Added context and formatting to the example * added example for `init/0` @doc --- lib/broadway/noop_acknowledger.ex | 41 ++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/lib/broadway/noop_acknowledger.ex b/lib/broadway/noop_acknowledger.ex index 791c69e7..65346eed 100644 --- a/lib/broadway/noop_acknowledger.ex +++ b/lib/broadway/noop_acknowledger.ex @@ -1,30 +1,41 @@ defmodule Broadway.NoopAcknowledger do @moduledoc """ - An acknowledger that does nothing. + An acknowledger that performs no operations. - If you want to use this acknowledger in messages produced by your - `Broadway.Producer`, you can get its configuration by calling - the `init/0` function. For example, you can use it in - `Broadway.test_message/3`: + Use this module to configure messages produced by `Broadway.Producer` + when no acknowledgment is needed. - Broadway.test_message(MyPipeline, "some data", acknowledger: Broadway.NoopAcknowledger.init()) + ## Example - Broadway sets this acknowledger automatically on messages that have been acked - via `Broadway.Message.ack_immediately/1`. + Use with `Broadway.test_message/3` for testing purposes: + + Broadway.test_message( + MyPipeline, + "some data", + acknowledger: Broadway.NoopAcknowledger.init() + ) + + This acknowledger is automatically set by Broadway for messages + acknowledged via `Broadway.Message.ack_immediately/1`. """ @behaviour Broadway.Acknowledger @doc """ - Returns the acknowledger metadata. + Returns a no-op acknowledger tuple. + + The tuple format `{module, ack_ref, data}` satisfies the + `Broadway.Acknowledger` contract without performing any operation. + + ## Example + + iex> Broadway.NoopAcknowledger.init() + {Broadway.NoopAcknowledger, nil, nil} """ @spec init() :: Broadway.Message.acknowledger() - def init do - {__MODULE__, _ack_ref = nil, _data = nil} - end + def init, do: {__MODULE__, nil, nil} @impl true - def ack(_ack_ref = nil, _successful, _failed) do - :ok - end + @doc false + def ack(_ack_ref, _successful, _failed), do: :ok end From 18dde839d0321e9bfbbfadfd243b1d1da7807264 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 20:18:56 -0600 Subject: [PATCH 24/32] edited so the subject was doing the action --- lib/broadway/producer.ex | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/broadway/producer.ex b/lib/broadway/producer.ex index c1218082..9a778fdf 100644 --- a/lib/broadway/producer.ex +++ b/lib/broadway/producer.ex @@ -67,7 +67,7 @@ defmodule Broadway.Producer do """ @doc """ - Invoked once by Broadway during `Broadway.start_link/2`. + Broadway invokes this callback once during `Broadway.start_link/2`. The goal of this callback is to manipulate the general topology options, if necessary at all, and introduce any new child specs that will be @@ -76,7 +76,7 @@ defmodule Broadway.Producer do for `Supervisor`), which means that if the children returned from this callback crash they will bring down the rest of the pipeline before being restarted. - This callback is guaranteed to be invoked inside the Broadway main process. + Broadway invokes this callback within the main Broadway process. `module` is the Broadway module passed as the first argument to `Broadway.start_link/2`. `options` is all of Broadway topology options passed @@ -114,12 +114,12 @@ defmodule Broadway.Producer do when child_spec: :supervisor.child_spec() | {module, any} | module @doc """ - Invoked by the terminator right before Broadway starts draining in-flight + The terminator invokes this callback right before Broadway starts draining in-flight messages during shutdown. - This callback should be implemented by producers that need to do additional + Implement this callback for producers that need to do additional work before shutting down. That includes active producers like RabbitMQ that - must ask the data provider to stop sending messages. It will be invoked for + must ask the data provider to stop sending messages. Broadway will invoke this for each producer stage. `state` is the current state of the producer. From 3480c25b5644814d1039d3fb3ec39ac38028bdb4 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 20:22:33 -0600 Subject: [PATCH 25/32] removed weak "you" + verb --- lib/broadway/producer.ex | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/lib/broadway/producer.ex b/lib/broadway/producer.ex index 9a778fdf..f40f22ed 100644 --- a/lib/broadway/producer.ex +++ b/lib/broadway/producer.ex @@ -18,9 +18,9 @@ defmodule Broadway.Producer do ## Injected Broadway configuration If `options` is a keyword list, Broadway injects a `:broadway` option - into the keyword list. This option contains the configuration for the - complete Broadway topology (see `Broadway.start_link/2`). For example, - you can use `options[:broadway][:name]` to uniquely identify the topology. + into that list. This option contains the configuration for the + complete Broadway topology (see `Broadway.start_link/2`). For example, + use `options[:broadway][:name]` to uniquely identify the topology. The `:broadway` configuration also has an `:index` key. This is the index of the producer in its supervision tree (starting @@ -40,22 +40,19 @@ defmodule Broadway.Producer do ## Producing Broadway messages - You should generally modify `Broadway.Message` structs by using the functions - in the `Broadway.Message` module. However, if you are implementing your - own producer, you **can manipulate** some of the struct's fields directly. + The pipeline modifies `Broadway.Message` structs using functions + from the `Broadway.Message` module, except for [custom producers](https://hexdocs.pm/broadway/custom-producers.html). These fields are: - * `:data` (required) - the data of the message. Even though the function - `Broadway.Message.put_data/2` exists, when creating a `%Broadway.Message{}` - struct from scratch you will have to pass in the `:data` field directly. + * `:data` (required) - the message data. + Pass in the `:data` field directly. (Don't use `Broadway.Message.put_data/2`.) * `:acknowledger` (required) - the acknowledger of the message, of type `t:Broadway.Message.acknowledger/0`. - * `:metadata` (optional) - metadata about the message that your producer - can attach to the message. This is useful when you want to add some metadata - to messages, and document it for users to use in their pipelines. + * `:metadata` (optional) - the producer-attached message metadata. + Optionally document information for users to use in their pipelines. For example, a producer could create a message by doing something like this: From 0b74efc43c332753c3c135929fbe7348e0161ff9 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Tue, 1 Apr 2025 20:39:10 -0600 Subject: [PATCH 26/32] * parameter format edits * removing some passive voice and weak words like "should" * minor rewrite for different word choice/sentence structure. *white space changes Note: If we don't like the formatting of the `c:prepare_for_start/2` parameters, here are the edits in paragraph form: The `module` parameter represents the Broadway module passed as the first argument to `Broadway.start_link/2`. The `options` parameter contains all the Broadway topology options passed as the second argument to `Broadway.start_link/2`. --- lib/broadway/producer.ex | 66 ++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/lib/broadway/producer.ex b/lib/broadway/producer.ex index f40f22ed..594de1ec 100644 --- a/lib/broadway/producer.ex +++ b/lib/broadway/producer.ex @@ -18,43 +18,41 @@ defmodule Broadway.Producer do ## Injected Broadway configuration If `options` is a keyword list, Broadway injects a `:broadway` option - into that list. This option contains the configuration for the - complete Broadway topology (see `Broadway.start_link/2`). For example, - use `options[:broadway][:name]` to uniquely identify the topology. + into that list. This option contains the configuration for the + complete Broadway topology (see `Broadway.start_link/2`). For example, + use `options[:broadway][:name]` to uniquely identify the topology. The `:broadway` configuration also has an `:index` key. This is the index of the producer in its supervision tree (starting - from `0`). This allows features such as having even producers - connect to some server while odd producers connect to another. - - If `options` is any other term, it is passed as is to the `c:GenStage.init/1` - callback. All other functions behave precisely as in `GenStage` - with the requirements that all emitted events must be `Broadway.Message` - structs. + from `0`). This enables patterns such as connecting even-indexed producers + to one server while odd-indexed producers connect to another. + If `options` is any other term besides a keyword list, it is passed directly to the `c:GenStage.init/1` + callback without modification. All other functions behave precisely as in `GenStage` + with the requirement that all emitted events must be `Broadway.Message` structs. ## Optional callbacks A `Broadway.Producer` can implement two optional Broadway callbacks, - `c:prepare_for_start/2` and `c:prepare_for_draining/1`, which are useful - for booting up and shutting down Broadway topologies respectively. + `c:prepare_for_start/2` and `c:prepare_for_draining/1`, which + boot up and shut down Broadway topologies, respectively. ## Producing Broadway messages The pipeline modifies `Broadway.Message` structs using functions from the `Broadway.Message` module, except for [custom producers](https://hexdocs.pm/broadway/custom-producers.html). - These fields are: + Manipulate these custom producer struct fields directly: * `:data` (required) - the message data. Pass in the `:data` field directly. (Don't use `Broadway.Message.put_data/2`.) - * `:acknowledger` (required) - the acknowledger of the message, of type - `t:Broadway.Message.acknowledger/0`. + * `:acknowledger` (required) - the message acknowledger. + The acknowledger's type is `t:Broadway.Message.acknowledger/0`. * `:metadata` (optional) - the producer-attached message metadata. Optionally document information for users to use in their pipelines. - For example, a producer could create a message by doing something like this: + For example, a custom producer creates this message: %Broadway.Message{ data: "some data here", @@ -66,25 +64,27 @@ defmodule Broadway.Producer do @doc """ Broadway invokes this callback once during `Broadway.start_link/2`. - The goal of this callback is to manipulate the general topology options, - if necessary at all, and introduce any new child specs that will be - started **before** the producers' supervisor in Broadway's supervision tree. - Broadway's supervision tree is a `rest_for_one` supervisor (see the documentation - for `Supervisor`), which means that if the children returned from this callback - crash they will bring down the rest of the pipeline before being restarted. + This callback manipulates the general topology options + and introduces any new child specifications that Broadway will + start **before** the producers' supervisor in its supervision tree. + Broadway's supervision tree operates as a `rest_for_one` supervisor + (see `Supervisor`), so if the children returned from this callback + crash, the entire pipeline shuts down until the supervisor restarts them. + + Broadway invokes this callback within the main Broadway process with the + following parameters: - Broadway invokes this callback within the main Broadway process. + * `module` - the Broadway module passed as the first argument to `Broadway.start_link/2`. - `module` is the Broadway module passed as the first argument to - `Broadway.start_link/2`. `options` is all of Broadway topology options passed - as the second argument to `Broadway.start_link/2`. + * `options` - a keyword list of Broadway topology options passed + as the second argument to `Broadway.start_link/2`. - The return value of this callback is a tuple `{child_specs, options}`. `child_specs` - is the list of child specs to be started under Broadway's supervision tree. - `updated_options` is a potentially-updated list of Broadway options - that will be used instead of the ones passed to `Broadway.start_link/2`. This can be - used to modify the characteristics of the Broadway topology to accommodate - the children started here. + The callback returns a tuple `{child_specs, updated_options}`. `child_specs` + is a list of child specifications from the children that Broadway will start under its supervision tree. + `updated_options` contains potentially-modified Broadway options + that replace those initially passed to `Broadway.start_link/2`. This + allows adjustments to the characteristics of the Broadway topology to accommodate + the newly introduced children. ## Examples @@ -119,7 +119,7 @@ defmodule Broadway.Producer do must ask the data provider to stop sending messages. Broadway will invoke this for each producer stage. - `state` is the current state of the producer. + * `state` - the current state of the producer. """ @callback prepare_for_draining(state :: any) :: {:noreply, [event], new_state} From 5d838fef415703958cbd7397a07031615d705056 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Wed, 2 Apr 2025 02:26:37 -0600 Subject: [PATCH 27/32] removed "vague" might/could --- guides/examples/amazon-sqs.md | 4 ++-- guides/examples/apache-kafka.md | 2 +- guides/examples/google-cloud-pubsub.md | 4 ++-- guides/examples/rabbitmq.md | 2 +- lib/broadway.ex | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/guides/examples/amazon-sqs.md b/guides/examples/amazon-sqs.md index d6fc75ff..0fcd7d44 100644 --- a/guides/examples/amazon-sqs.md +++ b/guides/examples/amazon-sqs.md @@ -190,7 +190,7 @@ in the supervision tree. Some of the configuration options available for Broadway come already with a "reasonable" default value. However those values might not suit your requirements. Depending on the number of messages you get, how much processing -they need and how much IO work is going to take place, you might need completely +they need and how much IO work is going to take place, you need completely different values to optimize the flow of your pipeline. The `concurrency` option available for every set of producers, processors and batchers, among with `max_demand`, `batch_size`, and `batch_timeout` can give you a great deal @@ -202,7 +202,7 @@ See the notes on [`Producer concurrency`](https://hexdocs.pm/broadway/Broadway.h and [`Batcher concurrency`](https://hexdocs.pm/broadway/Broadway.html#module-batcher-concurrency) for details. -Here's an example on how you could tune them according to +Here's an example on how you tune them according to your needs. defmodule MyBroadway do diff --git a/guides/examples/apache-kafka.md b/guides/examples/apache-kafka.md index c3724c5e..6e157110 100644 --- a/guides/examples/apache-kafka.md +++ b/guides/examples/apache-kafka.md @@ -231,7 +231,7 @@ You should see the output showing the generated batches: Some of the configuration options available for Broadway come already with a "reasonable" default value. However, those values might not suit your requirements. Depending on the number of records you get, how much processing -they need and how much IO work is going to take place, you might need completely +they need and how much IO work is going to take place, you need completely different values to optimize the flow of your pipeline. The `concurrency` option available for every set of producers, processors and batchers, along with `batch_size` and `batch_timeout` can give you a great deal of flexibility. diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index 986b8abf..277c17f6 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -242,7 +242,7 @@ Got batch of finished jobs from processors, sending ACKs to Pub/Sub as a batch.: Some of the configuration options available for Broadway come already with a "reasonable" default value. However those values might not suit your requirements. Depending on the number of messages you get, how much processing -they need and how much IO work is going to take place, you might need completely +they need and how much IO work is going to take place, you need completely different values to optimize the flow of your pipeline. The `concurrency` option available for every set of producers, processors and batchers, among with `max_demand`, `batch_size`, and `batch_timeout` can give you a great deal @@ -254,7 +254,7 @@ See the notes on [`Producer concurrency`](https://hexdocs.pm/broadway/Broadway.h and [`Batcher concurrency`](https://hexdocs.pm/broadway/Broadway.html#module-batcher-concurrency) for details. -Here's an example on how you could tune them according to +Here's an example on how you tune them according to your needs. defmodule MyBroadway do diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index 10093c0f..09a5dc18 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -241,7 +241,7 @@ You should see the output showing the generated batches: Some of the configuration options available for Broadway come already with a "reasonable" default value. However, those values might not suit your requirements. Depending on the number of messages you get, how much processing -they need and how much IO work is going to take place, you might need completely +they need and how much IO work is going to take place, you need completely different values to optimize the flow of your pipeline. The `concurrency` option available for every set of producers, processors and batchers, among with `max_demand`, `batch_size`, and `batch_timeout` can give you a great deal diff --git a/lib/broadway.ex b/lib/broadway.ex index 6ec21e9a..5ce2b8d5 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -424,7 +424,7 @@ defmodule Broadway do end end - Now in `config/test.exs` you could do: + Now in `config/test.exs` you do: config :my_app, producer_module: Broadway.DummyProducer, @@ -972,7 +972,7 @@ defmodule Broadway do * `context` is the user-defined data structure passed to `start_link/2`. This callback must return the same messages given to it, possibly updated. - For example, you could update the message data or use `Broadway.Message.configure_ack/2` + For example, you update the message data or use `Broadway.Message.configure_ack/2` in a centralized place to configure how to ack the message based on the failure reason. From b81b2c0cd47663e6a34daf859a95311042812ded Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Wed, 2 Apr 2025 02:53:43 -0600 Subject: [PATCH 28/32] edited out "you need to" (passive voice) --- README.md | 2 +- guides/examples/amazon-sqs.md | 2 +- guides/examples/apache-kafka.md | 6 ++---- guides/examples/custom-producers.md | 8 ++++---- guides/examples/google-cloud-pubsub.md | 2 +- guides/examples/rabbitmq.md | 2 +- lib/broadway.ex | 26 ++++++++++++-------------- 7 files changed, 22 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 22665b90..52e2cbe8 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ defmodule MyBroadway do end ``` -Once your Broadway module is defined, you need to add it as a child of your application supervision tree as `{MyBroadway, []}`. +Once your Broadway module is defined, add it as a child of your application supervision tree as `{MyBroadway, []}`. ## Comparison to Flow diff --git a/guides/examples/amazon-sqs.md b/guides/examples/amazon-sqs.md index 0fcd7d44..66be9c2a 100644 --- a/guides/examples/amazon-sqs.md +++ b/guides/examples/amazon-sqs.md @@ -169,7 +169,7 @@ For more information, see `c:Broadway.handle_message/3` and ## Run the Broadway pipeline -To run your `Broadway` pipeline, you need to add as a child in +To run your `Broadway` pipeline, add it as a child in a supervision tree. Most applications have a supervision tree defined at `lib/my_app/application.ex`. Add Broadway as a child to a supervisor as follows: diff --git a/guides/examples/apache-kafka.md b/guides/examples/apache-kafka.md index 6e157110..e28fd484 100644 --- a/guides/examples/apache-kafka.md +++ b/guides/examples/apache-kafka.md @@ -163,10 +163,8 @@ For more information, see `c:Broadway.handle_message/3` and ## Run the Broadway pipeline -To run your `Broadway` pipeline, you need to add as a child in -a supervision tree. Most applications have a supervision tree defined -at `lib/my_app/application.ex`. Add Broadway as a child to a -supervisor as follows: +Add your `Broadway` pipeline as a child in a supervision tree to run it. Most applications have a supervision tree defined at `lib/my_app/application.ex`. +Add the child process `{MyBroadway, []}` to a supervisor as follows: children = [ {MyBroadway, []} diff --git a/guides/examples/custom-producers.md b/guides/examples/custom-producers.md index 11b950f9..0387e9e2 100644 --- a/guides/examples/custom-producers.md +++ b/guides/examples/custom-producers.md @@ -5,9 +5,9 @@ If there is no existing Broadway producer for your technology of choice, integra ## Example In general, producers must generate `%Broadway.Message{}` structs in order -to be processed by Broadway. In case you need to use an existing GenStage -producer and you don't want to change its original implementation, -you'll have to set the producer's `:transformer` option to translate the +to be processed by Broadway. To use an existing GenStage +producer without changing its original implementation, +set the producer's `:transformer` option to translate the generated events into Broadway messages. In the following example the producer is a regular GenStage, i.e., it @@ -68,7 +68,7 @@ generated by the producer into proper Broadway messages: end end -Notice that you need to pass two options to the producer: +Pass two options to the producer: * `:module` - a tuple representing the GenStage producer as `{mod, arg}`. Where `mod` is module that implements the GenStage behaviour and `arg` diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index 277c17f6..e3764cc6 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -195,7 +195,7 @@ For more information, see `c:Broadway.handle_message/3` and `c:Broadway.handle_b ## Run the Broadway pipeline -To run your `Broadway` pipeline, you need to add it as a child in a supervision tree. Most +To run your `Broadway` pipeline, add it as a child in a supervision tree. Most applications have a supervision tree defined at `lib/my_app/application.ex`. Add Broadway as a child to a supervisor as follows: diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index 09a5dc18..0ebb4743 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -178,7 +178,7 @@ For more information, see `c:Broadway.handle_message/3` and ## Run the Broadway pipeline -To run your `Broadway` pipeline, you need to add as a child in +To run your `Broadway` pipeline, add it as a child in a supervision tree. Most applications have a supervision tree defined at `lib/my_app/application.ex`. Add Broadway as a child to a supervisor as follows: diff --git a/lib/broadway.ex b/lib/broadway.ex index 5ce2b8d5..32eca3ec 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -63,14 +63,14 @@ defmodule Broadway do ## The Broadway behaviour - In order to use Broadway, you need to: + To use Broadway: - 1. Define your pipeline configuration - 2. Define a module implementing the Broadway behaviour + 1. define your pipeline configuration + 2. define a module implementing the Broadway behaviour ### Example - Broadway is a process-based behaviour, and you begin by + Broadway is a process-based behaviour, so begin by defining a module that invokes `use Broadway`. Processes defined by these modules will often be started by a supervisor, and so a `start_link/1` function is frequently @@ -127,7 +127,7 @@ defmodule Broadway do [processor_1] [processor_2] <- process each message ``` - After the pipeline is defined, you need to implement the `c:handle_message/3` + After the pipeline is defined, implement the `c:handle_message/3` callback which will be invoked by processors for each message. `c:handle_message/3` receives every message as a `Broadway.Message` @@ -484,9 +484,8 @@ defmodule Broadway do ### Testing with Ecto - If you are using Ecto in your Broadway processors and you want - to run your tests concurrently, you need to tell Broadway to - use the Ecto SQL Sandbox during tests. This can be done in two + If your processors use Ecto, use the Ecto SQL Sandbox + during tests to run them concurrently. Tell Broadway in two steps. First, when you call `test_messages/3` in your tests, include @@ -905,9 +904,9 @@ defmodule Broadway do |> update_data(&do_calculation_and_returns_the_new_data/1) end - In case more than one batcher have been defined in the configuration, - you need to specify which of them the resulting message will be forwarded - to. Do this by calling `put_batcher/2` and returning the new + In case more than one batcher has been defined in the configuration, + specify which will be forwarded the resulting message. + Do this by calling `put_batcher/2` and returning the new updated message: @impl true @@ -1060,9 +1059,8 @@ defmodule Broadway do ## Options - In order to set up how the pipeline created by Broadway should work, - you need to specify the blueprint of the pipeline. You can - do this by passing a set of options to `start_link/2`. + Specify a Broadway pipeline's blueprint to set up how it works + by passing a set of options to `start_link/2`. Each component of the pipeline has its own set of options. The Broadway options are: From 812f5f063119dd810f45d54e0ef5f67887fef7f2 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Wed, 2 Apr 2025 03:09:25 -0600 Subject: [PATCH 29/32] removed "we need to" (passive) --- guides/examples/amazon-sqs.md | 9 ++++----- guides/examples/apache-kafka.md | 6 +++--- guides/examples/google-cloud-pubsub.md | 7 ++++--- guides/examples/rabbitmq.md | 6 +++--- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/guides/examples/amazon-sqs.md b/guides/examples/amazon-sqs.md index 66be9c2a..220b1183 100644 --- a/guides/examples/amazon-sqs.md +++ b/guides/examples/amazon-sqs.md @@ -66,8 +66,8 @@ Don't forget to check for the latest version of dependencies. ## Define the pipeline configuration -Broadway is a process-based behaviour and to define a Broadway -pipeline, we need to define three functions: `start_link/1`, +A Broadway pipeline is an implementation of a process-based behaviour and +is defined by three functions: `start_link/1`, `handle_message/3` and `handle_batch/4`. We will cover `start_link/1` in this section and the `handle_` callbacks in the next one. @@ -135,9 +135,8 @@ module docs as well as `Broadway.start_link/2`. ## Implement Broadway callbacks -In order to process incoming messages, we need to implement the -required callbacks. For the sake of simplicity, we're considering that -all messages received from the queue are numbers: +Implement the required callbacks to process incoming messages. +In this example, all messages received from the queue are numbers: defmodule MyBroadway do use Broadway diff --git a/guides/examples/apache-kafka.md b/guides/examples/apache-kafka.md index e28fd484..43a7d77e 100644 --- a/guides/examples/apache-kafka.md +++ b/guides/examples/apache-kafka.md @@ -8,7 +8,7 @@ Kafka is a distributed streaming platform that has three key capabilities: ## Getting started -In order to use Broadway with Kafka, we need to: +To use Broadway with Kafka: 1. Create a stream of records (or use an existing one) 1. Configure your Elixir project to use Broadway @@ -125,8 +125,8 @@ module docs as well as `Broadway.start_link/2`. ## Implement Broadway callbacks -In order to process incoming messages, we need to implement the -required callbacks. For the sake of simplicity, we're considering that +Implement callbacks to process incoming messages. +In this example, all messages received from the topic are numbers: defmodule MyBroadway do diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index e3764cc6..fecbc7c9 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -79,7 +79,7 @@ This command generated a `credentials.json` file which will be useful later. Not pattern is `@.iam.gserviceaccount.com`. Run `gcloud iam service-accounts list --project test-pubsub` to see all service accounts associated with the given project. -Finally, we need to enable Pub/Sub for our project: +Finally, enable Pub/Sub for our project: $ gcloud services enable pubsub --project test-pubsub Operation "operations/xxx" finished successfully. @@ -114,7 +114,8 @@ Don't forget to check for the latest version of dependencies. ## Define the pipeline configuration -Broadway is a process-based behaviour and to define a Broadway pipeline, we need to define three +Broadway is a process-based behaviour and a Broadway pipeline +is defined with three functions: `start_link/1`, `handle_message/3` and `handle_batch/4`. We will cover `start_link/1` in this section and the `handle_` callbacks in the next one. @@ -164,7 +165,7 @@ For general information about setting up Broadway, see `Broadway` module docs as ## Implement Broadway callbacks -In order to process incoming messages, we need to implement the required callbacks. For the sake +In order to process incoming messages, implement the required callbacks. For the sake of simplicity, we're considering that all messages received from the queue are strings and our processor calls `String.upcase/1` on them: diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index 0ebb4743..22a8c3be 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -70,8 +70,8 @@ Don't forget to check for the latest version of dependencies. ## Define the pipeline configuration -Broadway is a process-based behaviour and to define a Broadway pipeline, -we need to define the `start_link/1` function, the `c:handle_message/3` +Broadway is a process-based behaviour, and a Broadway pipeline +is defined by the `start_link/1` function, the `c:handle_message/3` callback, and optionally, the `c:handle_batch/4` callback. We will cover `start_link/1` in this section and the `handle_` callbacks in the next one. Similar to other process-based behaviours, `start_link/1` simply @@ -139,7 +139,7 @@ module docs as well as `Broadway.start_link/2`. ## Implement Broadway callbacks -In order to process incoming messages, we need to implement the +In order to process incoming messages, implement the required callbacks. For the sake of simplicity, we're considering that all messages received from the queue are numbers: From b7e085946ee96ab9b74bf3c9372921ef13ab402c Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Wed, 2 Apr 2025 03:16:43 -0600 Subject: [PATCH 30/32] removed "you should" (passive) --- guides/examples/google-cloud-pubsub.md | 2 +- guides/examples/rabbitmq.md | 2 +- lib/broadway.ex | 6 +++--- lib/broadway/message.ex | 4 ++-- lib/broadway/options.ex | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index fecbc7c9..98b07104 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -232,7 +232,7 @@ And in another tab, send a couple of test messages to Pub/Sub: messageIds: - '651427034966696' -Now, In the first tab, you should see output similar to: +In the first tab, see output similar to: ``` Got batch of finished jobs from processors, sending ACKs to Pub/Sub as a batch.: ["TEST 1", "TEST 2"] diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index 22a8c3be..547a4d8c 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -198,7 +198,7 @@ Test your pipeline by entering an `iex` session: $ iex -S mix -If everything went fine, you should see lots of `info` log messages from the `amqp` +If everything went fine, you will see lots of `info` log messages from the `amqp` supervisors. If you think that's too verbose and want to do something about it, please take a look at the _"Log related to amqp supervisors are too verbose"_ subsection in the `amqp`'s [Troubleshooting](https://hexdocs.pm/amqp/readme.html#troubleshooting) diff --git a/lib/broadway.ex b/lib/broadway.ex index 32eca3ec..be241c0a 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -342,7 +342,7 @@ defmodule Broadway do Setting producer concurrency is a tradeoff between latency and internal queueing. - For efficiency, you should generally limit the amount of internal queueing. + For efficiency, limit the amount of internal queueing. Whenever additional messages are sitting in a busy processor's mailbox, they can't be delivered to another processor which may be available or become available first. @@ -455,7 +455,7 @@ defmodule Broadway do and verify single messages, without imposing high timeouts to our test suites. - In case you want to test multiple messages, then you need to use + To test multiple messages, use `test_batch/3`. `test_batch/3` will respect the batching configuration, which most likely means you need to increase your test timeouts: @@ -518,7 +518,7 @@ defmodule Broadway do BroadwayEctoSandbox.attach(MyApp.Repo) - And now you should have concurrent Broadway tests that talk to the database. + And now you have concurrent Broadway tests that talk to the database. ## Ordering and partitioning diff --git a/lib/broadway/message.ex b/lib/broadway/message.ex index ebef2365..0be32f35 100644 --- a/lib/broadway/message.ex +++ b/lib/broadway/message.ex @@ -8,9 +8,9 @@ defmodule Broadway.Message do through the `c:Broadway.handle_message/3` callback or internally by one of the built-in stages of Broadway. - Instead of modifying the struct directly, you should use the functions + Instead of modifying the struct directly, use the functions provided by this module to manipulate messages. However, if you are implementing - a `Broadway.Producer` of your own, see `t:t/0` to see what fields you should set. + a `Broadway.Producer` of your own, see `t:t/0` for what fields to set. """ alias __MODULE__, as: Message diff --git a/lib/broadway/options.ex b/lib/broadway/options.ex index 3f3051a9..4d8cfed9 100644 --- a/lib/broadway/options.ex +++ b/lib/broadway/options.ex @@ -148,7 +148,7 @@ defmodule Broadway.Options do > > However, that's not true. Separation of concerns is modeled > by defining several modules and functions, not processors. Processors - > are ultimately about moving data around and you should only do it + > are ultimately about moving data around - only do it > when necessary. Using processors for code organization purposes would > lead to inefficient pipelines. From 88ed9cf43adc4052e4e3341b014c9e811b13fc82 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Wed, 2 Apr 2025 03:19:56 -0600 Subject: [PATCH 31/32] removed foms of "need to" (passive/weak) --- guides/examples/amazon-sqs.md | 2 +- guides/examples/rabbitmq.md | 2 +- lib/broadway/producer.ex | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/guides/examples/amazon-sqs.md b/guides/examples/amazon-sqs.md index 220b1183..0efd824a 100644 --- a/guides/examples/amazon-sqs.md +++ b/guides/examples/amazon-sqs.md @@ -107,7 +107,7 @@ Assuming we want to consume messages from a queue called The above configuration also assumes that you have the AWS credentials set up in your environment, for instance, by having the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables set. If that's -not the case, you will need to pass that information to the client so it +not the case, pass that information to the client so it can properly connect to the AWS servers. Here's how to do it: ... diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index 547a4d8c..d1bbe07f 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -116,7 +116,7 @@ Assuming we want to consume messages from a queue called end If you're consuming data from an existing broker that requires authorization, -you'll need to provide your credentials using the `connection` option: +provide your credentials using the `connection` option: ... producer: [ diff --git a/lib/broadway/producer.ex b/lib/broadway/producer.ex index 594de1ec..1922621a 100644 --- a/lib/broadway/producer.ex +++ b/lib/broadway/producer.ex @@ -114,7 +114,7 @@ defmodule Broadway.Producer do The terminator invokes this callback right before Broadway starts draining in-flight messages during shutdown. - Implement this callback for producers that need to do additional + Implement this callback for producers that do additional work before shutting down. That includes active producers like RabbitMQ that must ask the data provider to stop sending messages. Broadway will invoke this for each producer stage. From d7f4e00f6c0f6ddbd0b6e1a253eb787dd72157af Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Wed, 2 Apr 2025 04:21:01 -0600 Subject: [PATCH 32/32] Passive constructions * "in order to" * "simply" * "we need to"/"we can" * "you need to" * "blank of blanks" * "you should" * "never stops" (weakly double negative) * Change "you can blank" to "blank" --- CHANGELOG.md | 2 +- README.md | 2 +- guides/examples/amazon-sqs.md | 4 ++-- guides/examples/apache-kafka.md | 28 +++++++++++--------------- guides/examples/google-cloud-pubsub.md | 6 +++--- guides/examples/rabbitmq.md | 2 +- lib/broadway.ex | 17 ++++++++-------- lib/broadway/acknowledger.ex | 2 +- 8 files changed, 29 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bfec8f78..141eb6ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ ### Bug fix - * No longer set demand to `:accumulate` when draining, for compatibility with GenStage v1.2+. This means that any polling implementation must implement the `prepare_for_draining` callback and stop polling messages. You can check how [BroadwaySQS](https://github.com/dashbitco/broadway_sqs/commit/5b8f18a78e4760b5fcc839ad576be8c63345add0) tackles this problem as an example + * No longer set demand to `:accumulate` when draining, for compatibility with GenStage v1.2+. This means that any polling implementation must implement the `prepare_for_draining` callback and stop polling messages. Check how [BroadwaySQS](https://github.com/dashbitco/broadway_sqs/commit/5b8f18a78e4760b5fcc839ad576be8c63345add0) tackles this problem as an example ### Enhancements diff --git a/README.md b/README.md index 52e2cbe8..9ceda785 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ end ## A quick example: SQS integration -Assuming you have added [`broadway_sqs`](https://github.com/dashbitco/broadway_sqs) as a dependency and configured your SQS credentials accordingly, you can consume Amazon SQS events in only 20 LOCs: +Assuming you have added [`broadway_sqs`](https://github.com/dashbitco/broadway_sqs) as a dependency and configured your SQS credentials accordingly, consume Amazon SQS events in only 20 LOCs: ```elixir defmodule MyBroadway do diff --git a/guides/examples/amazon-sqs.md b/guides/examples/amazon-sqs.md index 0efd824a..a5bbeebe 100644 --- a/guides/examples/amazon-sqs.md +++ b/guides/examples/amazon-sqs.md @@ -20,7 +20,7 @@ Broadway can work seamlessly with both, Standard and FIFO queues. ## Getting started -In order to use Broadway with SQS, we need to: +To use Broadway with SQS: 1. Create a SQS queue (or use an existing one) 1. Configure our Elixir project to use Broadway @@ -71,7 +71,7 @@ is defined by three functions: `start_link/1`, `handle_message/3` and `handle_batch/4`. We will cover `start_link/1` in this section and the `handle_` callbacks in the next one. -Similar to other process-based behaviour, `start_link/1` simply +Similar to other process-based behaviour, `start_link/1` delegates to `Broadway.start_link/2`, which should define the producers, processors, and batchers in the Broadway pipeline. Assuming we want to consume messages from a queue called diff --git a/guides/examples/apache-kafka.md b/guides/examples/apache-kafka.md index 43a7d77e..f41a918d 100644 --- a/guides/examples/apache-kafka.md +++ b/guides/examples/apache-kafka.md @@ -2,9 +2,9 @@ Kafka is a distributed streaming platform that has three key capabilities: - * Publish and subscribe to streams of records - * Store streams of records in a fault-tolerant durable way - * Process streams of records as they occur + * Publish and subscribe to record streams + * Store record streams with fault-tolerant durability + * Process record streams in real-time ## Getting started @@ -26,9 +26,7 @@ initializing Kafka, create a new stream by running: ## Configure your Elixir project to use Broadway -This guide describes the steps necessary to integrate Broadway with Kafka using -[BroadwayKafka](https://github.com/dashbitco/broadway_kafka), -which is a Broadway Kafka Connector provided by [Dashbit](https://dashbit.co/). +This guide uses [BroadwayKafka](https://github.com/dashbitco/broadway_kafka) from Dashbit to integrate Broadway with Kafka. BroadwayKafka can subscribe to one or more topics and process streams of records using Kafka's [Consumer API](https://kafka.apache.org/documentation.html#consumerapi). @@ -157,9 +155,7 @@ purpose. First, we update the message's data individually inside For more information, see `c:Broadway.handle_message/3` and `c:Broadway.handle_batch/4`. -> Note: Since Broadway v0.2, batching is optional. In case you don't need to -> group messages as batches for further processing/publishing, remove -> the `:batchers` configuration along with the `handle_batch/4` callback. +> Note: Broadway v0.2 makes batching optional. Remove the `:batchers` configuration along with the `c:handle_batch/4` callback if unneeded. ## Run the Broadway pipeline @@ -191,7 +187,7 @@ under the hood to communicate with Kafka. ### Sending messages to Kafka -Finally, we can send some sample messages to Kafka using using `:brod` with the following snippet: +Use `:brod` to send sample messages to Kafka: topic = "test" client_id = :my_client @@ -205,7 +201,7 @@ Finally, we can send some sample messages to Kafka using using `:brod` with the :ok = :brod.produce_sync(client_id, topic, partition, _key="", "#{i}") end) -You should see the output showing the generated batches: +See the output showing the generated batches: Got batch: [ {"2", 4}, @@ -254,11 +250,11 @@ can still receive more assignments than planned. For instance, if another consum the server will reassign all its topic/partition to other available consumers, including any Broadway producer subscribed to the same topic. -There are other options that you may want to take a closer look when tuning your configuration. -The `:max_bytes` option, for instance, belongs to the `:fetch_config` group and defines the -maximum amount of data to be fetched at a time from a single partition. The default is +Other options require attention during configuration tuning. +The `:max_bytes` option (part of `:fetch_config`) defines the +maximum data fetched at a time from a single partition. The default is 1048576 (1 MiB). Setting greater values can improve throughput at the cost of more -memory consumption. For more information and other fetch options, please refer to the +memory consumption. For more fetch options, please refer to the "Fetch config options" in the official [BroadwayKafka](https://hexdocs.pm/broadway_kafka/) documentation. @@ -285,4 +281,4 @@ tuning `:offset_commit_interval_seconds` and `:offset_commit_on_ack`. ## Handling failed messages -`broadway_kafka` never stops the flow of the stream and will **always ack messages**, even when they fail. Unlike queue-based connectors, which allow marking individual messages as failed, Kafka's single offset per topic/partition strategy prevents that. To reprocess failed messages, implement your own strategy, such as using `handle_failed/2` to send failed messages to a separate stream or queue for later processing. +`broadway_kafka` **always acknowledges** (yes, also failed) messages, so the stream flow is never stopped. Unlike queue-based connectors, Kafka’s single offset-per-topic/partition strategy prevents marking individual messages as failed. To reprocess failures, implement a custom strategy (e.g., using `handle_failed/2` to redirect failed messages to a separate stream or queue). diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index 98b07104..b5f9c332 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -54,7 +54,7 @@ And a new subscription: Created subscription [projects/test-pubsub/subscriptions/test-subscription]. We also need a [service account](https://cloud.google.com/iam/docs/service-accounts), an IAM -policy, as well as API credentials in order to programmatically work with the service. First, let's +policy, as well as API credentials to programmatically work with the service. First, let's create the service account: $ gcloud iam service-accounts create test-account --project test-pubsub @@ -165,8 +165,8 @@ For general information about setting up Broadway, see `Broadway` module docs as ## Implement Broadway callbacks -In order to process incoming messages, implement the required callbacks. For the sake -of simplicity, we're considering that all messages received from the queue are strings and our +Implement the required callbacks to process incoming messages. +In this example, all messages received from the queue are strings and our processor calls `String.upcase/1` on them: defmodule MyBroadway do diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index d1bbe07f..3de03c62 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -74,7 +74,7 @@ Broadway is a process-based behaviour, and a Broadway pipeline is defined by the `start_link/1` function, the `c:handle_message/3` callback, and optionally, the `c:handle_batch/4` callback. We will cover `start_link/1` in this section and the `handle_` callbacks in the next one. -Similar to other process-based behaviours, `start_link/1` simply +Similar to other process-based behaviours, `start_link/1` delegates to `Broadway.start_link/2`, which defines the producers, processors, and batchers in the Broadway pipeline. Assuming we want to consume messages from a queue called diff --git a/lib/broadway.ex b/lib/broadway.ex index be241c0a..a7667c68 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -457,7 +457,7 @@ defmodule Broadway do To test multiple messages, use `test_batch/3`. `test_batch/3` will respect the batching configuration, - which most likely means you need to increase your test timeouts: + which most likely means increasing your test timeouts: test "batch messages" do ref = Broadway.test_batch(MyBroadway, [1, 2, 3]) @@ -529,8 +529,8 @@ defmodule Broadway do This can be done with the `:partition_by` option, which enforces that messages with a given property are always forwarded to the same stage. - In order to provide partitioning throughout the whole pipeline, just - set `:partition_by` at the root of your configuration: + Set `:partition_by` at the root of your configuration to provide + partitioning throughout the whole pipeline: defmodule MyBroadway do use Broadway @@ -603,7 +603,7 @@ defmodule Broadway do > may be undesired. If your producer supports retrying, the > failed message may be retried later, out of its original order. > Those issues happen regardless of Broadway and solutions to said - > problems almost always need to be addressed outside of Broadway too. + > problems are almost always addressed outside of Broadway, too. ## Configuration storage @@ -894,9 +894,8 @@ defmodule Broadway do logic to do calculations. Basically, any CPU bounded task that runs against a single message should be processed here. - In order to update the data after processing, use the - `Broadway.Message.update_data/2` function. This way the new message can be - properly forwarded and handled by the batcher: + Use the `Broadway.Message.update_data/2` function to update the data after processing, + so the new message can be properly forwarded and handled by the batcher: @impl true def handle_message(_, message, _) do @@ -1267,7 +1266,7 @@ defmodule Broadway do for the Broadway pipeline `batch_size` to be filled or the `batch_timeout` to be triggered. - It returns a reference that can be used to identify the ack + It returns a reference used to identify the ack messages. See ["Testing"](#module-testing) section in module documentation @@ -1315,7 +1314,7 @@ defmodule Broadway do or if the messages in the batch take more time to process than `batch_timeout` then the caller will receive multiple messages. - It returns a reference that can be used to identify the ack + It returns a reference used to identify the ack messages. See ["Testing"](#module-testing) section in module documentation diff --git a/lib/broadway/acknowledger.ex b/lib/broadway/acknowledger.ex index 14f66fe0..5853c577 100644 --- a/lib/broadway/acknowledger.ex +++ b/lib/broadway/acknowledger.ex @@ -7,7 +7,7 @@ defmodule Broadway.Acknowledger do implement this behaviour and consider how the technology you're working with handles message acknowledgement. - The `c:ack/3` callback must be implemented in order to notify + The `c:ack/3` callback must be implemented to notify the origin of the data that a message can be safely removed after been successfully processed and published. In case of failed messages or messages without acknowledgement, depending