diff --git a/aggregate-messages-servicebus-correlationid/README.md b/aggregate-messages-servicebus-correlationid/README.md new file mode 100644 index 0000000..9b3a178 --- /dev/null +++ b/aggregate-messages-servicebus-correlationid/README.md @@ -0,0 +1,452 @@ +# Aggregate messages from Azure Service Bus by CorrelationId + +## Overview + +This workflow template implements the **Aggregator enterprise integration pattern** for Azure Logic Apps Standard, designed to facilitate **migration from BizTalk Server** to Azure. The template retrieves messages from an Azure Service Bus queue in batches, decodes flat file content using an XSD schema, and groups messages by their CorrelationId property. + +This template replicates the functionality of BizTalk Server's Aggregator pattern, providing a cloud-native alternative that leverages Azure Logic Apps Standard capabilities while maintaining compatibility with existing BizTalk flat file schemas and integration patterns. + +## Pattern description + +The **Aggregator pattern** is an Enterprise Integration Pattern that collects related messages and combines them into a single, consolidated message. This template uses the CorrelationId property from Azure Service Bus messages to identify which messages belong together, making it ideal for scenarios where multiple related messages need to be processed as a unit. + +**For BizTalk Server users**: This pattern is equivalent to BizTalk's **Aggregator pattern with sequential convoy processing**. In BizTalk terminology: +- **Aggregator pattern** = The overall pattern (WHAT: collecting and combining related messages) +- **Sequential convoy** = The implementation mechanism (HOW: processing correlated messages one-by-one using correlation sets) + +This Logic Apps workflow combines both concepts: it implements the Aggregator pattern using sequential message processing within each batch, similar to how BizTalk orchestrations process convoy messages. 
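As a purely illustrative example (message IDs, correlation IDs, and bodies are hypothetical), a batch of three queue messages in which two share a CorrelationId aggregates into two groups:

```json
{
  "IncomingMessages": [
    { "MessageId": "m-001", "CorrelationId": "order-12345", "Body": "order line 1" },
    { "MessageId": "m-002", "CorrelationId": "order-12345", "Body": "order line 2" },
    { "MessageId": "m-003", "CorrelationId": "order-67890", "Body": "order line 1" }
  ],
  "AggregatedGroups": [
    { "CorrelationId": "order-12345", "MessageCount": 2 },
    { "CorrelationId": "order-67890", "MessageCount": 1 }
  ]
}
```

The `CorrelationId` and `MessageCount` fields mirror the shape of the aggregated response described later in this README.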
+ +## Architecture + +- **Trigger**: Azure Service Bus queue (peek-lock, non-session, batch mode) +- **Processing**: Sequential in-memory aggregation with flat file decoding +- **Output**: HTTP response with aggregated results grouped by CorrelationId +- **Error handling**: Scope-based error capture with detailed logging + +## Use cases + +This template is useful for the following scenarios: + +- **BizTalk Aggregator pattern migration**: Migrate BizTalk orchestrations that use Aggregator pattern with sequential or parallel convoy processing to Azure Logic Apps Standard +- **Order processing**: Aggregate multiple order line items (separate messages) into a complete order document +- **Document assembly**: Combine document fragments into a complete document (EDI segments, flat file records) +- **Data collection**: Group related telemetry or sensor data by device ID for batch analysis +- **Multi-step workflows**: Collect results from parallel operations using a correlation identifier before proceeding +- **Message batching**: Consolidate multiple messages with same business key for downstream processing + +### BizTalk Server migration considerations + +If you're migrating from BizTalk Server, this template provides equivalent functionality for: + +**BizTalk Aggregator Pattern Components:** +- **Correlation Sets** → **CorrelationId property**: Messages are grouped using Service Bus CorrelationId (equivalent to BizTalk correlation sets) +- **Sequential Convoy** → **Sequential processing**: Set `EnableSequentialProcessing=true` to process correlated messages one-by-one (equivalent to BizTalk sequential convoy orchestrations) +- **Message Collection** → **In-memory variables**: Uses `CorrelationGroups` dictionary to collect messages (equivalent to BizTalk orchestration variables) +- **XSD Schemas** → **Flat File Decoding**: Reuse your existing BizTalk flat file schemas without modification +- **Exception Blocks** → **Scope-based error handling**: Similar error capture and logging as BizTalk exception handlers +- **Receive Location Batching** → **Service Bus batch trigger**: Comparable to BizTalk receive locations with batching enabled + +**Key Pattern Mapping:** +- This workflow = BizTalk **Aggregator Orchestration** with **Sequential Convoy** +- NOT a parallel convoy (which would be `EnableSequentialProcessing=false`) + +**Pattern Hierarchy:** +``` +AGGREGATOR PATTERN (Enterprise Integration Pattern) +├── Purpose: Collect and combine related messages +├── Identifies messages by: CorrelationId property +│ +└── Implementation Mechanism: SEQUENTIAL CONVOY + ├── Processes messages: One-by-one in order + ├── Maintains: Message order and variable consistency + └── Alternative: Parallel Convoy (faster, no order guarantee) +``` + +**BizTalk Orchestration Equivalent:** +```csharp +// BizTalk Aggregator with Sequential Convoy +[CorrelationSet: OrderCorrelation] +while (receivedCount < expectedCount) +{ + Receive(OrderLinePort); // Sequential convoy + orderLines.Add(currentMessage); // Aggregation +} +CompleteOrder = Aggregate(orderLines); // Aggregator pattern result +Send(CompleteOrderPort); +``` + +**Logic Apps Workflow Equivalent:** +```json +Trigger: Service Bus (batch mode) +↓ +ForEach Message (sequential: concurrency=1) ← Sequential convoy + Extract CorrelationId ← Correlation set + Decode flat file + Add to CorrelationGroups[corrId] ← Aggregation +↓ +Build AggregatedResults ← Aggregator pattern result +Return HTTP Response +``` + +## Prerequisites + +Before you use this template, make sure that you 
have the following resources: + +### Azure Service Bus namespace and queue + +You need an Azure Service Bus namespace with a non-session queue configured: + +- **Sessions enabled**: Must be set to `false` +- **Lock duration**: At least 30 seconds (recommended: 5 minutes for large batches) +- **Max delivery count**: Configure based on your retry policy (recommended: 10) + +For more information, see [Create an Azure Service Bus namespace and queue](https://learn.microsoft.com/azure/service-bus-messaging/service-bus-quickstart-portal). + +### Flat file schema (XSD) + +You need an XSD schema that defines the structure of your flat file messages. Upload this schema to your logic app's `Artifacts/Schemas/` folder before deploying the workflow. + +**BizTalk Server compatibility**: This template supports BizTalk flat file schemas directly. You can export your existing BizTalk schemas and use them without modification in Azure Logic Apps Standard. The flat file decoding action in Logic Apps uses the same XSD schema format as BizTalk Server. + +For more information, see [Create schemas for flat file encoding and decoding](https://learn.microsoft.com/azure/logic-apps/logic-apps-enterprise-integration-flatfile). + +### Azure Service Bus connection + +You need to configure an Azure Service Bus connection in your logic app. The template uses the built-in Service Bus connector (ServiceProvider) for optimal performance in Azure Logic Apps Standard. + +For more information, see [Azure Service Bus connector overview](https://learn.microsoft.com/connectors/servicebus/). + +## Template parameters + +The template includes the following configurable parameters: + +### ServiceBusQueueName_#workflowname# +- **Type**: string +- **Required**: Yes +- **Default**: `"your-queue-name"` +- **Description**: The name of the Azure Service Bus queue to monitor for incoming messages. Replace `your-queue-name` with your actual queue name. + +### MaxBatchSize_#workflowname# +- **Type**: integer +- **Required**: Yes +- **Default**: `10` +- **Range**: 1-100 +- **Description**: The maximum number of messages to retrieve and process in a single batch. Adjust based on your message size and throughput requirements. +- **Performance guidance**: + - Small messages (<10KB): Use 50-100 for higher throughput + - Large messages (>100KB): Use 10-20 to avoid memory issues + - Strict ordering: Use lower values (5-10) for better control + +### FlatFileSchemaName_#workflowname# +- **Type**: string +- **Required**: Yes +- **Default**: `"Invoice.xsd"` +- **Description**: The name of the flat file schema (XSD) to use for decoding message content. This file must exist in your logic app's `Artifacts/Schemas/` folder. + +### DefaultCorrelationId_#workflowname# +- **Type**: string +- **Required**: No +- **Default**: `"NO_CORRELATION_ID"` +- **Description**: The fallback value to use when a message does not have a CorrelationId property. Messages with this value are grouped together. + +### ServiceBusConnectionName_#workflowname# +- **Type**: string +- **Required**: Yes +- **Default**: `"serviceBus"` +- **Description**: The name of the Azure Service Bus connection reference in your logic app's `connections.json` file. 
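For reference, a built-in (ServiceProvider) Azure Service Bus connection entry in `connections.json` typically follows the sketch below. The connection key (`serviceBus`) must match this parameter value; the app setting name that holds the connection string is illustrative.

```json
{
  "serviceProviderConnections": {
    "serviceBus": {
      "displayName": "serviceBus",
      "parameterValues": {
        "connectionString": "@appsetting('serviceBus_connectionString')"
      },
      "serviceProvider": {
        "id": "/serviceProviders/serviceBus"
      }
    }
  }
}
```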
+ +### EnableSequentialProcessing_#workflowname# +- **Type**: boolean +- **Required**: No +- **Default**: `true` +- **Description**: Controls message processing concurrency **within each batch**: + - `true`: Processes messages one-by-one sequentially (concurrency=1) to ensure consistent variable updates and avoid race conditions + - `false`: Allows parallel processing (up to 50 concurrent threads) for higher throughput, but aggregation order may vary + +**Note**: This setting does NOT control the order in which messages are retrieved from Service Bus. It only controls how messages within a single batch are processed by the workflow. + +### ResponseStatusCode + +- **Type**: Integer +- **Required**: No +- **Default**: `200` +- **Description**: HTTP status code to return in the response for successful processing. Standard value is 200 (OK). + +### ResponseContentType + +- **Type**: String +- **Required**: No +- **Default**: `application/json` +- **Description**: Content-Type header value for the HTTP response. Use 'application/json' for JSON-formatted responses. + +## How to use this template + +### Step 1: Create a logic app workflow from this template + +1. In the [Azure portal](https://portal.azure.com), open your Azure Logic Apps Standard resource. +2. On the logic app menu, select **Workflows**. +3. Select **Add** to create a new workflow. +4. In the template gallery, find and select **Aggregate messages from Azure Service Bus by CorrelationId**. +5. Provide values for the required parameters. +6. Select **Create** to create the workflow from the template. + +### Step 2: Configure your Azure Service Bus connection + +1. On your logic app menu, select **Connections**. +2. Select **Add** to create a new connection. +3. Search for and select **Azure Service Bus (Service Provider)**. +4. Provide your Azure Service Bus connection string. +5. Name the connection to match the `ServiceBusConnectionName_#workflowname#` parameter value. + +For more information, see [Azure Service Bus connector overview](https://learn.microsoft.com/connectors/servicebus/). + +### Step 3: Upload your flat file schema + +1. In your logic app, navigate to the `Artifacts/Schemas/` folder. +2. Upload your XSD schema file. +3. Make sure the file name matches the `FlatFileSchemaName_#workflowname#` parameter value. + +### Step 4: Test your workflow + +1. Send test messages to your Azure Service Bus queue with CorrelationId properties. +2. Monitor the workflow run history to verify that messages are grouped correctly. +3. Review the HTTP response to see the aggregated results. + +## Response format + +The workflow returns an HTTP response with the following JSON structure: + +```json +{ + "ProcessedBatchSize": 10, + "UniqueCorrelationIds": 3, + "AggregatedMessages": [ + { + "CorrelationId": "order-12345", + "MessageCount": 5, + "Messages": [ + { + "Invoice": { + "InvoiceId": "INV001", + "Items": [...] + } + } + ] + } + ], + "ProcessingTimestamp": "2025-10-31T12:00:00Z", + "Configuration": { + "QueueName": "your-queue-name", + "MaxBatchSize": 10, + "SchemaName": "Invoice.xsd", + "SequentialProcessing": true + } +} +``` + +The Response action returns the aggregated results as an HTTP response with the configured status code and content type. 
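For comparison with the response above, each item in the incoming trigger batch exposes the properties that the workflow expressions read (`messageId`, `correlationId`, `contentData`). The following shape is a minimal, illustrative sketch; actual trigger outputs include additional Service Bus properties, and the `contentData` value depends on how the flat file message was sent:

```json
{
  "messageId": "abc-123",
  "correlationId": "order-12345",
  "contentData": "INV001|2025-10-31|100.00"
}
```

Messages that arrive without a `correlationId` are grouped under the `DefaultCorrelationId_#workflowname#` value.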
+ +## Performance considerations + +### Batch size optimization + +The `MaxBatchSize_#workflowname#` parameter significantly impacts performance: + +| Message size | Recommended batch size | Rationale | +|--------------|----------------------|-----------| +| Small (<10KB) | 50-100 | Maximize throughput with minimal memory impact | +| Medium (10-100KB) | 20-50 | Balance throughput and memory usage | +| Large (>100KB) | 10-20 | Avoid memory issues and timeouts | + +### Sequential vs. parallel processing + +The `EnableSequentialProcessing_#workflowname#` parameter controls concurrency **within each batch**: + +- **Sequential (true)**: + - Processes messages one-by-one in the order they appear in the batch + - Prevents race conditions when updating `CorrelationGroups` variable + - Easier debugging with predictable execution flow + - Slower throughput (each message waits for previous to complete) + +- **Parallel (false)**: + - Processes up to 50 messages simultaneously + - 5-10x faster for large batches + - Aggregation order within same CorrelationId may vary + - Risk of race conditions if variables update simultaneously + +**Important**: Service Bus peek-lock guarantees FIFO order **only within a session-enabled queue**. This parameter controls processing order **after** messages are retrieved, not the retrieval order itself. + +### Lock duration + +Configure your Azure Service Bus queue's lock duration to be longer than your workflow's processing time to avoid messages being released back to the queue before processing completes. + +## Error handling + +The template includes comprehensive error handling: + +- **Scope-based error capture**: The `Process_Message_Scope` action wraps all message processing logic +- **Error logging**: Failed messages are logged with details including CorrelationId, MessageId, and error details +- **Continue on error**: The workflow continues processing remaining messages even if some messages fail +- **Error response format**: + +```json +{ + "ErrorType": "MessageProcessingFailed", + "CorrelationId": "order-12345", + "MessageId": "abc-123", + "ErrorDetails": {...}, + "Timestamp": "2025-10-31T12:00:00Z" +} +``` + +## Limitations + +Be aware of the following limitations: + +- **In-memory aggregation**: All messages in a batch are stored in memory during processing. Workflow execution timeout applies. +- **No cross-batch correlation**: Messages are aggregated only within a single batch. Messages with the same CorrelationId in different batches are not combined. +- **Lock renewal**: The template does not implement automatic lock renewal. Configure your queue's lock duration to exceed your workflow's processing time. +- **In-memory results**: The aggregated results are stored in workflow variables. For persistence, add actions to store results in Azure Storage, Cosmos DB, or other external systems. + +## Customization options + +You can customize this template to fit your specific needs: + +### Add message validation + +Insert a condition action after flat file decoding to validate message content before aggregation. + +### Implement cross-batch aggregation + +Use Azure Storage or Azure Cosmos DB to maintain aggregation state across multiple workflow runs. + +### Add dead-letter handling + +Configure dead-letter queue processing for messages that fail repeatedly. + +### Store aggregated results + +Add a Compose or HTTP action after `Log_Processing_Summary` to send results to external systems (Azure Storage, Cosmos DB, Event Hub, etc.). 
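As a minimal sketch (the action name, target URI, and `runAfter` placement are illustrative — point `runAfter` at whichever action precedes your Response action), an HTTP action that posts the aggregated results to an external endpoint could look like this:

```json
"Send_Aggregated_Results": {
  "type": "Http",
  "inputs": {
    "method": "POST",
    "uri": "https://example.contoso.com/api/aggregated-orders",
    "headers": {
      "Content-Type": "application/json"
    },
    "body": "@variables('AggregatedResults')"
  },
  "runAfter": {
    "Handle_Aggregation_Error": [
      "SUCCEEDED",
      "FAILED",
      "SKIPPED"
    ]
  }
}
```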
+ +## Migrating from BizTalk Server + +This section provides guidance for organizations migrating BizTalk Server Aggregator patterns to Azure Logic Apps Standard. + +### BizTalk to Logic Apps mapping + +| BizTalk Server Component | Azure Logic Apps Equivalent | Notes | +|--------------------------|----------------------------|-------| +| Receive Location (with batching) | Service Bus trigger (peek-lock batch) | Use `MaxBatchSize` parameter | +| Flat File Disassembler | Flat File Decoding action | Compatible with BizTalk XSD schemas | +| Correlation Set | CorrelationId property | Use Service Bus message properties | +| Convoy Processing | Sequential processing + grouping | Set `EnableSequentialProcessing=true` | +| Aggregator Orchestration | Foreach + variables | In-memory aggregation with dictionaries | +| Exception Handler | Scope-based error handling | `Process_Message_Scope` and `Handle_Scope_Error` | +| Send Port | HTTP Response or downstream action | Configurable via `ResponseStatusCode` | + +### Migration steps + +1. **Export BizTalk schemas**: Export your flat file XSD schemas from BizTalk Server +2. **Upload to Logic Apps**: Place schemas in `Artifacts/Schemas/` folder +3. **Configure Service Bus**: Create Azure Service Bus queue to replace BizTalk receive location +4. **Deploy template**: Use this template as a starting point +5. **Test with sample data**: Use existing BizTalk test messages to validate behavior +6. **Adjust parameters**: Fine-tune `MaxBatchSize` and `EnableSequentialProcessing` based on your needs + +### Key differences from BizTalk + +- **Stateless batching**: Logic Apps processes one batch at a time; cross-batch correlation requires external storage +- **No automatic lock renewal**: Configure queue lock duration longer than workflow execution time +- **Cloud-native**: Leverages Azure platform capabilities (Application Insights, managed identities, etc.) +- **Simpler deployment**: No BizTalk Server infrastructure needed; serverless execution model + +### Resources for migration + +- [Migrate BizTalk Server applications to Azure Logic Apps](https://learn.microsoft.com/azure/logic-apps/logic-apps-move-from-biztalk) +- [Azure Integration Services](https://azure.microsoft.com/solutions/integration/) +- [BizTalk migration program](https://azure.microsoft.com/migration/biztalk/) + +## Troubleshooting + +### Messages not being grouped + +**Symptom**: All messages have different CorrelationIds in the output. + +**Solution**: Verify that your Azure Service Bus messages have the CorrelationId property set. Use the `DefaultCorrelationId_#workflowname#` parameter to identify messages without this property. + +### Flat file decoding fails + +**Symptom**: The workflow fails with "Schema not found" or decoding errors. + +**Solution**: +1. Verify that the XSD schema file exists in `Artifacts/Schemas/` +2. Check that the `FlatFileSchemaName_#workflowname#` parameter matches the file name exactly +3. Validate that your message content matches the schema format + +### Workflow times out + +**Symptom**: The workflow run exceeds the execution timeout. + +**Solution**: +1. Reduce the `MaxBatchSize_#workflowname#` parameter +2. Enable parallel processing by setting `EnableSequentialProcessing_#workflowname#` to `false` +3. Simplify your flat file schema to improve decoding performance + +### Connection errors + +**Symptom**: "Connection not found" or authentication errors. + +**Solution**: +1. 
Verify that the `ServiceBusConnectionName_#workflowname#` parameter matches your connection name in `connections.json` +2. Check that your Azure Service Bus connection string is valid and has the necessary permissions + +## Best practices + +Follow these best practices when using this template: + +1. **Start with small batches**: Begin with `MaxBatchSize_#workflowname#` set to 10 and increase gradually while monitoring performance. + +2. **Monitor run history**: Regularly review workflow run history to identify patterns in message processing times and error rates. + +3. **Use Application Insights**: Enable Application Insights integration to track detailed metrics and diagnose issues. + +4. **Version your schemas**: Include version numbers in schema file names (for example, `invoice-v1.xsd`) to support schema evolution. + +5. **Test error handling**: Send malformed messages to verify that error handling works as expected. + +6. **Configure lock duration**: Set your queue's lock duration to at least 2x your average workflow execution time. + +## Related templates + +You might also be interested in these related templates for BizTalk Server migration and integration patterns: + +- **Process messages with session support**: For scenarios requiring strict message ordering and state management (equivalent to BizTalk sequential convoys) +- **Batch message processing**: For simple batch processing without aggregation +- **Content-based router**: For routing messages to different destinations based on content (equivalent to BizTalk content-based routing) +- **Message transformation**: For XML and flat file transformations (equivalent to BizTalk maps) + +## Support and feedback + +For issues or questions about this template: + +- Review the [Azure Logic Apps documentation](https://learn.microsoft.com/azure/logic-apps/) +- Review [BizTalk Server migration guidance](https://learn.microsoft.com/azure/logic-apps/logic-apps-move-from-biztalk) +- Visit the [Azure Logic Apps community forum](https://learn.microsoft.com/answers/tags/140/azure-logic-apps) +- Report issues in the [Azure Logic Apps feedback forum](https://feedback.azure.com/d365community/forum/ca7c5c88-0925-ec11-b6e6-000d3a4f07b8) + +## Version history + +- **Version 1.0** (2025-11-01): Initial template release for BizTalk Server migration + - Supports Azure Logic Apps Standard (stateful workflows) + - Built-in Azure Service Bus connector for optimal performance + - Flat file decoding with XSD schemas (BizTalk-compatible) + - CorrelationId-based message grouping (equivalent to BizTalk convoy processing) + - Comprehensive error handling with Scope-based handlers + - 8 configurable parameters including response customization + - Sequential and parallel processing modes + - HTTP Response action for downstream integration + - Designed specifically for BizTalk Aggregator pattern migration + +## License + +This template is provided under the [MIT License](https://opensource.org/licenses/MIT). diff --git a/aggregate-messages-servicebus-correlationid/manifest.json b/aggregate-messages-servicebus-correlationid/manifest.json new file mode 100644 index 0000000..f474cfa --- /dev/null +++ b/aggregate-messages-servicebus-correlationid/manifest.json @@ -0,0 +1,122 @@ +{ + "title": "Aggregate messages from Azure Service Bus by CorrelationId", + "description": "Aggregates messages from an Azure Service Bus queue by grouping them using the CorrelationId property. Designed to replicate BizTalk Server Aggregator pattern in Azure Logic Apps Standard. 
Processes messages in batches, decodes flat file content using an XSD schema, and returns aggregated results.", + "prerequisites": "**Azure Service Bus**: You need an Azure Service Bus namespace with a non-session queue configured. **Flat file schema**: You need an XSD schema uploaded to the logic app's Artifacts/Schemas folder for message decoding (supports BizTalk flat file schemas). **Connection**: You need to configure an Azure Service Bus connection in your logic app. For more information, see [Azure Service Bus connector overview](https://learn.microsoft.com/connectors/servicebus/).", + "tags": [ + "Azure Service Bus", + "Aggregator", + "CorrelationId", + "Batch Processing", + "Flat File Decoding", + "Message Grouping", + "Integration Pattern", + "BizTalk Migration" + ], + "skus": [ + "standard" + ], + "kinds": [ + "stateful" + ], + "detailsDescription": "This workflow template implements the Aggregator enterprise integration pattern for Azure Logic Apps Standard, designed to facilitate migration from BizTalk Server to Azure. It retrieves messages from an Azure Service Bus queue in batches, decodes flat file content using XSD schemas (compatible with BizTalk flat file schemas), and groups messages by their CorrelationId property. The template uses built-in Service Bus operations for optimal performance and includes comprehensive error handling, making it ideal for organizations modernizing their BizTalk Server integration solutions.", + "details": [ + { + "name": "By", + "value": "Microsoft" + }, + { + "name": "Type", + "value": "Workflow" + }, + { + "name": "Trigger", + "value": "Azure Service Bus - When messages are available" + } + ], + "artifacts": [ + { + "type": "workflow", + "file": "workflow.json" + }, + { + "type": "workflow", + "file": "parameters.json" + } + ], + "images": { + "light": "workflow-light", + "dark": "workflow-dark" + }, + "parameters": [ + { + "name": "ServiceBusQueueName_#workflowname#", + "displayName": "Azure Service Bus queue name", + "type": "String", + "default": "your-queue-name", + "description": "The name of the Azure Service Bus queue to monitor for incoming messages. This queue must exist in your Service Bus namespace and should have sessions disabled.", + "required": true + }, + { + "name": "MaxBatchSize_#workflowname#", + "displayName": "Maximum batch size", + "type": "Int", + "default": "10", + "description": "The maximum number of messages to retrieve and process in a single batch. Valid range is 1-100. Adjust this value based on your message size and throughput requirements.", + "required": true + }, + { + "name": "FlatFileSchemaName_#workflowname#", + "displayName": "Flat file schema name", + "type": "String", + "default": "Invoice.xsd", + "description": "The name of the flat file schema (XSD) to use for decoding message content. This schema must be uploaded to your logic app's Artifacts/Schemas folder before deployment.", + "required": true + }, + { + "name": "DefaultCorrelationId_#workflowname#", + "displayName": "Default CorrelationId", + "type": "String", + "default": "NO_CORRELATION_ID", + "description": "The fallback value to use when a message does not have a CorrelationId property. Messages with this value will be grouped together.", + "required": false + }, + { + "name": "ServiceBusConnectionName_#workflowname#", + "displayName": "Azure Service Bus connection name", + "type": "String", + "default": "serviceBus", + "description": "The name of the Azure Service Bus connection reference in your logic app's connections.json file. 
This connection provides authentication credentials for accessing the Service Bus namespace.", + "required": true + }, + { + "name": "EnableSequentialProcessing_#workflowname#", + "displayName": "Enable sequential processing", + "type": "Bool", + "default": "true", + "description": "When set to true, processes messages sequentially (concurrency=1) to ensure order. When set to false, allows parallel processing for higher throughput. Use true for scenarios requiring strict message ordering.", + "required": false + }, + { + "name": "ResponseStatusCode_#workflowname#", + "displayName": "HTTP response status code", + "type": "Int", + "default": "200", + "description": "The HTTP status code to return in the response for successful processing. Standard value is 200 (OK).", + "required": false + }, + { + "name": "ResponseContentType_#workflowname#", + "displayName": "HTTP response content type", + "type": "String", + "default": "application/json", + "description": "The Content-Type header value for the HTTP response. Use 'application/json' for JSON-formatted responses.", + "required": false + } + ], + "connections": [ + { + "connectorId": "serviceBus_#workflowname#", + "kind": "inapp" + } + ] +} diff --git a/aggregate-messages-servicebus-correlationid/parameters.json b/aggregate-messages-servicebus-correlationid/parameters.json new file mode 100644 index 0000000..0319c35 --- /dev/null +++ b/aggregate-messages-servicebus-correlationid/parameters.json @@ -0,0 +1,30 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "ServiceBusQueueName_#workflowname#": { + "value": "insbcorrelation" + }, + "MaxBatchSize_#workflowname#": { + "value": 10 + }, + "FlatFileSchemaName_#workflowname#": { + "value": "Invoice.xsd" + }, + "DefaultCorrelationId_#workflowname#": { + "value": "NO_CORRELATION_ID" + }, + "ServiceBusConnectionName_#workflowname#": { + "value": "serviceBus" + }, + "EnableSequentialProcessing_#workflowname#": { + "value": true + }, + "ResponseStatusCode_#workflowname#": { + "value": 200 + }, + "ResponseContentType_#workflowname#": { + "value": "application/json" + } + } +} diff --git a/aggregate-messages-servicebus-correlationid/workflow-dark.png b/aggregate-messages-servicebus-correlationid/workflow-dark.png new file mode 100644 index 0000000..ef65fd9 Binary files /dev/null and b/aggregate-messages-servicebus-correlationid/workflow-dark.png differ diff --git a/aggregate-messages-servicebus-correlationid/workflow-light.png b/aggregate-messages-servicebus-correlationid/workflow-light.png new file mode 100644 index 0000000..aaa8193 Binary files /dev/null and b/aggregate-messages-servicebus-correlationid/workflow-light.png differ diff --git a/aggregate-messages-servicebus-correlationid/workflow.json b/aggregate-messages-servicebus-correlationid/workflow.json new file mode 100644 index 0000000..32444d7 --- /dev/null +++ b/aggregate-messages-servicebus-correlationid/workflow.json @@ -0,0 +1,431 @@ +{ + "definition": { + "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "ServiceBusQueueName": { + "type": "string", + "defaultValue": "insbcorrelation", + "metadata": { + "description": "Name of the Azure Service Bus queue to monitor for incoming messages" + } + }, + "MaxBatchSize": { + "type": "int", + "defaultValue": 10, + "metadata": { + "description": "Maximum number of messages to 
retrieve and process in a single batch (1-100)" + } + }, + "FlatFileSchemaName": { + "type": "string", + "defaultValue": "Invoice.xsd", + "metadata": { + "description": "Name of the flat file schema (XSD) to use for message decoding" + } + }, + "DefaultCorrelationId": { + "type": "string", + "defaultValue": "NO_CORRELATION_ID", + "metadata": { + "description": "Default value to use when a message does not have a CorrelationId" + } + }, + "ServiceBusConnectionName": { + "type": "string", + "defaultValue": "serviceBus", + "metadata": { + "description": "Name of the Azure Service Bus connection defined in connections.json" + } + }, + "EnableSequentialProcessing": { + "type": "bool", + "defaultValue": true, + "metadata": { + "description": "When true, processes messages sequentially (concurrency=1); when false, allows parallel processing" + } + }, + "ResponseStatusCode": { + "type": "int", + "defaultValue": 200, + "metadata": { + "description": "HTTP status code for successful response" + } + }, + "ResponseContentType": { + "type": "string", + "defaultValue": "application/json", + "metadata": { + "description": "Content-Type header for the HTTP response" + } + } + }, + "actions": { + "Process_Batch_Messages": { + "type": "Foreach", + "foreach": "@triggerBody()", + "actions": { + "Process_Message_Scope": { + "type": "Scope", + "actions": { + "Get_CorrelationId": { + "type": "Compose", + "inputs": "@coalesce(items('Process_Batch_Messages')?['correlationId'], parameters('DefaultCorrelationId'))", + "metadata": { + "description": "Extract CorrelationId from Azure Service Bus message properties. Falls back to DefaultCorrelationId if not present" + } + }, + "Decode_Flat_File": { + "type": "FlatFileDecoding", + "inputs": { + "content": "@items('Process_Batch_Messages')?['contentData']", + "schema": { + "source": "LogicApp", + "name": "@parameters('FlatFileSchemaName')" + } + }, + "runAfter": { + "Get_CorrelationId": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Decode flat file message content using the configured XSD schema. Converts positional text format to XML" + } + }, + "Get_Current_Group": { + "type": "Compose", + "inputs": "@if(contains(variables('CorrelationGroups'), outputs('Get_CorrelationId')), variables('CorrelationGroups')[outputs('Get_CorrelationId')], json('[]'))", + "runAfter": { + "Decode_Flat_File": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Retrieve existing messages for current CorrelationId from dictionary. 
Returns empty array if this is the first message for this CorrelationId" + } + }, + "Append_To_Group": { + "type": "Compose", + "inputs": "@union(outputs('Get_Current_Group'), array(body('Decode_Flat_File')))", + "runAfter": { + "Get_Current_Group": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Append newly decoded message to existing messages array for this CorrelationId using union operation" + } + }, + "Build_Updated_Groups": { + "type": "Compose", + "inputs": "@setProperty(variables('CorrelationGroups'), outputs('Get_CorrelationId'), outputs('Append_To_Group'))", + "runAfter": { + "Append_To_Group": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Build updated CorrelationGroups dictionary with new message array for current CorrelationId using setProperty function" + } + }, + "Update_CorrelationGroups": { + "type": "SetVariable", + "inputs": { + "name": "CorrelationGroups", + "value": "@outputs('Build_Updated_Groups')" + }, + "runAfter": { + "Build_Updated_Groups": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Update CorrelationGroups variable with new dictionary containing the appended message" + } + }, + "Check_If_New_CorrelationId": { + "type": "If", + "expression": { + "and": [ + { + "not": { + "contains": [ + "@variables('ProcessedCorrelationIds')", + "@outputs('Get_CorrelationId')" + ] + } + } + ] + }, + "actions": { + "Add_To_ProcessedIds": { + "type": "AppendToArrayVariable", + "inputs": { + "name": "ProcessedCorrelationIds", + "value": "@outputs('Get_CorrelationId')" + }, + "metadata": { + "description": "Add CorrelationId to tracking array to preserve order of first occurrence" + } + } + }, + "else": { + "actions": {} + }, + "runAfter": { + "Update_CorrelationGroups": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Check if this is the first message with this CorrelationId in current batch. If new, add to ProcessedCorrelationIds tracking array" + } + } + } + }, + "Handle_Scope_Error": { + "type": "If", + "expression": { + "and": [ + { + "equals": [ + "@result('Process_Message_Scope')[0]['status']", + "Failed" + ] + } + ] + }, + "actions": { + "Log_Error": { + "type": "Compose", + "inputs": { + "ErrorType": "MessageProcessingFailed", + "CorrelationId": "@coalesce(items('Process_Batch_Messages')?['correlationId'], 'UNKNOWN')", + "MessageId": "@items('Process_Batch_Messages')?['messageId']", + "ErrorDetails": "@result('Process_Message_Scope')[0]", + "Timestamp": "@utcNow()" + }, + "metadata": { + "description": "Compose error log entry with message details and scope failure information for troubleshooting" + } + } + }, + "else": { + "actions": {} + }, + "runAfter": { + "Process_Message_Scope": [ + "SUCCEEDED", + "FAILED", + "SKIPPED", + "TIMEDOUT" + ] + }, + "metadata": { + "description": "Error handler for Process_Message_Scope. Captures failures from FlatFileDecoding or other processing steps. Workflow continues processing remaining messages" + } + } + }, + "runAfter": { + "Initialize_ProcessedCorrelationIds": [ + "SUCCEEDED" + ] + }, + "runtimeConfiguration": { + "concurrency": { + "repetitions": 1 + } + }, + "metadata": { + "description": "Iterate through all messages in batch. Process each message: extract CorrelationId, decode flat file, and group by CorrelationId. 
Sequential processing (concurrency=1) ensures correct order" + } + }, + "Build_Aggregated_Messages_Scope": { + "type": "Scope", + "actions": { + "Build_Aggregated_Messages": { + "type": "Foreach", + "foreach": "@variables('ProcessedCorrelationIds')", + "actions": { + "Get_Messages_For_CorrelationId": { + "type": "Compose", + "inputs": "@variables('CorrelationGroups')[items('Build_Aggregated_Messages')]", + "metadata": { + "description": "Retrieve all messages for current CorrelationId from CorrelationGroups dictionary" + } + }, + "Build_Result_Object": { + "type": "Compose", + "inputs": { + "CorrelationId": "@items('Build_Aggregated_Messages')", + "MessageCount": "@length(outputs('Get_Messages_For_CorrelationId'))", + "Messages": "@outputs('Get_Messages_For_CorrelationId')" + }, + "runAfter": { + "Get_Messages_For_CorrelationId": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Build result object containing CorrelationId, count of messages, and array of decoded message bodies" + } + }, + "Append_To_Results": { + "type": "AppendToArrayVariable", + "inputs": { + "name": "AggregatedResults", + "value": "@outputs('Build_Result_Object')" + }, + "runAfter": { + "Build_Result_Object": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Add aggregated result object to final results array" + } + } + }, + "runtimeConfiguration": { + "concurrency": { + "repetitions": 1 + } + }, + "metadata": { + "description": "Build final aggregated results array. Iterates through ProcessedCorrelationIds (preserving order) and creates result objects with message groups" + } + } + }, + "runAfter": { + "Process_Batch_Messages": [ + "SUCCEEDED", + "FAILED" + ] + }, + "metadata": { + "description": "Scope for aggregation process with error handling support" + } + }, + "Handle_Aggregation_Error": { + "type": "If", + "expression": { + "and": [ + { + "equals": [ + "@result('Build_Aggregated_Messages_Scope')[0]['status']", + "Failed" + ] + } + ] + }, + "actions": { + "Log_Aggregation_Error": { + "type": "Compose", + "inputs": { + "ErrorType": "AggregationFailed", + "ErrorDetails": "@result('Build_Aggregated_Messages_Scope')[0]", + "ProcessedCorrelationIds": "@variables('ProcessedCorrelationIds')", + "PartialResults": "@variables('AggregatedResults')", + "Timestamp": "@utcNow()" + }, + "metadata": { + "description": "Log aggregation error with context including partial results and correlation IDs" + } + } + }, + "else": { + "actions": {} + }, + "runAfter": { + "Build_Aggregated_Messages_Scope": [ + "SUCCEEDED", + "FAILED", + "SKIPPED", + "TIMEDOUT" + ] + }, + "metadata": { + "description": "Error handler for aggregation scope. 
Logs errors but allows workflow to continue and return response" + } + }, + "Response": { + "type": "Response", + "kind": "http", + "inputs": { + "statusCode": "@parameters('ResponseStatusCode')", + "headers": { + "Content-Type": "@parameters('ResponseContentType')" + }, + "body": { + "ProcessedBatchSize": "@length(triggerBody())", + "UniqueCorrelationIds": "@length(variables('AggregatedResults'))", + "AggregatedMessages": "@variables('AggregatedResults')", + "ProcessingTimestamp": "@utcNow()", + "Configuration": { + "QueueName": "@parameters('ServiceBusQueueName')", + "MaxBatchSize": "@parameters('MaxBatchSize')", + "SchemaName": "@parameters('FlatFileSchemaName')", + "SequentialProcessing": "@parameters('EnableSequentialProcessing')" + } + } + }, + "runAfter": { + "Handle_Aggregation_Error": [ + "SUCCEEDED", + "FAILED", + "SKIPPED" + ] + }, + "metadata": { + "description": "Return HTTP response with aggregated results. Includes batch statistics, grouped messages by CorrelationId, processing timestamp, and configuration details" + } + }, + "Initialize_ProcessedCorrelationIds": { + "type": "InitializeVariable", + "inputs": { + "variables": [ + { + "name": "CorrelationGroups", + "type": "object", + "value": {} + }, + { + "name": "AggregatedResults", + "type": "array", + "value": [] + }, + { + "name": "ProcessedCorrelationIds", + "type": "array", + "value": [] + } + ] + }, + "runAfter": {} + } + }, + "outputs": {}, + "triggers": { + "When_messages_are_available_in_a_queue": { + "type": "ServiceProvider", + "inputs": { + "parameters": { + "queueName": "@parameters('ServiceBusQueueName')", + "isSessionsEnabled": false, + "maxMessageCount": "@parameters('MaxBatchSize')" + }, + "serviceProviderConfiguration": { + "connectionName": "serviceBus", + "operationId": "peekLockQueueMessagesV2", + "serviceProviderId": "/serviceProviders/serviceBus" + } + }, + "metadata": { + "description": "Built-in Azure Service Bus trigger (ServiceProvider). Retrieves batch of messages using peek-lock from non-session queue. 
Uses peekLockQueueMessagesV2 for better performance in Azure Logic Apps Standard" + } + } + } + }, + "kind": "Stateful" +} \ No newline at end of file diff --git a/manifest.json b/manifest.json index ba2ec48..18d67d7 100644 --- a/manifest.json +++ b/manifest.json @@ -1,58 +1,59 @@ [ - "analyze-images-ai-rag", - "get-message-ibm-mq-queue-browse-lock-abort", - "get-message-service-bus-queue-peek-lock-abandon", - "get-message-service-bus-queue-peek-lock-renew-sequential-convoy", - "get-message-service-bus-queue-peek-lock-renew", - "get-message-service-bus-topic-peek-lock-abandon", - "get-message-service-bus-topic-peek-lock-renew", - "get-sales-order-status-sap-teams-bapi", - "post-sap-goodsmovement-using-bapi", - "ingest-document-blob-openai-cosmos", - "ingest-doc-blob-document-intelligence-cosmos", - "ingest-doc-sharepoint-document-intelligence-cosmos", - "ingest-document-sharepoint-openai-cosmos", - "ingest-index-ai-azure-files-rag", - "ingest-index-ai-blob-storage-rag", - "ingest-index-ai-http-rag", - "ingest-index-ai-one-drive-business-rag", - "ingest-index-ai-sftp-rag", - "ingest-index-ai-sharepoint-rag", - "receive-request-send-response", - "send-alerts-business-process-tracking", - "sync-business-partner-sap-sharepoint-odata", - "sync-sales-order-sap-blob-storage-odata", - "try-catch", - "unblock-user-sap-teams-self-service-bapi", - "ingest-index-ai-azure-files-recurrence-rag", - "analyze-documents-ai-docintelligence", - "ingest-index-ai-azure-files-schedule-rag", - "ingest-index-ai-one-drive-schedule-rag", - "chat-with-documents-ai", - "accelerator-index-retrieve-ai-sharepoint-aisearch", - "accelerator-index-retrieve-resumes-sql", - "send-avs-logs-to-log-server", - "vectorize-onedriveforbusiness-aisearch-request", - "vectorize-onedrive-aisearch-request", - "vectorize-onedriveforbusiness-aisearch-schedule", - "vectorize-onedrive-aisearch-schedule", - "vectorize-sharepoint-aisearch-request", - "vectorize-sharepoint-aisearch-schedule", - "vectorize-azurefile-aisearch-request", - "vectorize-azurefile-aisearch-schedule", - "vectorize-amazons3-aisearch-request", - "vectorize-amazons3-aisearch-schedule", - "vectorize-dropbox-aisearch-request", - "vectorize-dropbox-aisearch-schedule", - "vectorize-sftp-aisearch-schedule", - "vectorize-sftp-aisearch-request", - "vectorize-azurequeue-aisearch-schedule", - "vectorize-azurequeue-aisearch-request", - "vectorize-servicebus-aisearch-schedule", - "tool-send-email-outlook", - "tool-get-weather-msn", - "tool-get-rows-dataverse", - "tool-get-rows-sql", - "tool-execute-query-sql", - "tool-call-uri-http" + "analyze-images-ai-rag", + "get-message-ibm-mq-queue-browse-lock-abort", + "get-message-service-bus-queue-peek-lock-abandon", + "get-message-service-bus-queue-peek-lock-renew-sequential-convoy", + "get-message-service-bus-queue-peek-lock-renew", + "get-message-service-bus-topic-peek-lock-abandon", + "get-message-service-bus-topic-peek-lock-renew", + "get-sales-order-status-sap-teams-bapi", + "post-sap-goodsmovement-using-bapi", + "ingest-document-blob-openai-cosmos", + "ingest-doc-blob-document-intelligence-cosmos", + "ingest-doc-sharepoint-document-intelligence-cosmos", + "ingest-document-sharepoint-openai-cosmos", + "ingest-index-ai-azure-files-rag", + "ingest-index-ai-blob-storage-rag", + "ingest-index-ai-http-rag", + "ingest-index-ai-one-drive-business-rag", + "ingest-index-ai-sftp-rag", + "ingest-index-ai-sharepoint-rag", + "receive-request-send-response", + "send-alerts-business-process-tracking", + "sync-business-partner-sap-sharepoint-odata", + 
"sync-sales-order-sap-blob-storage-odata", + "try-catch", + "unblock-user-sap-teams-self-service-bapi", + "ingest-index-ai-azure-files-recurrence-rag", + "analyze-documents-ai-docintelligence", + "ingest-index-ai-azure-files-schedule-rag", + "ingest-index-ai-one-drive-schedule-rag", + "chat-with-documents-ai", + "accelerator-index-retrieve-ai-sharepoint-aisearch", + "accelerator-index-retrieve-resumes-sql", + "send-avs-logs-to-log-server", + "vectorize-onedriveforbusiness-aisearch-request", + "vectorize-onedrive-aisearch-request", + "vectorize-onedriveforbusiness-aisearch-schedule", + "vectorize-onedrive-aisearch-schedule", + "vectorize-sharepoint-aisearch-request", + "vectorize-sharepoint-aisearch-schedule", + "vectorize-azurefile-aisearch-request", + "vectorize-azurefile-aisearch-schedule", + "vectorize-amazons3-aisearch-request", + "vectorize-amazons3-aisearch-schedule", + "vectorize-dropbox-aisearch-request", + "vectorize-dropbox-aisearch-schedule", + "vectorize-sftp-aisearch-schedule", + "vectorize-sftp-aisearch-request", + "vectorize-azurequeue-aisearch-schedule", + "vectorize-azurequeue-aisearch-request", + "vectorize-servicebus-aisearch-schedule", + "tool-send-email-outlook", + "tool-get-weather-msn", + "tool-get-rows-dataverse", + "tool-get-rows-sql", + "tool-execute-query-sql", + "tool-call-uri-http", + "aggregate-messages-servicebus-correlationid" ]