1111using System . Collections . Generic ;
1212using System . Threading . Tasks ;
1313using WebJob . Office365ActivityImporter . Engine . Entities . Serialisation ;
14+ using Azure . Identity ; // Added for ClientSecretCredential
1415
1516namespace WebJob . Office365ActivityImporter . Engine . Graph . Calls
1617{
@@ -32,7 +33,7 @@ public class CallQueueProcessor : IDisposable
3233 private ManualGraphCallClient _graphCallClient ;
3334 private bool _isInitialised = false ;
3435
35-
36+ public ServiceBusClient ServiceBusClient => _sbClient ;
3637 public static CallQueueProcessor _singleton = null ;
3738 public static async Task < CallQueueProcessor > GetCallQueueProcessor ( AppConfig config , string thisTenantId , ManualGraphCallClient graphCallClient )
3839 {
@@ -53,18 +54,38 @@ private CallQueueProcessor(AppConfig config, string thisTenantId)
5354 _auth = new GraphAppIndentityOAuthContext ( _telemetry , config . ClientID , config . TenantGUID . ToString ( ) , config . ClientSecret , config . KeyVaultUrl , config . UseClientCertificate ) ;
5455 this . _thisTenantId = thisTenantId ;
5556
56- var sbCredential = new Azure . Identity . ClientSecretCredential ( config . TenantGUID . ToString ( ) , config . ClientID , config . ClientSecret ) ;
57-
58- _sbClient = new ServiceBusClient ( config . ConnectionStrings . ServiceBusConnectionString , sbCredential ) ;
59- var sbConnectionInfo = ServiceBusConnectionStringProperties . Parse ( config . ConnectionStrings . ServiceBusConnectionString ) ;
60- _processor = _sbClient . CreateProcessor ( sbConnectionInfo . EntityPath , new ServiceBusProcessorOptions
57+ // If RBAC is enabled, build ServiceBusClient using AAD credential
58+ if ( config . UseRBACForServiceBus )
59+ {
60+ _telemetry . LogInformation ( "Initializing ServiceBusClient using RBAC (ClientSecretCredential)." ) ;
61+ var credential = new ClientSecretCredential ( config . TenantGUID . ToString ( ) , config . ClientID , config . ClientSecret ) ;
62+ // Extract fully qualified namespace from connection string (Endpoint=sb://namespace.servicebus.windows.net/;...)
63+ var sbProps = ServiceBusConnectionStringProperties . Parse ( config . ConnectionStrings . ServiceBusConnectionString ) ;
64+ var fullyQualifiedNamespace = sbProps . FullyQualifiedNamespace ; // e.g. namespace.servicebus.windows.net
65+ _sbClient = new ServiceBusClient ( fullyQualifiedNamespace , credential ) ;
66+ _processor = _sbClient . CreateProcessor ( sbProps . EntityPath , new ServiceBusProcessorOptions
67+ {
68+ MaxConcurrentCalls = 10 ,
69+ PrefetchCount = 0 ,
70+ ReceiveMode = ServiceBusReceiveMode . PeekLock ,
71+ MaxAutoLockRenewalDuration = TimeSpan . FromHours ( 24 ) , // Queue should be configured for 5 minute lock timeout
72+ AutoCompleteMessages = false // Messages are completed only when the migrator has succeeded to migrate the file
73+ } ) ;
74+ }
75+ else
6176 {
62- MaxConcurrentCalls = 10 ,
63- PrefetchCount = 0 ,
64- ReceiveMode = ServiceBusReceiveMode . PeekLock ,
65- MaxAutoLockRenewalDuration = TimeSpan . FromHours ( 24 ) , // Queue should be configured for 5 minute lock timeout
66- AutoCompleteMessages = false // Messages are completed only when the migrator has succeeded to migrate the file
67- } ) ;
77+ // Legacy SAS connection string approach
78+ _sbClient = new ServiceBusClient ( config . ConnectionStrings . ServiceBusConnectionString ) ;
79+ var sbConnectionInfo = ServiceBusConnectionStringProperties . Parse ( config . ConnectionStrings . ServiceBusConnectionString ) ;
80+ _processor = _sbClient . CreateProcessor ( sbConnectionInfo . EntityPath , new ServiceBusProcessorOptions
81+ {
82+ MaxConcurrentCalls = 10 ,
83+ PrefetchCount = 0 ,
84+ ReceiveMode = ServiceBusReceiveMode . PeekLock ,
85+ MaxAutoLockRenewalDuration = TimeSpan . FromHours ( 24 ) , // Queue should be configured for 5 minute lock timeout
86+ AutoCompleteMessages = false // Messages are completed only when the migrator has succeeded to migrate the file
87+ } ) ;
88+ }
6889 }
6990
7091 #endregion
0 commit comments