diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListenerFactory.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListenerFactory.cs index 0d338ed5d31d..63404c1fef16 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListenerFactory.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListenerFactory.cs @@ -107,8 +107,13 @@ public async Task CreateAsync(CancellationToken cancellationToken) new SharedBlobListenerFactory(hostId, _hostBlobServiceClient, _exceptionHandler, _blobWrittenWatcherSetter, _loggerFactory.CreateLogger())); // Register the blob container we wish to monitor with the shared blob listener. - await RegisterWithSharedBlobListenerAsync(hostId, sharedBlobListener, primaryBlobClient, - blobTriggerQueueWriter, cancellationToken).ConfigureAwait(false); + await RegisterWithSharedBlobListenerAsync( + hostId, + sharedBlobListener, + primaryBlobClient, + targetBlobClient, + blobTriggerQueueWriter, + cancellationToken).ConfigureAwait(false); } // Create a "bridge" listener that will monitor the blob @@ -165,14 +170,20 @@ await RegisterWithSharedBlobListenerAsync(hostId, sharedBlobListener, primaryBlo private async Task RegisterWithSharedBlobListenerAsync( string hostId, SharedBlobListener sharedBlobListener, - BlobServiceClient blobClient, + BlobServiceClient primaryBlobClient, + BlobServiceClient targetBlobClient, BlobTriggerQueueWriter blobTriggerQueueWriter, CancellationToken cancellationToken) { - BlobTriggerExecutor triggerExecutor = new BlobTriggerExecutor(hostId, _functionDescriptor, _input, new BlobReceiptManager(blobClient), + BlobTriggerExecutor triggerExecutor = new BlobTriggerExecutor(hostId, _functionDescriptor, _input, new BlobReceiptManager(primaryBlobClient), blobTriggerQueueWriter, _loggerFactory.CreateLogger()); - await sharedBlobListener.RegisterAsync(blobClient, _container, triggerExecutor, cancellationToken).ConfigureAwait(false); + await sharedBlobListener.RegisterAsync( + primaryBlobClient, + targetBlobClient, + _container, + triggerExecutor, + cancellationToken).ConfigureAwait(false); } private void RegisterWithSharedBlobQueueListenerAsync( diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/IBlobNotificationStrategy.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/IBlobNotificationStrategy.cs index 5177897f47a7..32f50f538a33 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/IBlobNotificationStrategy.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/IBlobNotificationStrategy.cs @@ -11,7 +11,11 @@ namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners { internal interface IBlobNotificationStrategy : ITaskSeriesCommand, IBlobWrittenWatcher { - Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClient container, ITriggerExecutor triggerExecutor, + Task RegisterAsync( + BlobServiceClient primaryBlobServiceClient, + BlobServiceClient targetBlobServiceClient, + BlobContainerClient container, + ITriggerExecutor triggerExecutor, CancellationToken cancellationToken); } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.Logger.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.Logger.cs index 066d4543fd9b..6238c68862bc 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.Logger.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.Logger.cs @@ -16,8 +16,15 @@ private class Logger LoggerMessage.Define(LogLevel.Debug, new EventId(300, nameof(ScanBlobLogs)), "Log scan for recent blob updates in container '{containerName}' with PollId '{pollId}' found {blobCount} blobs."); + private static readonly Action, string, Exception> _loggingNotEnabledOnTargetAccount = + LoggerMessage.Define(LogLevel.Debug, new EventId(4, nameof(LoggingNotEnabledOnTargetAccount)), + "Logging not enabled on target blob storage account, '{targetAccountUri}'"); + public static void ScanBlobLogs(ILogger logger, string containerName, string pollId, int blobCount) => _scanBlobLogs(logger, containerName, pollId, blobCount, null); + + public static void LoggingNotEnabledOnTargetAccount(ILogger logger, string targetAccountUri) => + _loggingNotEnabledOnTargetAccount(logger, targetAccountUri, null); } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.cs index 5f0677a03f99..41160d3374a2 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.cs @@ -49,7 +49,11 @@ public PollLogsStrategy(IWebJobsExceptionHandler exceptionHandler, ILogger triggerExecutor, + public async Task RegisterAsync( + BlobServiceClient primaryBlobServiceClient, + BlobServiceClient targetBlobServiceClient, + BlobContainerClient container, + ITriggerExecutor triggerExecutor, CancellationToken cancellationToken) { ThrowIfDisposed(); @@ -76,10 +80,28 @@ public async Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContain containerRegistrations.Add(triggerExecutor); - if (!_logListeners.ContainsKey(blobServiceClient)) + if (targetBlobServiceClient == default) { - BlobLogListener logListener = await BlobLogListener.CreateAsync(blobServiceClient, _logger, cancellationToken).ConfigureAwait(false); - _logListeners.Add(blobServiceClient, logListener); + // If no target client is specified, use the primary. + BlobLogListener logListener = await BlobLogListener.CreateAsync(primaryBlobServiceClient, _logger, cancellationToken).ConfigureAwait(false); + _logListeners.Add(primaryBlobServiceClient, logListener); + } + else if (!_logListeners.ContainsKey(targetBlobServiceClient)) + { + try + { + BlobLogListener logListener = await BlobLogListener.CreateAsync(targetBlobServiceClient, _logger, cancellationToken).ConfigureAwait(false); + _logListeners.Add(targetBlobServiceClient, logListener); + } + // TODO: verify if this is the only permissions error code, or other possible permissions errors + catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.AuthorizationPermissionMismatch) + { + Logger.LoggingNotEnabledOnTargetAccount(_logger, targetBlobServiceClient.Uri.AbsoluteUri); + + // Fallback to primary client if target client does not have the permissions to be used. + BlobLogListener logListener = await BlobLogListener.CreateAsync(primaryBlobServiceClient, _logger, cancellationToken).ConfigureAwait(false); + _logListeners.Add(primaryBlobServiceClient, logListener); + } } } @@ -256,5 +278,22 @@ private void ThrowIfDisposed() throw new ObjectDisposedException(null); } } + + private async Task CheckLoggingEnabledAsync(BlobServiceClient blobClient, CancellationToken cancellationToken) + { + BlobServiceProperties serviceProperties = await blobClient.GetPropertiesAsync(cancellationToken).ConfigureAwait(false); + + // Retrieve the logging settings. + BlobAnalyticsLogging loggingProperties = serviceProperties.Logging; + + if (!loggingProperties.Write) + { + // Log an error if logging is not enabled. + Logger.LoggingNotEnabledOnTargetAccount(_logger, blobClient.Uri.AbsoluteUri); + return false; + } + // Logging is enabled. + return true; + } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/ScanBlobScanLogHybridPollingStrategy.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/ScanBlobScanLogHybridPollingStrategy.cs index 791ce800da4b..cde723b71964 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/ScanBlobScanLogHybridPollingStrategy.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/ScanBlobScanLogHybridPollingStrategy.cs @@ -61,19 +61,29 @@ public void Cancel() _cancellationTokenSource.Cancel(); } - public async Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClient container, ITriggerExecutor triggerExecutor, CancellationToken cancellationToken) + public async Task RegisterAsync( + BlobServiceClient primaryBlobServiceClient, + BlobServiceClient targetBlobServiceClient, + BlobContainerClient container, + ITriggerExecutor triggerExecutor, + CancellationToken cancellationToken) { // Register and Execute are not concurrency-safe. // Avoiding calling Register while Execute is running is the caller's responsibility. ThrowIfDisposed(); // Register all in logPolling, there is no problem if we get 2 notifications of the new blob - await _pollLogStrategy.RegisterAsync(blobServiceClient, container, triggerExecutor, cancellationToken).ConfigureAwait(false); + await _pollLogStrategy.RegisterAsync( + primaryBlobServiceClient, + targetBlobServiceClient, + container, + triggerExecutor, + cancellationToken).ConfigureAwait(false); if (!_scanInfo.TryGetValue(container, out ContainerScanInfo containerScanInfo)) { // First, try to load serialized scanInfo for this container. - DateTime? latestStoredScan = await _blobScanInfoManager.LoadLatestScanAsync(blobServiceClient.AccountName, container.Name).ConfigureAwait(false); + DateTime? latestStoredScan = await _blobScanInfoManager.LoadLatestScanAsync(primaryBlobServiceClient.AccountName, container.Name).ConfigureAwait(false); containerScanInfo = new ContainerScanInfo() { diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/ScanContainersStrategy.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/ScanContainersStrategy.cs index 17a163ab9a32..e029e78ca9a3 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/ScanContainersStrategy.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/ScanContainersStrategy.cs @@ -40,7 +40,11 @@ public void Notify(BlobWithContainer blobWritten) _blobWrittenNotifications.Enqueue(blobWritten); } - public Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClient container, ITriggerExecutor triggerExecutor, + public Task RegisterAsync( + BlobServiceClient primaryBlobServiceClient, + BlobServiceClient targetBlobServiceClient, + BlobContainerClient container, + ITriggerExecutor triggerExecutor, CancellationToken cancellationToken) { // Register and Execute are not concurrency-safe. diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobListener.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobListener.cs index 2aede2bca163..14e669a87698 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobListener.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobListener.cs @@ -44,7 +44,11 @@ public IBlobWrittenWatcher BlobWritterWatcher get { return _strategy; } } - public Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClient container, ITriggerExecutor triggerExecutor, + public Task RegisterAsync( + BlobServiceClient primaryBlobServiceClient, + BlobServiceClient targetBlobServiceClient, + BlobContainerClient container, + ITriggerExecutor triggerExecutor, CancellationToken cancellationToken) { if (_started) @@ -53,7 +57,12 @@ public Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClie "Registrations may not be added while the shared listener is running."); } - return _strategy.RegisterAsync(blobServiceClient, container, triggerExecutor, cancellationToken); + return _strategy.RegisterAsync( + primaryBlobServiceClient, + targetBlobServiceClient, + container, + triggerExecutor, + cancellationToken); } public Task EnsureAllStartedAsync(CancellationToken cancellationToken) diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobNotificationStrategyExtensions.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobNotificationStrategyExtensions.cs index 49d89f65d7bc..95fbf2f68201 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobNotificationStrategyExtensions.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobNotificationStrategyExtensions.cs @@ -21,7 +21,11 @@ public static TaskSeriesCommandResult Execute(this IBlobListenerStrategy strateg return strategy.ExecuteAsync(CancellationToken.None).GetAwaiter().GetResult(); } - public static void Register(this IBlobListenerStrategy strategy, BlobServiceClient blobServiceClient, BlobContainerClient container, + public static void Register( + this IBlobListenerStrategy strategy, + BlobServiceClient primaryBlobServiceClient, + BlobServiceClient targetBlobServiceClient, + BlobContainerClient container, ITriggerExecutor triggerExecutor) { if (strategy == null) @@ -29,7 +33,12 @@ public static void Register(this IBlobListenerStrategy strategy, BlobServiceClie throw new ArgumentNullException("strategy"); } - strategy.RegisterAsync(blobServiceClient, container, triggerExecutor, CancellationToken.None).GetAwaiter().GetResult(); + strategy.RegisterAsync( + primaryBlobServiceClient, + targetBlobServiceClient, + container, + triggerExecutor, + CancellationToken.None).GetAwaiter().GetResult(); } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/ScanBlobScanLogHybridPollingStrategyTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/ScanBlobScanLogHybridPollingStrategyTests.cs index 4f13d0661b42..4a02e4aea1e9 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/ScanBlobScanLogHybridPollingStrategyTests.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/ScanBlobScanLogHybridPollingStrategyTests.cs @@ -106,7 +106,11 @@ public void ScanBlobScanLogHybridPollingStrategyTestBlobListener() var container = _blobContainerMock.Object; IBlobListenerStrategy product = new ScanBlobScanLogHybridPollingStrategy(new TestBlobScanInfoManager(), _exceptionHandler, _logger); LambdaBlobTriggerExecutor executor = new LambdaBlobTriggerExecutor(); - product.Register(_blobClientMock.Object, container, executor); + product.Register( + _blobClientMock.Object, + _blobClientMock.Object, + container, + executor); product.Start(); RunExecuterWithExpectedBlobs(new List(), product, executor); @@ -163,7 +167,11 @@ public void TestBlobListenerWithContainerBiggerThanThreshold() .GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic) .SetValue(product, testScanBlobLimitPerPoll); - product.Register(_blobClientMock.Object, container, executor); + product.Register( + _blobClientMock.Object, + _blobClientMock.Object, + container, + executor); product.Start(); // populate with 5 blobs @@ -191,8 +199,16 @@ public void TestBlobListenerWithMultipleContainers() .GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic) .SetValue(product, testScanBlobLimitPerPoll); - product.Register(_blobClientMock.Object, firstContainer, executor); - product.Register(_blobClientMock.Object, secondContainer, executor); + product.Register( + _blobClientMock.Object, + _blobClientMock.Object, + firstContainer, + executor); + product.Register( + _blobClientMock.Object, + _blobClientMock.Object, + secondContainer, + executor); product.Start(); // populate first container with 5 blobs > page size and second with 2 blobs < page size @@ -232,7 +248,11 @@ public void BlobPolling_IgnoresClockSkew() .GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic) .SetValue(product, testScanBlobLimitPerPoll); - product.Register(_blobClientMock.Object, container, executor); + product.Register( + _blobClientMock.Object, + _blobClientMock.Object, + container, + executor); product.Start(); List expectedNames = new List(); @@ -271,7 +291,11 @@ public void BlobPolling_IncludesPreviousBatch() .GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic) .SetValue(product, testScanBlobLimitPerPoll); - product.Register(_blobClientMock.Object, container, executor); + product.Register( + _blobClientMock.Object, + _blobClientMock.Object, + container, + executor); product.Start(); List expectedNames = new List(); @@ -307,7 +331,12 @@ public async Task RegisterAsync_InitializesWithScanInfoManager() await Task.Delay(10); await scanInfoManager.UpdateLatestScanAsync(AccountName, ContainerName, DateTime.UtcNow); - await product.RegisterAsync(_blobClientMock.Object, container, executor, CancellationToken.None); + await product.RegisterAsync( + _blobClientMock.Object, + _blobClientMock.Object, + container, + executor, + CancellationToken.None); // delay slightly so we guarantee a later timestamp await Task.Delay(10); @@ -334,8 +363,18 @@ public async Task ExecuteAsync_UpdatesScanInfoManager() .GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic) .SetValue(product, testScanBlobLimitPerPoll); - await product.RegisterAsync(_blobClientMock.Object, firstContainer, executor, CancellationToken.None); - await product.RegisterAsync(_blobClientMock.Object, secondContainer, executor, CancellationToken.None); + await product.RegisterAsync( + _blobClientMock.Object, + _blobClientMock.Object, + firstContainer, + executor, + CancellationToken.None); + await product.RegisterAsync( + _blobClientMock.Object, + _blobClientMock.Object, + secondContainer, + executor, + CancellationToken.None); var firstExpectedNames = new List(); for (int i = 0; i < 3; i++) @@ -383,7 +422,12 @@ public async Task ExecuteAsync_UpdatesScanInfo_WithEarliestFailure() .GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic) .SetValue(product, testScanBlobLimitPerPoll); - await product.RegisterAsync(_blobClientMock.Object, container, executor, CancellationToken.None); + await product.RegisterAsync( + _blobClientMock.Object, + _blobClientMock.Object, + container, + executor, + CancellationToken.None); // Induce a failure to make sure the timestamp is earlier than the failure. var expectedNames = new List(); diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/ScanContainersStrategyTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/ScanContainersStrategyTests.cs index 48c9997aac42..7af669204e5c 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/ScanContainersStrategyTests.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/ScanContainersStrategyTests.cs @@ -31,7 +31,11 @@ public async Task TestBlobListener() var container = blobServiceClient.GetBlobContainerClient(ContainerName); IBlobListenerStrategy product = new ScanContainersStrategy(); LambdaBlobTriggerExecutor executor = new LambdaBlobTriggerExecutor(); - product.Register(blobServiceClient, container, executor); + product.Register( + blobServiceClient, + blobServiceClient, + container, + executor); product.Start(); executor.ExecuteLambda = (_) =>