Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
ring/ring-core {:mvn/version "1.11.0"}
hiccup/hiccup {:mvn/version "2.0.0-RC2"}
bidi/bidi {:mvn/version "2.1.6"}
it.burning/cron-expression-descriptor {:mvn/version "1.2.10"}}
it.burning/cron-expression-descriptor {:mvn/version "1.2.10"}
com.cognitect.aws/api {:mvn/version "0.8.710-beta01"}
com.cognitect.aws/endpoints {:mvn/version "1.1.12.772"}
com.cognitect.aws/sqs {:mvn/version "869.2.1687.0"}}
:paths ["src" "resources/public"]

:aliases {:test {:extra-paths ["test"]
Expand Down
82 changes: 82 additions & 0 deletions src/goose/brokers/sqs/broker.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
(ns goose.brokers.sqs.broker
(:require
[goose.broker :as b]
[goose.brokers.sqs.worker :as sqs-worker]
[goose.brokers.sqs.sqs-requests :as sqs-request]
[cognitect.aws.client.api :as aws]
[goose.utils :as u]))

(defprotocol Close
"Closes connection to SQS Message Broker."
(close [this]))

(defrecord SQS [client opts]
b/Broker
;; SQS supports nothing more than:
;; - Enqueue a job
;; - Start a worker process
;; - Enqueued jobs API
;; - Batch jobs (Will implement later)

(enqueue
[this job]
(let [{:keys [client]} this
{:keys [queue-url]} opts]

(sqs-request/enqueue client queue-url (u/encode-to-str job))))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why encode-to-str instead of encode?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First the reason to encode was that the json/write-str used in sqs_requests.clj ignored the namespaces
So a function like goose.core/process-job would turn to just process-job(It took hours to figure this out when I was implementing it)

So encode-to-str will convert this to a simple string, that can be serialized and deserialized into json which is more readable imo


(start-worker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For background processing, clients might want to write to different queues, and a worker could process jobs off different queues. Need to design the enqueue & worker API keeping this in mind.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

different queues as in you mean different SQS services(in same account) and different queues within each service. Or you just mean we will allow Only one AWS account with a SQS service where they can connect to multiple queues?

Or you mean support for different accounts of AWS also?

[this worker-opts]
(let [{:keys [client]} this
{:keys [queue-url]} opts]
(sqs-worker/start client queue-url worker-opts)))

Close
(close [_]
(println "SQS client does not require explicit closure.")))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close is not required to be implemented as part of Broker protocol if not needed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay!

Just a follow-up question - RMQ doesn't implements some methods from broker protocol. For instance the cron-job function. But still we can call it through perform-every of client. Though it ends up throwing a run-time error, Is there a way to let the developer know that they cannot do this(Just so their IDE/LSP can say them not to)(I know it is already mentioned in docs)


(def default-opts
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default-opts could point to an instance of SQS hosted locally for development purposes, for instance localstack, vsouza or roribio16's SQS docker images.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for letting me know that this existed! 😄

"Default Config is not allowed like redis or rmq
### Keys
`:region`: AWS Region
Example: \"ap-south-1\" , \"us-east-1\"
`:queue-url`: The Queue URL provided by the AWS SQS
Example: \"https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue\""
{:region nil
:queue-url nil})

(defn- create-sqs
"Helper function that creates an SQS broker implementation.
It abstracts the common logic for both producer and consumer."

[opts]
(let [client (aws/client {:api :sqs :region (:region opts)})]
(->SQS client opts)))

(defn new-producer
"Creates an SQS broker implementation for producer (client).
### Args
`opts` : Map of AWS client options.
### Usage
```clojure
(new-producer {:region \"us-east-1\" :queue-url \"https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue\"})
```"
[opts]
(create-sqs opts))

(defn new-consumer
"Creates an SQS broker implementation for worker (consumer).
### Args
`opts` : Map of AWS client options.
### Usage
```clojure
(new-consumer {:region \"us-east-1\" :queue-url \"https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue\"})
```"
[opts]
(create-sqs opts))
12 changes: 12 additions & 0 deletions src/goose/brokers/sqs/sqs_requests.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
(ns goose.brokers.sqs.sqs-requests
(:require
[cognitect.aws.client.api :as aws]
[clojure.data.json :as json]))

(defn enqueue
"Enqueues a job for immediate execution."
[client queue-url job]
(aws/invoke client {:op :SendMessage
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an underlying connection pool that is utilized by aws/invoke? Could we have access to this pool and pass in explicitly as a dependency?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No as of now it does not expose a direct connection pool. We can customize it though https://github.com/cognitect-labs/aws-api?tab=readme-ov-file#overriding-the-http-client

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cognitect API uses a shared HTTP client internally to cache connections and pool outgoing requests.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes it easier then

:request {:QueueUrl queue-url
:MessageBody (json/write-str job)
:DelaySeconds 0}}))
37 changes: 37 additions & 0 deletions src/goose/brokers/sqs/worker.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
(ns goose.brokers.sqs.worker
(:require
[cognitect.aws.client.api :as aws]
[clojure.core.async :as async]
[goose.utils :as u]
[goose.consumer :as c]))

(defn start
"Polls messages from an SQS queue, executes each message as a job, and deletes it upon success.
Args:
`client` : AWS SQS client.
`queue-url` : URL of the SQS queue.
`opts` : Additional options required for job execution."
[client queue-url opts]
(let [continue? (atom true)]
(async/go-loop []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goose uses claypoole instead of async for concurrent execution.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha!

(when @continue?
(let [response (aws/invoke client {:op :ReceiveMessage
:request {:QueueUrl queue-url
:MaxNumberOfMessages 1
:WaitTimeSeconds 10}})]
(when-let [messages (:Messages response)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dequeuing a job, holding a lock over the job to avoid double executions, executing a job, enqueuing for retry/death upon failure, and deletion from queue post processing are very complex operations. This section needs a lot of re-work keeping all these things in mind.

Checkout redis.worker/start and rmq.worker/start for reference.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright will look into it

(doseq [message messages]

(let [{:keys [Body ReceiptHandle]} message
job (u/decode-from-str Body)]
(try
(c/execute-job opts job)
(aws/invoke client {:op :DeleteMessage
:request {:QueueUrl queue-url
:ReceiptHandle ReceiptHandle}})
(catch Exception e
(println "Error executing job:" e))))))
(recur))))
(fn stop-polling []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worker needs to return an implementation of goose.worker/stop for client to stop it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay!

(reset! continue? false))))