From 368b2aaa6593e0a0aab24b739c761243f6b8b581 Mon Sep 17 00:00:00 2001 From: Vasile Gorcinschi Date: Wed, 26 Nov 2025 12:36:44 -0500 Subject: [PATCH 1/4] feat: Added ElastiCache (Valkey) message queue template Demonstrates Streams with consumer groups for message queue functionality in Vercel Functions. Includes: - Contact form message producer (POST /api/messages) - Consumer group message processor with XREADGROUP and XAUTOCLAIM (GET /api/messages) - Message acknowledgement endpoint (DELETE /api/messages) - UI components for form submission and message processing - valkey-glide client integration with proper error handling --- .../message-queue-elasticache/.eslintrc.json | 4 + .../message-queue-elasticache/.gitignore | 42 + solutions/message-queue-elasticache/README.md | 242 ++ .../app/api/messages/route.ts | 314 ++ .../message-queue-elasticache/app/layout.tsx | 18 + .../message-queue-elasticache/app/page.tsx | 186 + .../app/process/page.tsx | 298 ++ .../images/empty-mailbox.png | Bin 0 -> 219505 bytes .../images/message-submission.png | Bin 0 -> 233898 bytes .../images/message-view.png | Bin 0 -> 265202 bytes .../images/next-message.png | Bin 0 -> 217138 bytes .../message-queue-elasticache/next.config.js | 17 + .../message-queue-elasticache/package.json | 30 + .../message-queue-elasticache/pnpm-lock.yaml | 3769 +++++++++++++++++ .../postcss.config.js | 8 + .../public/favicon.ico | Bin 0 -> 15086 bytes .../tailwind.config.js | 9 + .../message-queue-elasticache/tsconfig.json | 21 + .../message-queue-elasticache/turbo.json | 9 + .../message-queue-elasticache/vercel.json | 4 + 20 files changed, 4971 insertions(+) create mode 100644 solutions/message-queue-elasticache/.eslintrc.json create mode 100644 solutions/message-queue-elasticache/.gitignore create mode 100644 solutions/message-queue-elasticache/README.md create mode 100644 solutions/message-queue-elasticache/app/api/messages/route.ts create mode 100644 solutions/message-queue-elasticache/app/layout.tsx create mode 100644 solutions/message-queue-elasticache/app/page.tsx create mode 100644 solutions/message-queue-elasticache/app/process/page.tsx create mode 100644 solutions/message-queue-elasticache/images/empty-mailbox.png create mode 100644 solutions/message-queue-elasticache/images/message-submission.png create mode 100644 solutions/message-queue-elasticache/images/message-view.png create mode 100644 solutions/message-queue-elasticache/images/next-message.png create mode 100644 solutions/message-queue-elasticache/next.config.js create mode 100644 solutions/message-queue-elasticache/package.json create mode 100644 solutions/message-queue-elasticache/pnpm-lock.yaml create mode 100644 solutions/message-queue-elasticache/postcss.config.js create mode 100644 solutions/message-queue-elasticache/public/favicon.ico create mode 100644 solutions/message-queue-elasticache/tailwind.config.js create mode 100644 solutions/message-queue-elasticache/tsconfig.json create mode 100644 solutions/message-queue-elasticache/turbo.json create mode 100644 solutions/message-queue-elasticache/vercel.json diff --git a/solutions/message-queue-elasticache/.eslintrc.json b/solutions/message-queue-elasticache/.eslintrc.json new file mode 100644 index 0000000000..a2569c2c7c --- /dev/null +++ b/solutions/message-queue-elasticache/.eslintrc.json @@ -0,0 +1,4 @@ +{ + "root": true, + "extends": "next/core-web-vitals" +} diff --git a/solutions/message-queue-elasticache/.gitignore b/solutions/message-queue-elasticache/.gitignore new file mode 100644 index 0000000000..26fd87c7cb --- /dev/null +++ b/solutions/message-queue-elasticache/.gitignore @@ -0,0 +1,42 @@ +# See https://help.github.com/articles/ignoring-files/ for more about ignoring files. + +# Dependencies +/node_modules +/.pnp +.pnp.js + +# Testing +/coverage + +# Next.js +/.next/ +/out/ +next-env.d.ts + +# Production +build +dist + +# Misc +.DS_Store +*.pem + +# Debug +npm-debug.log* +yarn-debug.log* +yarn-error.log* + +# Local ENV files +.env.local +.env.development.local +.env.test.local +.env.production.local + +# Vercel +.vercel + +# Turborepo +.turbo + +# typescript +*.tsbuildinfo \ No newline at end of file diff --git a/solutions/message-queue-elasticache/README.md b/solutions/message-queue-elasticache/README.md new file mode 100644 index 0000000000..d6cffde842 --- /dev/null +++ b/solutions/message-queue-elasticache/README.md @@ -0,0 +1,242 @@ +--- +name: Message Queue with AWS ElastiCache and Next.js +slug: message-queue-elasticache +description: Learn to use AWS ElastiCache (Valkey) with Next.js API Routes for reliable message queue processing using streams. +framework: Next.js +deployUrl: https://vercel.com/new/clone?repository-url=https://github.com/vercel/examples/tree/main/solutions/message-queue-elasticache&project-name=message-queue-elasticache&repository-name=message-queue-elasticache&env=VALKEY_ENDPOINT&envDescription=Valkey%20endpoint%20URL +--- + +# Next.js + AWS ElastiCache Message Queue + +This is an example of a Next.js application using AWS ElastiCache (Valkey) for implementing a reliable message queue with streams. The template demonstrates a contact form processor where messages are queued, consumed, and acknowledged using Valkey's streaming capabilities. + +## How to Use + +This template demonstrates the code pattern for implementing message queues with Valkey streams. It's designed to work with AWS ElastiCache in production environments. + +### Local Development + +Execute [`create-next-app`](https://github.com/vercel/next.js/tree/canary/packages/create-next-app) with [pnpm](https://pnpm.io/installation) to bootstrap the example: + +```bash +pnpm create next-app --example https://github.com/vercel/examples/tree/main/solutions/message-queue-elasticache +``` + +**Run Valkey locally:** + +Using Docker: + +```bash +docker run -d -p 6379:6379 valkey/valkey-bundle:latest +``` + +Or install Valkey directly following the [official installation guide](https://valkey.io/download/). + +**Configure environment:** + +Create an `.env.local` file: + +```bash +VALKEY_ENDPOINT=localhost:6379 +``` + +**Start the development server:** + +```bash +pnpm dev +``` + +Visit to see the application. + +### Production Deployment with AWS ElastiCache + +AWS ElastiCache clusters run within a VPC (private network), which requires network connectivity setup for production deployments on Vercel. + +#### Networking Requirements + +**For Vercel Enterprise customers**, connectivity to AWS ElastiCache is available through [Vercel Secure Compute](https://vercel.com/docs/connectivity/secure-compute), which enables private network access between Vercel Functions and AWS VPC resources. + +**High-level setup steps:** + +**AWS Side:** + +1. Create an ElastiCache for Valkey cluster (version 7.0+) in your AWS VPC +2. Configure security groups to allow traffic from Vercel's CIDR block +3. Set up VPC peering or AWS PrivateLink based on your architecture +4. Note the cluster endpoint URL + +**Vercel Side:** + +1. Contact Vercel to enable Secure Compute for your Enterprise account +2. Coordinate with Vercel to receive your dedicated CIDR block +3. Add the ElastiCache endpoint to your project environment variables: + + ```bash + VALKEY_ENDPOINT=your-cluster.cache.amazonaws.com:6379 + ``` + +4. Deploy your application + +[![Deploy with Vercel](https://vercel.com/button)](https://vercel.com/new/clone?repository-url=https://github.com/vercel/examples/tree/main/solutions/message-queue-elasticache&project-name=message-queue-elasticache&repository-name=message-queue-elasticache&env=VALKEY_ENDPOINT&envDescription=Valkey%20endpoint%20URL) + +For detailed networking configuration, refer to the [Vercel Secure Compute documentation](https://vercel.com/docs/connectivity/secure-compute). + +## How It Works + +This template demonstrates a reliable serverless message queue workflow: + +1. **Producer**: A visitor submits a contact form, and the message is immediately written to a Valkey stream +2. **Consumer**: A reviewer opens the processing view, which reads the next unhandled message from the consumer group +3. **Acknowledgment**: When the reviewer confirms, the app acknowledges the message and removes it from the pending list + +**Key Features:** + +- Single consumer group prevents message duplication +- Message acknowledgment ensures reliable processing +- Refreshing the page won't cause duplicate processing +- Demonstrates how ElastiCache Streams support reliable serverless workflows + +## API Endpoints + +The application provides a single API route (`/api/messages`) with three HTTP methods demonstrating the message queue pattern: + +### Message Operations + +- `POST /api/messages` - Add a new message to the queue (contact form submission) +- `GET /api/messages` - Read the next unprocessed message from the consumer group +- `DELETE /api/messages?messageId=` - Acknowledge and remove a message from the pending list + +## Testing + +**Important Notes:** + +- Consumer groups track which messages have been delivered. Once a message is consumed (via GET), it moves to the Pending Entries List (PEL) and won't appear in subsequent GET requests until acknowledged. +- The `streamMessageId` returned by GET is Valkey's unique stream entry ID and **must be used** for the DELETE operation. +- Always complete the full flow: POST → GET → DELETE. + +### Complete Message Flow Example + +**1. Produce Message (Add to Queue)** + +```bash +curl -X POST http://localhost:3000/api/messages \ + -H "Content-Type: application/json" \ + -d '{ + "name": "Test User", + "email": "test@example.com", + "message": "Hello from local dev!" + }' +``` + +Response: + +```json +{ + "streamMessageId": "1764009314892-0", + "timestamp": "2024-11-24T18:35:14.890Z" +} +``` + +**2. Consume Message (Read from Queue)** + +```bash +curl http://localhost:3000/api/messages +``` + +Response with message: + +```json +{ + "message": { + "streamMessageId": "1764009314892-0", + "name": "Test User", + "email": "test@example.com", + "message": "Hello from local dev!", + "timestamp": "2024-11-24T18:35:14.890Z", + "claimed": true + } +} +``` + +Response when queue is empty: + +```json +{ "message": null } +``` + +**Note**: The `claimed` field indicates whether this message was recovered from the Pending Entries List (a previously delivered but unacknowledged message). Messages idle for more than 60 seconds are automatically reclaimed. + +**3. Acknowledge Message (Mark as Processed)** + +**Critical**: Use the `streamMessageId` from step 2 for the DELETE operation: + +```bash +curl -X DELETE "http://localhost:3000/api/messages?messageId=1764009314892-0" +``` + +Response: + +```json +{ "success": true } +``` + +### User Interface Flow + +This template consists of two UI views that demonstrate the complete message queue workflow: + +#### Submitting Messages (`/`) + +The home page features a contact form where visitors can submit their name, email, and message. Upon submission, the message is immediately added to the Valkey stream and a confirmation is displayed. + +![Message Submission Form](./images/message-submission.png) + +#### Processing Messages (``) + +The processing view allows reviewers to consume and acknowledge messages from the queue. The page loads the messages on page load and displays: + +- Message details (name, email, message content) +- Timestamp when the message was submitted +- A warning banner if the message was recovered from the pending entries list + +![Displayed Message](./images/message-view.png) + +Clicking **Acknowledge** confirms the message and removes it from the queue. A success +message appears with a **Next Message** button to load the next message in the queue. + +![Message Acknowledgement](./images/next-message.png) + +When the queue is empty, a "No message to process" indicator appears. + +![Empty Mailbox](./images/empty-mailbox.png) + +### Troubleshooting + +**Message Recovery**: If you GET a message but don't DELETE (acknowledge) it, the message stays in the Pending Entries List. After 60 seconds of idle time, subsequent GET requests will **automatically reclaim** that message (indicated by `"claimed": true` in the response). This is a reliability feature that handles consumer failures. + +**For clean testing**, if you want to reset and start fresh: + +```bash +# Access your Valkey instance +docker exec -it valkey-cli + +# Delete the entire stream (removes consumer group too) +DEL contact-messages + +# Or just delete the consumer group +XGROUP DESTROY contact-messages contact-processors + +# The consumer group will be recreated automatically on next GET +``` + +**Checking Pending Messages**: To see what's currently in the Pending Entries List: + +```bash +# Access Valkey +docker exec -it valkey-cli + +# View pending messages +XPENDING contact-messages contact-processors + +# View all messages in the stream +XRANGE contact-messages - + +``` diff --git a/solutions/message-queue-elasticache/app/api/messages/route.ts b/solutions/message-queue-elasticache/app/api/messages/route.ts new file mode 100644 index 0000000000..aa9d228070 --- /dev/null +++ b/solutions/message-queue-elasticache/app/api/messages/route.ts @@ -0,0 +1,314 @@ +import { NextResponse } from 'next/server' +import { GlideClient, GlideClientConfiguration } from '@valkey/valkey-glide' +import { randomUUID } from 'crypto' +import { stringify } from 'querystring' + +// Force Node.js runtime for native modules +export const runtime = 'nodejs' + +const STREAM_NAME = 'contact-messages' +const CONSUMER_GROUP = 'contact-processors' + +let client: GlideClient | null = null + +/** + * Get or initialize Valkey client + */ +async function getClient(): Promise { + if (client) { + return client + } + + const endpoint = process.env.VALKEY_ENDPOINT + if (!endpoint) { + throw new Error('VALKEY_ENDPOINT environment variable is not set') + } + + const [host, portStr] = endpoint.split(':') + const port = parseInt(portStr, 10) + + if (!host || isNaN(port)) { + throw new Error('VALKEY_ENDPOINT must be in format host:port') + } + + const config: GlideClientConfiguration = { + addresses: [{ host, port }], + } + + client = await GlideClient.createClient(config) + return client +} + +async function ensureConsumerGroup(client: GlideClient): Promise { + try { + /** + * Try to create the consumer group + * XGROUP CREATE stream group id [MKSTREAM] + * See also {@link https://valkey.io/commands/xgroup-create/} + */ + await client.customCommand([ + 'XGROUP', + 'CREATE', + STREAM_NAME, + CONSUMER_GROUP, + '0', + 'MKSTREAM', + ]) + } catch (error: any) { + // Ignore if group already exists + if (!error?.message?.includes('BUSYGROUP')) { + throw error + } + } +} + +function handleError(error: unknown, defaultMessage: string) { + console.error('API Error:', error) + const message = + error instanceof Error + ? `${defaultMessage}. ${error.message}` + : defaultMessage + return NextResponse.json({ error: message }, { status: 500 }) +} + +type ContactDataForm = { name: string; email: string; message: string } + +type ValidationSuccess = { + valid: true + data: ContactDataForm +} + +type ValidationError = { + valid: false + error: string +} + +type ValidationResult = ValidationSuccess | ValidationError + +function validateContactForm(data: any): ValidationResult { + if (!data.name || typeof data.name !== 'string') { + return { valid: false, error: 'Name is required and must be a string' } + } + if (!data.email || typeof data.email !== 'string') { + return { valid: false, error: 'Email is required and must be a string' } + } + if (!data.message || typeof data.message !== 'string') { + return { valid: false, error: 'Message is required and must be a string' } + } + + return { + valid: true, + data: { + name: data.name.trim(), + email: data.email.trim(), + message: data.message.trim(), + }, + } +} + +/** + * POST - Produce a message + * Adds a contact form submission to the Valkey stream + */ +export async function POST(request: Request) { + try { + const body = await request.json() + const validation = validateContactForm(body) + + if (!validation.valid) { + return NextResponse.json({ error: validation.error }, { status: 400 }) + } + + const { name, email, message } = validation.data + + const client = await getClient() + const timestamp = new Date().toISOString() + + /** + * Append entry (field, value pairs) to the specified stream: {@link https://valkey.io/commands/xadd/} + */ + const streamMessageId = await client.customCommand([ + 'XADD', + STREAM_NAME, + '*', + 'name', + name, + 'email', + email, + 'message', + message, + 'timestamp', + timestamp, + ]) + + return NextResponse.json( + { + streamMessageId, + timestamp, + }, + { status: 201 } + ) + } catch (error) { + return handleError(error, 'Failed to produce message') + } +} + +/** + * GET - Consume a message + * Reads the next unprocessed message from the consumer group. + * First tries to claim any pending messages that have been idle > 60 seconds, + * then falls back to reading new messages. + */ +export async function GET() { + try { + const client = await getClient() + await ensureConsumerGroup(client) + + const consumerName = `consumer-${randomUUID()}` + + /** + * XAUTOCLAIM automatically reclaims messages that + * have not been acknowledged, 60 seconds paramter tells + * the backend to reclaim aby messages that have not + * been consumed for more than this time. + * See more {@link https://valkey.io/commands/xautoclaim/} + */ + const claimResponse = await client.customCommand([ + 'XAUTOCLAIM', + STREAM_NAME, + CONSUMER_GROUP, + consumerName, + '60000', + '0-0', + 'COUNT', + '1', + ]) + + // XAUTOCLAIM returns: [next_id, [messages], deleted_ids] + // valkey-glide wraps messages as objects with 'key' and 'value' properties + if ( + Array.isArray(claimResponse) && + claimResponse.length >= 2 && + Array.isArray(claimResponse[1]) && + claimResponse[1].length > 0 + ) { + const messageObj = claimResponse[1][0] as { + key: string + value: Array<[string, string]> + } + + const streamMessageId = messageObj.key + const fieldsArray = messageObj.value + + // Convert array of pairs to Record + const messageData: Record = {} + for (const [field, value] of fieldsArray) { + messageData[field] = value + } + + return NextResponse.json( + { + message: { + streamMessageId, + name: messageData.name, + email: messageData.email, + message: messageData.message, + timestamp: messageData.timestamp, + claimed: true, // Indicate this was a claimed message + }, + }, + { status: 200 } + ) + } + + /** + * No pending messages to claim: + * Dequeue latest message from the stream + * See also {@link https://valkey.io/commands/xreadgroup/} + * and {@link https://valkey.io/commands/xread/} + */ + const response = await client.customCommand([ + 'XREADGROUP', + 'GROUP', + CONSUMER_GROUP, + consumerName, + 'COUNT', + '1', + 'STREAMS', + STREAM_NAME, + '>', + ]) + + // Response format: [[stream_name, [[message_id, [field1, value1, field2, value2, ...]]]]] + if (!response || !Array.isArray(response) || response.length === 0) { + return NextResponse.json({ message: null }, { status: 200 }) + } + + // getting the head, since XREADGROUP supports reading from multiple streams at the same time + const streamData = response[0] as { key: string; value: any[] } + const messages = streamData.value + + if (!messages || messages.length === 0) { + return NextResponse.json({ message: null }, { status: 200 }) + } + + const messageObj = messages[0] as { + key: string + value: Array<[string, string]> + } + + const streamMessageId = messageObj.key + const fieldsArray = messageObj.value + + // Convert array of pairs to Record + const messageData: Record = {} + for (const [field, value] of fieldsArray) { + messageData[field] = value + } + + return NextResponse.json( + { + message: { + streamMessageId, + name: messageData.name, + email: messageData.email, + message: messageData.message, + timestamp: messageData.timestamp, + }, + }, + { status: 200 } + ) + } catch (error) { + return handleError(error, 'Failed to consume message') + } +} + +/** + * DELETE - Acknowledge a message + * Marks a message as processed and removes it from the pending list + */ +export async function DELETE(request: Request) { + try { + const { searchParams } = new URL(request.url) + const messageId = searchParams.get('messageId') + + if (!messageId) { + return NextResponse.json( + { error: 'messageId query parameter is required' }, + { status: 400 } + ) + } + + const client = await getClient() + + /** + * XACK removes one or more messages from the pending list of a stream + * See also {@link https://valkey.io/commands/xack/ message_id} + */ + await client.customCommand(['XACK', STREAM_NAME, CONSUMER_GROUP, messageId]) + + return NextResponse.json({ success: true }, { status: 200 }) + } catch (error) { + return handleError(error, 'Failed to acknowledge message') + } +} diff --git a/solutions/message-queue-elasticache/app/layout.tsx b/solutions/message-queue-elasticache/app/layout.tsx new file mode 100644 index 0000000000..ce91600a37 --- /dev/null +++ b/solutions/message-queue-elasticache/app/layout.tsx @@ -0,0 +1,18 @@ +import type { ReactNode } from 'react' +import { Layout, getMetadata } from '@vercel/examples-ui' +import '@vercel/examples-ui/globals.css' + +export const metadata = getMetadata({ + title: 'message-queue-elasticache', + description: 'message-queue-elasticache', +}) + +export default function RootLayout({ children }: { children: ReactNode }) { + return ( + + + {children} + + + ) +} diff --git a/solutions/message-queue-elasticache/app/page.tsx b/solutions/message-queue-elasticache/app/page.tsx new file mode 100644 index 0000000000..ae81c808d9 --- /dev/null +++ b/solutions/message-queue-elasticache/app/page.tsx @@ -0,0 +1,186 @@ +'use client' + +import { useState } from 'react' +import Link from 'next/link' + +export default function Home() { + const [name, setName] = useState('') + const [email, setEmail] = useState('') + const [message, setMessage] = useState('') + const [result, setResult] = useState('') + const [isSubmitting, setIsSubmitting] = useState(false) + + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault() + setIsSubmitting(true) + setResult('') + + try { + const response = await fetch('/api/messages', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ name, email, message }), + }) + + const data = await response.json() + + if (response.ok) { + setResult( + `✓ Message submitted successfully! (ID: ${data.streamMessageId})` + ) + // Clear form + setName('') + setEmail('') + setMessage('') + } else { + setResult(`✗ Error: ${data.error || 'Failed to submit message'}`) + } + } catch (error) { + setResult(`✗ Error: ${error}`) + } finally { + setIsSubmitting(false) + } + } + + return ( +
+

Contact Form

+

+ Submit a message to the queue. Messages are processed in order by + reviewers. +

+ +
+
+ + setName(e.target.value)} + placeholder="Enter your name" + required + style={{ + width: '100%', + padding: '8px', + border: '1px solid #ddd', + borderRadius: '4px', + }} + /> +
+ +
+ + setEmail(e.target.value)} + placeholder="Enter your email" + required + style={{ + width: '100%', + padding: '8px', + border: '1px solid #ddd', + borderRadius: '4px', + }} + /> +
+ +
+ +