diff --git a/CHANGELOG.md b/CHANGELOG.md index bfec8f7..141eb6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ ### Bug fix - * No longer set demand to `:accumulate` when draining, for compatibility with GenStage v1.2+. This means that any polling implementation must implement the `prepare_for_draining` callback and stop polling messages. You can check how [BroadwaySQS](https://github.com/dashbitco/broadway_sqs/commit/5b8f18a78e4760b5fcc839ad576be8c63345add0) tackles this problem as an example + * No longer set demand to `:accumulate` when draining, for compatibility with GenStage v1.2+. This means that any polling implementation must implement the `prepare_for_draining` callback and stop polling messages. Check how [BroadwaySQS](https://github.com/dashbitco/broadway_sqs/commit/5b8f18a78e4760b5fcc839ad576be8c63345add0) tackles this problem as an example ### Enhancements diff --git a/README.md b/README.md index b404b59..9ceda78 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Broadway takes the burden of defining concurrent GenStage topologies and provide ### Producers -There are several producers that you can use to integrate with existing services and technologies. [See the docs for detailed how-tos and supported producers](https://hexdocs.pm/broadway/introduction.html#official-producers). +Currently we officially support four Broadway producers that integrate with existing services and technologies. [See the docs for detailed how-tos and supported producers](https://hexdocs.pm/broadway/introduction.html#official-producers). ## Installation @@ -43,7 +43,7 @@ end ## A quick example: SQS integration -Assuming you have added [`broadway_sqs`](https://github.com/dashbitco/broadway_sqs) as a dependency and configured your SQS credentials accordingly, you can consume Amazon SQS events in only 20 LOCs: +Assuming you have added [`broadway_sqs`](https://github.com/dashbitco/broadway_sqs) as a dependency and configured your SQS credentials accordingly, consume Amazon SQS events in only 20 LOCs: ```elixir defmodule MyBroadway do @@ -82,7 +82,7 @@ defmodule MyBroadway do end ``` -Once your Broadway module is defined, you just need to add it as a child of your application supervision tree as `{MyBroadway, []}`. +Once your Broadway module is defined, add it as a child of your application supervision tree as `{MyBroadway, []}`. ## Comparison to Flow diff --git a/guides/examples/amazon-sqs.md b/guides/examples/amazon-sqs.md index 67479f6..a5bbeeb 100644 --- a/guides/examples/amazon-sqs.md +++ b/guides/examples/amazon-sqs.md @@ -18,9 +18,9 @@ queues: Broadway can work seamlessly with both, Standard and FIFO queues. -## Getting Started +## Getting started -In order to use Broadway with SQS, we need to: +To use Broadway with SQS: 1. Create a SQS queue (or use an existing one) 1. Configure our Elixir project to use Broadway @@ -43,7 +43,7 @@ which is a Broadway SQS Connector provided by [Dashbit](https://dashbit.co/). ### Starting a new project -If you plan to start a new project, just run: +If you plan to start a new project, run: $ mix new my_app --sup @@ -66,12 +66,12 @@ Don't forget to check for the latest version of dependencies. ## Define the pipeline configuration -Broadway is a process-based behaviour and to define a Broadway -pipeline, we need to define three functions: `start_link/1`, +A Broadway pipeline is an implementation of a process-based behaviour and +is defined by three functions: `start_link/1`, `handle_message/3` and `handle_batch/4`. We will cover `start_link/1` in this section and the `handle_` callbacks in the next one. -Similar to other process-based behaviour, `start_link/1` simply +Similar to other process-based behaviour, `start_link/1` delegates to `Broadway.start_link/2`, which should define the producers, processors, and batchers in the Broadway pipeline. Assuming we want to consume messages from a queue called @@ -107,8 +107,8 @@ Assuming we want to consume messages from a queue called The above configuration also assumes that you have the AWS credentials set up in your environment, for instance, by having the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables set. If that's -not the case, you will need to pass that information to the client so it -can properly connect to the AWS servers. Here is how you can do it: +not the case, pass that information to the client so it +can properly connect to the AWS servers. Here's how to do it: ... producer: [ @@ -135,9 +135,8 @@ module docs as well as `Broadway.start_link/2`. ## Implement Broadway callbacks -In order to process incoming messages, we need to implement the -required callbacks. For the sake of simplicity, we're considering that -all messages received from the queue are just numbers: +Implement the required callbacks to process incoming messages. +In this example, all messages received from the queue are numbers: defmodule MyBroadway do use Broadway @@ -169,9 +168,9 @@ For more information, see `c:Broadway.handle_message/3` and ## Run the Broadway pipeline -To run your `Broadway` pipeline, you just need to add as a child in +To run your `Broadway` pipeline, add it as a child in a supervision tree. Most applications have a supervision tree defined -at `lib/my_app/application.ex`. You can add Broadway as a child to a +at `lib/my_app/application.ex`. Add Broadway as a child to a supervisor as follows: children = [ @@ -190,7 +189,7 @@ in the supervision tree. Some of the configuration options available for Broadway come already with a "reasonable" default value. However those values might not suit your requirements. Depending on the number of messages you get, how much processing -they need and how much IO work is going to take place, you might need completely +they need and how much IO work is going to take place, you need completely different values to optimize the flow of your pipeline. The `concurrency` option available for every set of producers, processors and batchers, among with `max_demand`, `batch_size`, and `batch_timeout` can give you a great deal @@ -202,7 +201,7 @@ See the notes on [`Producer concurrency`](https://hexdocs.pm/broadway/Broadway.h and [`Batcher concurrency`](https://hexdocs.pm/broadway/Broadway.html#module-batcher-concurrency) for details. -Here's an example on how you could tune them according to +Here's an example on how you tune them according to your needs. defmodule MyBroadway do @@ -236,5 +235,4 @@ your needs. In order to get a good set of configurations for your pipeline, it's important to respect the limitations of the servers you're running, as well as the limitations of the services you're providing/consuming -data to/from. Broadway comes with telemetry, so you can measure your -pipeline and help ensure your changes are effective. +data to/from. Measure your pipeline with [telemetry](https://hexdocs.pm/telemetry/readme.html) to ensure your changes are effective. (It comes standard.) diff --git a/guides/examples/apache-kafka.md b/guides/examples/apache-kafka.md index 97f9477..f41a918 100644 --- a/guides/examples/apache-kafka.md +++ b/guides/examples/apache-kafka.md @@ -2,13 +2,13 @@ Kafka is a distributed streaming platform that has three key capabilities: - * Publish and subscribe to streams of records - * Store streams of records in a fault-tolerant durable way - * Process streams of records as they occur + * Publish and subscribe to record streams + * Store record streams with fault-tolerant durability + * Process record streams in real-time -## Getting Started +## Getting started -In order to use Broadway with Kafka, we need to: +To use Broadway with Kafka: 1. Create a stream of records (or use an existing one) 1. Configure your Elixir project to use Broadway @@ -20,15 +20,13 @@ In order to use Broadway with Kafka, we need to: In case you don't have Kafka installed yet, please follow the instructions on Kafka's [Quickstart](https://kafka.apache.org/quickstart) for a clean installation. After -initializing Kafka, you can create a new stream by running: +initializing Kafka, create a new stream by running: $ kafka-topics --create --zookeeper localhost:2181 --partitions 3 --topic test ## Configure your Elixir project to use Broadway -This guide describes the steps necessary to integrate Broadway with Kafka using -[BroadwayKafka](https://github.com/dashbitco/broadway_kafka), -which is a Broadway Kafka Connector provided by [Dashbit](https://dashbit.co/). +This guide uses [BroadwayKafka](https://github.com/dashbitco/broadway_kafka) from Dashbit to integrate Broadway with Kafka. BroadwayKafka can subscribe to one or more topics and process streams of records using Kafka's [Consumer API](https://kafka.apache.org/documentation.html#consumerapi). @@ -125,9 +123,9 @@ module docs as well as `Broadway.start_link/2`. ## Implement Broadway callbacks -In order to process incoming messages, we need to implement the -required callbacks. For the sake of simplicity, we're considering that -all messages received from the topic are just numbers: +Implement callbacks to process incoming messages. +In this example, +all messages received from the topic are numbers: defmodule MyBroadway do use Broadway @@ -157,16 +155,12 @@ purpose. First, we update the message's data individually inside For more information, see `c:Broadway.handle_message/3` and `c:Broadway.handle_batch/4`. -> Note: Since Broadway v0.2, batching is optional. In case you don't need to -> group messages as batches for further processing/publishing, you can remove -> the `:batchers` configuration along with the `handle_batch/4` callback. +> Note: Broadway v0.2 makes batching optional. Remove the `:batchers` configuration along with the `c:handle_batch/4` callback if unneeded. ## Run the Broadway pipeline -To run your `Broadway` pipeline, you just need to add as a child in -a supervision tree. Most applications have a supervision tree defined -at `lib/my_app/application.ex`. You can add Broadway as a child to a -supervisor as follows: +Add your `Broadway` pipeline as a child in a supervision tree to run it. Most applications have a supervision tree defined at `lib/my_app/application.ex`. +Add the child process `{MyBroadway, []}` to a supervisor as follows: children = [ {MyBroadway, []} @@ -179,7 +173,7 @@ Also, if your Broadway has any dependency (for example, it needs to talk to the database), make sure that Broadway is listed *after* its dependencies in the supervision tree. -You can now test your pipeline by entering an `iex` session: +Test your pipeline by entering an `iex` session: $ iex -S mix @@ -193,7 +187,7 @@ under the hood to communicate with Kafka. ### Sending messages to Kafka -Finally, we can send some sample messages to Kafka using using `:brod` with the following snippet: +Use `:brod` to send sample messages to Kafka: topic = "test" client_id = :my_client @@ -207,7 +201,7 @@ Finally, we can send some sample messages to Kafka using using `:brod` with the :ok = :brod.produce_sync(client_id, topic, partition, _key="", "#{i}") end) -You should see the output showing the generated batches: +See the output showing the generated batches: Got batch: [ {"2", 4}, @@ -231,7 +225,7 @@ You should see the output showing the generated batches: Some of the configuration options available for Broadway come already with a "reasonable" default value. However, those values might not suit your requirements. Depending on the number of records you get, how much processing -they need and how much IO work is going to take place, you might need completely +they need and how much IO work is going to take place, you need completely different values to optimize the flow of your pipeline. The `concurrency` option available for every set of producers, processors and batchers, along with `batch_size` and `batch_timeout` can give you a great deal of flexibility. @@ -256,11 +250,11 @@ can still receive more assignments than planned. For instance, if another consum the server will reassign all its topic/partition to other available consumers, including any Broadway producer subscribed to the same topic. -There are other options that you may want to take a closer look when tuning your configuration. -The `:max_bytes` option, for instance, belongs to the `:fetch_config` group and defines the -maximum amount of data to be fetched at a time from a single partition. The default is +Other options require attention during configuration tuning. +The `:max_bytes` option (part of `:fetch_config`) defines the +maximum data fetched at a time from a single partition. The default is 1048576 (1 MiB). Setting greater values can improve throughput at the cost of more -memory consumption. For more information and other fetch options, please refer to the +memory consumption. For more fetch options, please refer to the "Fetch config options" in the official [BroadwayKafka](https://hexdocs.pm/broadway_kafka/) documentation. @@ -287,9 +281,4 @@ tuning `:offset_commit_interval_seconds` and `:offset_commit_on_ack`. ## Handling failed messages -`broadway_kafka` never stops the flow of the stream, i.e. it will **always ack** the messages -even when they fail. Unlike queue-based connectors, where you can mark a single message as failed. -In Kafka that's not possible due to its single offset per topic/partition ack strategy. If you -want to reprocess failed messages, you need to roll your own strategy. A possible way to do that -is to implement `handle_failed/2` and send failed messages to a separated stream or queue for -later processing. +`broadway_kafka` **always acknowledges** (yes, also failed) messages, so the stream flow is never stopped. Unlike queue-based connectors, Kafka’s single offset-per-topic/partition strategy prevents marking individual messages as failed. To reprocess failures, implement a custom strategy (e.g., using `handle_failed/2` to redirect failed messages to a separate stream or queue). diff --git a/guides/examples/custom-producers.md b/guides/examples/custom-producers.md index efda4b3..0387e9e 100644 --- a/guides/examples/custom-producers.md +++ b/guides/examples/custom-producers.md @@ -1,15 +1,13 @@ # Custom Producers -If you want to use Broadway but there is no existing Broadway producer -for the technology of your choice, you can integrate any existing GenStage -producer into the pipeline with relative ease. +If there is no existing Broadway producer for your technology of choice, integrate any existing GenStage producer into the pipeline. ## Example In general, producers must generate `%Broadway.Message{}` structs in order -to be processed by Broadway. In case you need to use an existing GenStage -producer and you don't want to change its original implementation, -you'll have to set the producer's `:transformer` option to translate the +to be processed by Broadway. To use an existing GenStage +producer without changing its original implementation, +set the producer's `:transformer` option to translate the generated events into Broadway messages. In the following example the producer is a regular GenStage, i.e., it @@ -32,7 +30,7 @@ produces plain events that cannot be processed by Broadway directly: end end -By using a transformer, you can tell Broadway to transform all events +By using a transformer, Broadway knows to transform all events generated by the producer into proper Broadway messages: defmodule MyBroadway do @@ -70,7 +68,7 @@ generated by the producer into proper Broadway messages: end end -Notice that you need to pass two options to the producer: +Pass two options to the producer: * `:module` - a tuple representing the GenStage producer as `{mod, arg}`. Where `mod` is module that implements the GenStage behaviour and `arg` diff --git a/guides/examples/google-cloud-pubsub.md b/guides/examples/google-cloud-pubsub.md index 2dbbc59..b5f9c33 100644 --- a/guides/examples/google-cloud-pubsub.md +++ b/guides/examples/google-cloud-pubsub.md @@ -2,7 +2,7 @@ Cloud Pub/Sub is a fully-managed real-time messaging service provided by Google. -## Getting Started +## Getting started In order to use Broadway with Cloud Pub/Sub you need to: @@ -13,19 +13,19 @@ In order to use Broadway with Cloud Pub/Sub you need to: 1. Run the Broadway pipeline 1. Tune the configuration (Optional) -If you are just getting familiar with Google Pub/Sub, refer to [the documentation](https://cloud.google.com/pubsub/docs/) +If you are getting familiar with Google Pub/Sub, refer to [the documentation](https://cloud.google.com/pubsub/docs/) to get started. Instead of testing against a live environment, you may also consider using the [emulator](https://cloud.google.com/pubsub/docs/emulator) to simulate integrating with Cloud Pub/Sub. -If you have an existing project, topic, subscription, and credentials, you can skip [step +Existing projects, topics, subscriptions, and credentials can skip [step 1](#setup-cloud-pub-sub-project) and jump to [Configure the project](#configure-the-project) section. ## Setup Cloud Pub/Sub project In this tutorial we'll use the [`gcloud`](https://cloud.google.com/sdk/gcloud/) command-line tool -to set everything up in Google Cloud. Alternatively, you can roughly follow this guide by using +to set everything up in Google Cloud. Alternatively, follow this [Cloud Console](https://console.cloud.google.com). To install `gcloud` follow the [documentation](https://cloud.google.com/sdk/gcloud/). If you are @@ -46,7 +46,7 @@ A new topic: $ gcloud pubsub topics create test-topic --project test-pubsub Created topic [projects/test-pubsub/topics/test-topic]. -> Note: If you run this command immediately after creating a new Google Cloud project, you may receive an error indicating that your project's organization policy is still being provisioned. Just wait a couple minutes and try again. +> Note: If an error indicates the organization policy is still provisioning after creating a new Google Cloud project, wait a couple minutes and try again. And a new subscription: @@ -54,7 +54,7 @@ And a new subscription: Created subscription [projects/test-pubsub/subscriptions/test-subscription]. We also need a [service account](https://cloud.google.com/iam/docs/service-accounts), an IAM -policy, as well as API credentials in order to programmatically work with the service. First, let's +policy, as well as API credentials to programmatically work with the service. First, let's create the service account: $ gcloud iam service-accounts create test-account --project test-pubsub @@ -79,7 +79,7 @@ This command generated a `credentials.json` file which will be useful later. Not pattern is `@.iam.gserviceaccount.com`. Run `gcloud iam service-accounts list --project test-pubsub` to see all service accounts associated with the given project. -Finally, we need to enable Pub/Sub for our project: +Finally, enable Pub/Sub for our project: $ gcloud services enable pubsub --project test-pubsub Operation "operations/xxx" finished successfully. @@ -91,7 +91,7 @@ which is a Broadway Cloud Pub/Sub Connector provided by [Dashbit](https://dashbi ### Starting a new project -If you plan to start a new project, just run: +If you plan to start a new project, run: $ mix new my_app --sup @@ -114,11 +114,12 @@ Don't forget to check for the latest version of dependencies. ## Define the pipeline configuration -Broadway is a process-based behaviour and to define a Broadway pipeline, we need to define three +Broadway is a process-based behaviour and a Broadway pipeline +is defined with three functions: `start_link/1`, `handle_message/3` and `handle_batch/4`. We will cover `start_link/1` in this section and the `handle_` callbacks in the next one. -Similar to other process-based behaviour, `start_link/1` simply delegates to +Similar to other process-based behaviour, `start_link/1` delegates to `Broadway.start_link/2`, which should define the producers, processors, and batchers in the Broadway pipeline. Assuming we want to consume messages from the `test-subscription`, the minimal configuration would be: @@ -164,8 +165,8 @@ For general information about setting up Broadway, see `Broadway` module docs as ## Implement Broadway callbacks -In order to process incoming messages, we need to implement the required callbacks. For the sake -of simplicity, we're considering that all messages received from the queue are strings and our +Implement the required callbacks to process incoming messages. +In this example, all messages received from the queue are strings and our processor calls `String.upcase/1` on them: defmodule MyBroadway do @@ -195,8 +196,8 @@ For more information, see `c:Broadway.handle_message/3` and `c:Broadway.handle_b ## Run the Broadway pipeline -To run your `Broadway` pipeline, you need to add it as a child in a supervision tree. Most -applications have a supervision tree defined at `lib/my_app/application.ex`. You can add Broadway +To run your `Broadway` pipeline, add it as a child in a supervision tree. Most +applications have a supervision tree defined at `lib/my_app/application.ex`. Add Broadway as a child to a supervisor as follows: children = [ @@ -205,7 +206,7 @@ as a child to a supervisor as follows: Supervisor.start_link(children, strategy: :one_for_one) -The final step is to configure credentials. You can set the following environment variable: +The final step is to configure credentials. Set the following environment variable: export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json @@ -216,7 +217,7 @@ Now the Broadway pipeline should be started when your application starts. Also, pipeline has any dependency (for example, it needs to talk to the database), make sure that it is listed *after* its dependencies in the supervision tree. -If you followed the previous section about setting the project with `gcloud`, you can now test the +In the previous section `gcloud` set up the project. Now test the the pipeline. In one terminal tab start the application: $ iex -S mix @@ -231,7 +232,7 @@ And in another tab, send a couple of test messages to Pub/Sub: messageIds: - '651427034966696' -Now, In the first tab, you should see output similar to: +In the first tab, see output similar to: ``` Got batch of finished jobs from processors, sending ACKs to Pub/Sub as a batch.: ["TEST 1", "TEST 2"] @@ -242,7 +243,7 @@ Got batch of finished jobs from processors, sending ACKs to Pub/Sub as a batch.: Some of the configuration options available for Broadway come already with a "reasonable" default value. However those values might not suit your requirements. Depending on the number of messages you get, how much processing -they need and how much IO work is going to take place, you might need completely +they need and how much IO work is going to take place, you need completely different values to optimize the flow of your pipeline. The `concurrency` option available for every set of producers, processors and batchers, among with `max_demand`, `batch_size`, and `batch_timeout` can give you a great deal @@ -254,7 +255,7 @@ See the notes on [`Producer concurrency`](https://hexdocs.pm/broadway/Broadway.h and [`Batcher concurrency`](https://hexdocs.pm/broadway/Broadway.html#module-batcher-concurrency) for details. -Here's an example on how you could tune them according to +Here's an example on how you tune them according to your needs. defmodule MyBroadway do @@ -288,5 +289,4 @@ your needs. In order to get a good set of configurations for your pipeline, it's important to respect the limitations of the servers you're running, as well as the limitations of the services you're providing/consuming -data to/from. Broadway comes with telemetry, so you can measure your -pipeline and help ensure your changes are effective. +data to/from. Measure your pipeline with [telemetry](https://hexdocs.pm/telemetry/readme.html) to ensure your changes are effective. (It comes standard.) diff --git a/guides/examples/introduction.md b/guides/examples/introduction.md index f0f677c..fd071b2 100644 --- a/guides/examples/introduction.md +++ b/guides/examples/introduction.md @@ -15,18 +15,21 @@ ## Official Producers -Currently we officially support four Broadway producers: +Currently we officially support four Broadway producers. + +Start with the guide link for your adapter: * Amazon SQS: [Source](https://github.com/dashbitco/broadway_sqs) - [Guide](amazon-sqs.md) * Apache Kafka: [Source](https://github.com/dashbitco/broadway_kafka) - [Guide](apache-kafka.md) * Google Cloud Pub/Sub: [Source](https://github.com/dashbitco/broadway_cloud_pub_sub) - [Guide](google-cloud-pubsub.md) * RabbitMQ: [Source](https://github.com/dashbitco/broadway_rabbitmq) - [Guide](rabbitmq.md) -The guides links above will help you get started with your adapter of choice. For API reference, you can check out the `Broadway` module. +For API reference, check out the `Broadway` module. ## Non-official (Off-Broadway) Producers -For those interested in rolling their own Broadway Producers (which we actively encourage!), we recommend using the `OffBroadway` namespace, mirroring the [Off-Broadway theaters](https://en.wikipedia.org/wiki/Off-Broadway). For example, if you want to publish your own integration with Amazon SQS, you can package it as `off_broadway_sqs`, which uses the `OffBroadway.SQS` namespace. +For those interested in rolling their own Broadway Producers (which we actively encourage!), we recommend using the `OffBroadway` namespace, mirroring the [Off-Broadway theaters](https://en.wikipedia.org/wiki/Off-Broadway). For example, package +your own integration with Amazon SQS as `off_broadway_sqs`, which uses the `OffBroadway.SQS` namespace. The following Off-Broadway libraries are available (feel free to send a PR adding your own in alphabetical order): diff --git a/guides/examples/rabbitmq.md b/guides/examples/rabbitmq.md index f9ee865..3de03c6 100644 --- a/guides/examples/rabbitmq.md +++ b/guides/examples/rabbitmq.md @@ -4,7 +4,7 @@ RabbitMQ is an open source message broker designed to be highly scalable and distributed. It supports multiple protocols including the Advanced Message Queuing Protocol (AMQP). -## Getting Started +## Getting started In order to use Broadway with RabbitMQ, we need to: @@ -15,12 +15,11 @@ In order to use Broadway with RabbitMQ, we need to: 1. [Run the Broadway pipeline](#run-the-broadway-pipeline) 1. [Tuning the configuration](#tuning-the-configuration) (Optional) -In case you want to work with an existing queue, you can skip [step 1](#create-a-queue) +To work with an existing queue, skip [step 1](#create-a-queue) and jump to [Configure the project](#configure-the-project). -> Note: `BroadwayRabbitMQ` does not automatically create any queue. If you -configure a pipeline with a non-existent queue, the producers will crash, -bringing down the pipeline. +> Note: `BroadwayRabbitMQ` does not automatically create queues. +> Without a queue the producers will crash, bringing down the pipeline. ## Create a queue @@ -30,12 +29,12 @@ further information. Also, make sure you have the [Management](https://www.rabbitmq.com/management.html) plugin enabled, which ships with the command line tool, `rabbitmqadmin`. -After successfully installing RabbitMQ, you can declare a new queue with the +After successfully installing RabbitMQ, declare a new queue with the following command: $ rabbitmqadmin declare queue name=my_queue durable=true -You can list all declared queues to see our the one we've just created: +List all declared queues to see the one we've created: $ rabbitmqctl list_queues Timeout: 60.0 seconds ... @@ -71,16 +70,15 @@ Don't forget to check for the latest version of dependencies. ## Define the pipeline configuration -Broadway is a process-based behaviour and to define a Broadway pipeline, -we need to define three functions: `start_link/1`, `handle_message/3` -and optionally `handle_batch/4`. We will cover `start_link/1` in this -section and the `handle_` callbacks in the next one. +Broadway is a process-based behaviour, and a Broadway pipeline +is defined by the `start_link/1` function, the `c:handle_message/3` +callback, and optionally, the `c:handle_batch/4` callback. We will cover `start_link/1` in this section and the `handle_` callbacks in the next one. -Similar to other process-based behaviours, `start_link/1` simply -delegates to `Broadway.start_link/2`, which should define the +Similar to other process-based behaviours, `start_link/1` +delegates to `Broadway.start_link/2`, which defines the producers, processors, and batchers in the Broadway pipeline. Assuming we want to consume messages from a queue called -`my_queue`, one possible configuration would be: +`my_queue` with the following configuration: defmodule MyBroadway do use Broadway @@ -118,7 +116,7 @@ Assuming we want to consume messages from a queue called end If you're consuming data from an existing broker that requires authorization, -you'll need to provide your credentials using the `connection` option: +provide your credentials using the `connection` option: ... producer: [ @@ -141,9 +139,9 @@ module docs as well as `Broadway.start_link/2`. ## Implement Broadway callbacks -In order to process incoming messages, we need to implement the +In order to process incoming messages, implement the required callbacks. For the sake of simplicity, we're considering that -all messages received from the queue are just numbers: +all messages received from the queue are numbers: defmodule MyBroadway do use Broadway @@ -173,17 +171,16 @@ purpose. First, we update the message's data individually inside For more information, see `c:Broadway.handle_message/3` and `c:Broadway.handle_batch/4`. -> Note: Since Broadway v0.2, batching is optional. In case you don't need to -> group messages as batches for further processing/publishing, you can remove -> the `:batchers` configuration along with the `handle_batch/4` callback. This -> is perfectly fine for RabbitMQ, where messages are acknowledged individually -> and never as a batch. +> Note: Since Broadway v0.2, batching is optional. Remove +> the `:batchers` configuration along with the `c:handle_batch/4` callback +> to send single messages for further processing/publishing. This +> works because RabbitMQ messages acknowledges messages individually. ## Run the Broadway pipeline -To run your `Broadway` pipeline, you just need to add as a child in +To run your `Broadway` pipeline, add it as a child in a supervision tree. Most applications have a supervision tree defined -at `lib/my_app/application.ex`. You can add Broadway as a child to a +at `lib/my_app/application.ex`. Add Broadway as a child to a supervisor as follows: children = [ @@ -197,11 +194,11 @@ Also, if your Broadway has any dependency (for example, it needs to talk to the database), make sure that Broadway is listed *after* its dependencies in the supervision tree. -You can now test your pipeline by entering an `iex` session: +Test your pipeline by entering an `iex` session: $ iex -S mix -If everything went fine, you should see lots of `info` log messages from the `amqp` +If everything went fine, you will see lots of `info` log messages from the `amqp` supervisors. If you think that's too verbose and want to do something about it, please take a look at the _"Log related to amqp supervisors are too verbose"_ subsection in the `amqp`'s [Troubleshooting](https://hexdocs.pm/amqp/readme.html#troubleshooting) @@ -244,7 +241,7 @@ You should see the output showing the generated batches: Some of the configuration options available for Broadway come already with a "reasonable" default value. However, those values might not suit your requirements. Depending on the number of messages you get, how much processing -they need and how much IO work is going to take place, you might need completely +they need and how much IO work is going to take place, you need completely different values to optimize the flow of your pipeline. The `concurrency` option available for every set of producers, processors and batchers, among with `max_demand`, `batch_size`, and `batch_timeout` can give you a great deal @@ -263,5 +260,4 @@ section of the `BroadwayRabbitMQ` documentation for details. In order to get a good set of configurations for your pipeline, it's important to respect the limitations of the servers you're running, as well as the limitations of the services you're providing/consuming -data to/from. Broadway comes with telemetry, so you can measure your -pipeline and help ensure your changes are effective. +data to/from. Measure your pipeline with [telemetry](https://hexdocs.pm/telemetry/readme.html) to ensure your changes are effective. (It comes standard.) diff --git a/lib/broadway.ex b/lib/broadway.ex index 7c573e1..a7667c6 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -38,7 +38,7 @@ defmodule Broadway do event was properly processed. * Custom failure handling - Broadway provides a `c:handle_failed/2` callback - where developers can outline custom code to handle errors. For example, + to outline custom code to handle errors. For example, if they want to move messages to another queue for further processing. * Dynamic batching - Broadway allows developers to batch messages based on @@ -48,9 +48,9 @@ defmodule Broadway do * Ordering and Partitioning - Broadway allows developers to partition messages across workers, guaranteeing messages within the same partition - are processed in order. For example, if you want to guarantee all events - tied to a given `user_id` are processed in order and not concurrently, - you can set the `:partition_by` option. See ["Ordering and partitioning"](#module-ordering-and-partitioning). + are processed in order. For example, set the `:partition_by` option to + guarantee all events tied to a given `user_id` are processed in order + and not concurrently. See ["Ordering and partitioning"](#module-ordering-and-partitioning). * Rate limiting - Broadway allows developers to rate limit all producers in a single node by a given number of messages in a time period, allowing @@ -63,14 +63,14 @@ defmodule Broadway do ## The Broadway behaviour - In order to use Broadway, you need to: + To use Broadway: - 1. Define your pipeline configuration - 2. Define a module implementing the Broadway behaviour + 1. define your pipeline configuration + 2. define a module implementing the Broadway behaviour ### Example - Broadway is a process-based behaviour, and you begin by + Broadway is a process-based behaviour, so begin by defining a module that invokes `use Broadway`. Processes defined by these modules will often be started by a supervisor, and so a `start_link/1` function is frequently @@ -104,12 +104,12 @@ defmodule Broadway do Supervisor.start_link(children, strategy: :one_for_one) - Adding your pipeline to your supervision tree in this way + Adding your pipeline to your supervision tree this way calls the default `child_spec/1` function that is generated - when `use Broadway` is invoked. If you would like to customize - the child spec passed to the supervisor, you can override the - `child_spec/1` function in your module or explicitly pass a - child spec to the supervisor when adding it to your supervision tree. + when `use Broadway` is invoked. Customize child specifications + by overriding the `child_spec/1` function in your module or + by explicitly passing the `:child_spec` option to the supervisor + when adding it to your supervision tree. The configuration above defines a pipeline with: @@ -127,7 +127,7 @@ defmodule Broadway do [processor_1] [processor_2] <- process each message ``` - After the pipeline is defined, you need to implement the `c:handle_message/3` + After the pipeline is defined, implement the `c:handle_message/3` callback which will be invoked by processors for each message. `c:handle_message/3` receives every message as a `Broadway.Message` @@ -198,8 +198,8 @@ defmodule Broadway do [batch_sqs_1] [batch_sqs_2] [batch_s3_1] <- process each batch ``` - Additionally, you have to define the `c:handle_batch/4` callback, - which batch processors invoke for each batch. You can then + Next, define the `c:handle_batch/4` callback, + which batch processors invoke with each batch. Then call `Broadway.Message.put_batcher/2` inside `c:handle_message/3` to control which batcher the message should go to. @@ -265,7 +265,7 @@ defmodule Broadway do to raise an error. For example, imagine you want to batch "special" messages and handle them differently - then all other messages. You can configure your pipeline like this: + then all other messages. Configure your pipeline like this: defmodule MyBroadway do use Broadway @@ -329,7 +329,7 @@ defmodule Broadway do `c:handle_failed/2` callback, that callback will be invoked with all the failed messages before they get acknowledged. - Note however, that `Broadway` does not provide any sort of retries + Note however, that `Broadway` does not provide retries out of the box. This is left completely as a responsibility of the producer. For instance, if you are using Amazon SQS, the default behaviour is to retry unacknowledged messages after a user-defined @@ -342,7 +342,7 @@ defmodule Broadway do Setting producer concurrency is a tradeoff between latency and internal queueing. - For efficiency, you should generally limit the amount of internal queueing. + For efficiency, limit the amount of internal queueing. Whenever additional messages are sitting in a busy processor's mailbox, they can't be delivered to another processor which may be available or become available first. @@ -386,8 +386,8 @@ defmodule Broadway do `test_message/3` and `test_batch/3` functions should be used to publish messages. - With `test_message/3`, you can push a message into the pipeline and receive - a process message when the pipeline acknowledges the data you have pushed + Call `test_message/3` to push a message into the pipeline and receive + back a process message when the pipeline acknowledges the message data has been processed. Let's see an example. Imagine the following `Broadway` module: @@ -424,7 +424,7 @@ defmodule Broadway do end end - Now in `config/test.exs` you could do: + Now in `config/test.exs` you do: config :my_app, producer_module: Broadway.DummyProducer, @@ -445,7 +445,7 @@ defmodule Broadway do {:ack, ^ref, successful_messages, failure_messages} - You can use the acknowledgment to guarantee the message has been + Use the acknowledgement to guarantee the message has been processed and therefore any side-effect from the pipeline should be visible. @@ -455,9 +455,9 @@ defmodule Broadway do and verify single messages, without imposing high timeouts to our test suites. - In case you want to test multiple messages, then you need to use + To test multiple messages, use `test_batch/3`. `test_batch/3` will respect the batching configuration, - which most likely means you need to increase your test timeouts: + which most likely means increasing your test timeouts: test "batch messages" do ref = Broadway.test_batch(MyBroadway, [1, 2, 3]) @@ -484,9 +484,8 @@ defmodule Broadway do ### Testing with Ecto - If you are using Ecto in your Broadway processors and you want - to run your tests concurrently, you need to tell Broadway to - use the Ecto SQL Sandbox during tests. This can be done in two + If your processors use Ecto, use the Ecto SQL Sandbox + during tests to run them concurrently. Tell Broadway in two steps. First, when you call `test_messages/3` in your tests, include @@ -519,7 +518,7 @@ defmodule Broadway do BroadwayEctoSandbox.attach(MyApp.Repo) - And now you should have concurrent Broadway tests that talk to the database. + And now you have concurrent Broadway tests that talk to the database. ## Ordering and partitioning @@ -530,8 +529,8 @@ defmodule Broadway do This can be done with the `:partition_by` option, which enforces that messages with a given property are always forwarded to the same stage. - In order to provide partitioning throughout the whole pipeline, just - set `:partition_by` at the root of your configuration: + Set `:partition_by` at the root of your configuration to provide + partitioning throughout the whole pipeline: defmodule MyBroadway do use Broadway @@ -572,8 +571,8 @@ defmodule Broadway do across partitions. So some partitions may be more overloaded than others, slowing down the whole pipeline. - In the example above, we have set the same partition for all - processors and batchers. You can also specify the `:partition_by` + In the example above, we set the same partition for all + processors and batchers. Alternatively, specify the `:partition_by` function for each "processor" and "batcher" individually. > #### Even partitions {: .warning} @@ -583,7 +582,7 @@ defmodule Broadway do > all order partitions. This implies two things: > > * Using `:partition_by` with a high level of concurrency can - > actually be detrimental to performance. For example, if + > be detrimental to performance. For example, if > concurrency is set to 100, you need all 100 processors to > make progress at the same time. > @@ -604,7 +603,7 @@ defmodule Broadway do > may be undesired. If your producer supports retrying, the > failed message may be retried later, out of its original order. > Those issues happen regardless of Broadway and solutions to said - > problems almost always need to be addressed outside of Broadway too. + > problems are almost always addressed outside of Broadway, too. ## Configuration storage @@ -852,9 +851,8 @@ defmodule Broadway do * `context` is the user defined data structure passed to `start_link/2`. This is the place to prepare and preload any information that will be used - by `c:handle_message/3`. For example, if you need to query the database, - instead of doing it once per message, you can do it on this callback as - a best-effort optimization. + by `c:handle_message/3`. For example, query the database on this callback as + a best-effort optimization instead of doing it once per message. The length of the list of messages received by this callback is often based on the `min_demand`/`max_demand` configuration in the processor but ultimately @@ -863,7 +861,7 @@ defmodule Broadway do `min_demand` value. Producers which are push-based, rather than pull-based, such as `BroadwayRabbitMQ.Producer`, are more likely to send messages as they arrive (which may skip batching altogether and always be single element lists). - In other words, this callback is simply a convenience for preparing messages, + In other words, this callback is a convenience for preparing messages, it does not guarantee the messages will be accumulated to a certain length. For effective batch processing, see `c:handle_batch/4`. @@ -891,14 +889,13 @@ defmodule Broadway do And it must return the (potentially) updated `Broadway.Message` struct. - This is the place to do any kind of processing with the incoming message, + This is the place to process the incoming message, e.g., transform the data into another data structure, call specific business logic to do calculations. Basically, any CPU bounded task that runs against a single message should be processed here. - In order to update the data after processing, use the - `Broadway.Message.update_data/2` function. This way the new message can be - properly forwarded and handled by the batcher: + Use the `Broadway.Message.update_data/2` function to update the data after processing, + so the new message can be properly forwarded and handled by the batcher: @impl true def handle_message(_, message, _) do @@ -906,9 +903,9 @@ defmodule Broadway do |> update_data(&do_calculation_and_returns_the_new_data/1) end - In case more than one batcher have been defined in the configuration, - you need to specify which of them the resulting message will be forwarded - to. You can do this by calling `put_batcher/2` and returning the new + In case more than one batcher has been defined in the configuration, + specify which will be forwarded the resulting message. + Do this by calling `put_batcher/2` and returning the new updated message: @impl true @@ -973,7 +970,7 @@ defmodule Broadway do * `context` is the user-defined data structure passed to `start_link/2`. This callback must return the same messages given to it, possibly updated. - For example, you could update the message data or use `Broadway.Message.configure_ack/2` + For example, you update the message data or use `Broadway.Message.configure_ack/2` in a centralized place to configure how to ack the message based on the failure reason. @@ -1061,9 +1058,8 @@ defmodule Broadway do ## Options - In order to set up how the pipeline created by Broadway should work, - you need to specify the blueprint of the pipeline. You can - do this by passing a set of options to `start_link/2`. + Specify a Broadway pipeline's blueprint to set up how it works + by passing a set of options to `start_link/2`. Each component of the pipeline has its own set of options. The Broadway options are: @@ -1270,7 +1266,7 @@ defmodule Broadway do for the Broadway pipeline `batch_size` to be filled or the `batch_timeout` to be triggered. - It returns a reference that can be used to identify the ack + It returns a reference used to identify the ack messages. See ["Testing"](#module-testing) section in module documentation @@ -1318,7 +1314,7 @@ defmodule Broadway do or if the messages in the batch take more time to process than `batch_timeout` then the caller will receive multiple messages. - It returns a reference that can be used to identify the ack + It returns a reference used to identify the ack messages. See ["Testing"](#module-testing) section in module documentation diff --git a/lib/broadway/acknowledger.ex b/lib/broadway/acknowledger.ex index 3a945da..5853c57 100644 --- a/lib/broadway/acknowledger.ex +++ b/lib/broadway/acknowledger.ex @@ -7,7 +7,7 @@ defmodule Broadway.Acknowledger do implement this behaviour and consider how the technology you're working with handles message acknowledgement. - The `c:ack/3` callback must be implemented in order to notify + The `c:ack/3` callback must be implemented to notify the origin of the data that a message can be safely removed after been successfully processed and published. In case of failed messages or messages without acknowledgement, depending @@ -22,20 +22,16 @@ defmodule Broadway.Acknowledger do @doc """ Invoked to acknowledge successful and failed messages. - * `ack_ref` is a term that uniquely identifies how messages - should be grouped and sent for acknowledgement. Imagine - you have a scenario where messages are coming from - different producers. Broadway will use this information - to correctly identify the acknowledger and pass it among - with the messages so you can properly communicate with - the source of the data for acknowledgement. `ack_ref` is + * `ack_ref` - a unique identifier used to determine how messages + are to be grouped and sent for acknowledgement. For example, + if messages come from different producers, Broadway uses its + value to identify the acknowledger and pass it along with messages + to coordinate acknowledgements with the data source . `ack_ref` is part of `t:Broadway.Message.acknowledger/0`. - * `successful` is the list of messages that were - successfully processed and published. + * `successful` - the list of messages both processed and published. - * `failed` is the list of messages that, for some reason, - could not be processed or published. + * `failed` - the list of messages that failed processing or publishing. """ @callback ack(ack_ref :: term, successful :: [Message.t()], failed :: [Message.t()]) :: diff --git a/lib/broadway/caller_acknowledger.ex b/lib/broadway/caller_acknowledger.ex index 2388a85..7727b68 100644 --- a/lib/broadway/caller_acknowledger.ex +++ b/lib/broadway/caller_acknowledger.ex @@ -2,9 +2,9 @@ defmodule Broadway.CallerAcknowledger do @moduledoc """ A simple acknowledger that sends a message back to a caller. - If you want to use this acknowledger in messages produced by your - `Broadway.Producer`, you can get its configuration by calling - the `init/0` function. For example, you can use it in + Use this acknowledger in messages produced by your + `Broadway.Producer` to get its configuration by calling + the `init/0` function. For example, use it in `Broadway.test_message/3`: some_ref = make_ref() @@ -15,9 +15,9 @@ defmodule Broadway.CallerAcknowledger do acknowledger: Broadway.CallerAcknowledger.init({self(), some_ref}, :ignored) ) - The first parameter is a tuple with the PID to receive the messages - and a unique identifier (usually a reference). Such unique identifier - is then included in the messages sent to the PID. The second parameter, + The first parameter is a 2-tuple, `{pid, ref}`. The PID is the process that + receives acknowledgment messages, and `ref` is a unique identifier (prefer `make_ref()`). + That unique identifier is then included in the messages sent to the PID. The second parameter, which is per message, is ignored. It sends a message in the format: diff --git a/lib/broadway/message.ex b/lib/broadway/message.ex index bf682d7..0be32f3 100644 --- a/lib/broadway/message.ex +++ b/lib/broadway/message.ex @@ -8,9 +8,9 @@ defmodule Broadway.Message do through the `c:Broadway.handle_message/3` callback or internally by one of the built-in stages of Broadway. - Instead of modifying the struct directly, you should use the functions + Instead of modifying the struct directly, use the functions provided by this module to manipulate messages. However, if you are implementing - a `Broadway.Producer` of your own, see `t:t/0` to see what fields you should set. + a `Broadway.Producer` of your own, see `t:t/0` for what fields to set. """ alias __MODULE__, as: Message @@ -36,11 +36,12 @@ defmodule Broadway.Message do @typedoc """ The Broadway message struct. - Most of these fields are manipulated by Broadway itself. You can - *read* the `:metadata` field, and you can use the functions in this - module to update most of the other fields. If you are implementing - your own producer, see the `Broadway.Producer` documentation - for more information on how to create and manipulate message structs. + `%Broadway.Message{}` is manipulated by the main Broadway process. + Use this module's functions to update message fields. The `:metadata` + is **read-only**. + + See the `Broadway.Producer` documentation + on how custom producers create and manipulate message structs. """ @type t :: %Message{ data: term, diff --git a/lib/broadway/noop_acknowledger.ex b/lib/broadway/noop_acknowledger.ex index 791c69e..65346ee 100644 --- a/lib/broadway/noop_acknowledger.ex +++ b/lib/broadway/noop_acknowledger.ex @@ -1,30 +1,41 @@ defmodule Broadway.NoopAcknowledger do @moduledoc """ - An acknowledger that does nothing. + An acknowledger that performs no operations. - If you want to use this acknowledger in messages produced by your - `Broadway.Producer`, you can get its configuration by calling - the `init/0` function. For example, you can use it in - `Broadway.test_message/3`: + Use this module to configure messages produced by `Broadway.Producer` + when no acknowledgment is needed. - Broadway.test_message(MyPipeline, "some data", acknowledger: Broadway.NoopAcknowledger.init()) + ## Example - Broadway sets this acknowledger automatically on messages that have been acked - via `Broadway.Message.ack_immediately/1`. + Use with `Broadway.test_message/3` for testing purposes: + + Broadway.test_message( + MyPipeline, + "some data", + acknowledger: Broadway.NoopAcknowledger.init() + ) + + This acknowledger is automatically set by Broadway for messages + acknowledged via `Broadway.Message.ack_immediately/1`. """ @behaviour Broadway.Acknowledger @doc """ - Returns the acknowledger metadata. + Returns a no-op acknowledger tuple. + + The tuple format `{module, ack_ref, data}` satisfies the + `Broadway.Acknowledger` contract without performing any operation. + + ## Example + + iex> Broadway.NoopAcknowledger.init() + {Broadway.NoopAcknowledger, nil, nil} """ @spec init() :: Broadway.Message.acknowledger() - def init do - {__MODULE__, _ack_ref = nil, _data = nil} - end + def init, do: {__MODULE__, nil, nil} @impl true - def ack(_ack_ref = nil, _successful, _failed) do - :ok - end + @doc false + def ack(_ack_ref, _successful, _failed), do: :ok end diff --git a/lib/broadway/options.ex b/lib/broadway/options.ex index 6b0e680..4d8cfed 100644 --- a/lib/broadway/options.ex +++ b/lib/broadway/options.ex @@ -146,9 +146,9 @@ defmodule Broadway.Options do > transformations, each with a different concern, then you must > have several processors. > - > However, that's not quite true. Separation of concerns is modeled + > However, that's not true. Separation of concerns is modeled > by defining several modules and functions, not processors. Processors - > are ultimately about moving data around and you should only do it + > are ultimately about moving data around - only do it > when necessary. Using processors for code organization purposes would > lead to inefficient pipelines. diff --git a/lib/broadway/producer.ex b/lib/broadway/producer.ex index c121808..1922621 100644 --- a/lib/broadway/producer.ex +++ b/lib/broadway/producer.ex @@ -18,46 +18,41 @@ defmodule Broadway.Producer do ## Injected Broadway configuration If `options` is a keyword list, Broadway injects a `:broadway` option - into the keyword list. This option contains the configuration for the + into that list. This option contains the configuration for the complete Broadway topology (see `Broadway.start_link/2`). For example, - you can use `options[:broadway][:name]` to uniquely identify the topology. + use `options[:broadway][:name]` to uniquely identify the topology. The `:broadway` configuration also has an `:index` key. This is the index of the producer in its supervision tree (starting - from `0`). This allows features such as having even producers - connect to some server while odd producers connect to another. - - If `options` is any other term, it is passed as is to the `c:GenStage.init/1` - callback. All other functions behave precisely as in `GenStage` - with the requirements that all emitted events must be `Broadway.Message` - structs. + from `0`). This enables patterns such as connecting even-indexed producers + to one server while odd-indexed producers connect to another. + If `options` is any other term besides a keyword list, it is passed directly to the `c:GenStage.init/1` + callback without modification. All other functions behave precisely as in `GenStage` + with the requirement that all emitted events must be `Broadway.Message` structs. ## Optional callbacks A `Broadway.Producer` can implement two optional Broadway callbacks, - `c:prepare_for_start/2` and `c:prepare_for_draining/1`, which are useful - for booting up and shutting down Broadway topologies respectively. + `c:prepare_for_start/2` and `c:prepare_for_draining/1`, which + boot up and shut down Broadway topologies, respectively. ## Producing Broadway messages - You should generally modify `Broadway.Message` structs by using the functions - in the `Broadway.Message` module. However, if you are implementing your - own producer, you **can manipulate** some of the struct's fields directly. + The pipeline modifies `Broadway.Message` structs using functions + from the `Broadway.Message` module, except for [custom producers](https://hexdocs.pm/broadway/custom-producers.html). - These fields are: + Manipulate these custom producer struct fields directly: - * `:data` (required) - the data of the message. Even though the function - `Broadway.Message.put_data/2` exists, when creating a `%Broadway.Message{}` - struct from scratch you will have to pass in the `:data` field directly. + * `:data` (required) - the message data. + Pass in the `:data` field directly. (Don't use `Broadway.Message.put_data/2`.) - * `:acknowledger` (required) - the acknowledger of the message, of type - `t:Broadway.Message.acknowledger/0`. + * `:acknowledger` (required) - the message acknowledger. + The acknowledger's type is `t:Broadway.Message.acknowledger/0`. - * `:metadata` (optional) - metadata about the message that your producer - can attach to the message. This is useful when you want to add some metadata - to messages, and document it for users to use in their pipelines. + * `:metadata` (optional) - the producer-attached message metadata. + Optionally document information for users to use in their pipelines. - For example, a producer could create a message by doing something like this: + For example, a custom producer creates this message: %Broadway.Message{ data: "some data here", @@ -67,27 +62,29 @@ defmodule Broadway.Producer do """ @doc """ - Invoked once by Broadway during `Broadway.start_link/2`. + Broadway invokes this callback once during `Broadway.start_link/2`. + + This callback manipulates the general topology options + and introduces any new child specifications that Broadway will + start **before** the producers' supervisor in its supervision tree. + Broadway's supervision tree operates as a `rest_for_one` supervisor + (see `Supervisor`), so if the children returned from this callback + crash, the entire pipeline shuts down until the supervisor restarts them. - The goal of this callback is to manipulate the general topology options, - if necessary at all, and introduce any new child specs that will be - started **before** the producers' supervisor in Broadway's supervision tree. - Broadway's supervision tree is a `rest_for_one` supervisor (see the documentation - for `Supervisor`), which means that if the children returned from this callback - crash they will bring down the rest of the pipeline before being restarted. + Broadway invokes this callback within the main Broadway process with the + following parameters: - This callback is guaranteed to be invoked inside the Broadway main process. + * `module` - the Broadway module passed as the first argument to `Broadway.start_link/2`. - `module` is the Broadway module passed as the first argument to - `Broadway.start_link/2`. `options` is all of Broadway topology options passed - as the second argument to `Broadway.start_link/2`. + * `options` - a keyword list of Broadway topology options passed + as the second argument to `Broadway.start_link/2`. - The return value of this callback is a tuple `{child_specs, options}`. `child_specs` - is the list of child specs to be started under Broadway's supervision tree. - `updated_options` is a potentially-updated list of Broadway options - that will be used instead of the ones passed to `Broadway.start_link/2`. This can be - used to modify the characteristics of the Broadway topology to accommodate - the children started here. + The callback returns a tuple `{child_specs, updated_options}`. `child_specs` + is a list of child specifications from the children that Broadway will start under its supervision tree. + `updated_options` contains potentially-modified Broadway options + that replace those initially passed to `Broadway.start_link/2`. This + allows adjustments to the characteristics of the Broadway topology to accommodate + the newly introduced children. ## Examples @@ -114,15 +111,15 @@ defmodule Broadway.Producer do when child_spec: :supervisor.child_spec() | {module, any} | module @doc """ - Invoked by the terminator right before Broadway starts draining in-flight + The terminator invokes this callback right before Broadway starts draining in-flight messages during shutdown. - This callback should be implemented by producers that need to do additional + Implement this callback for producers that do additional work before shutting down. That includes active producers like RabbitMQ that - must ask the data provider to stop sending messages. It will be invoked for + must ask the data provider to stop sending messages. Broadway will invoke this for each producer stage. - `state` is the current state of the producer. + * `state` - the current state of the producer. """ @callback prepare_for_draining(state :: any) :: {:noreply, [event], new_state}