Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,13 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
new SharedBlobListenerFactory(hostId, _hostBlobServiceClient, _exceptionHandler, _blobWrittenWatcherSetter, _loggerFactory.CreateLogger<BlobListener>()));

// Register the blob container we wish to monitor with the shared blob listener.
await RegisterWithSharedBlobListenerAsync(hostId, sharedBlobListener, primaryBlobClient,
blobTriggerQueueWriter, cancellationToken).ConfigureAwait(false);
await RegisterWithSharedBlobListenerAsync(
hostId,
sharedBlobListener,
primaryBlobClient,
targetBlobClient,
blobTriggerQueueWriter,
cancellationToken).ConfigureAwait(false);
}

// Create a "bridge" listener that will monitor the blob
Expand Down Expand Up @@ -165,14 +170,20 @@ await RegisterWithSharedBlobListenerAsync(hostId, sharedBlobListener, primaryBlo
private async Task RegisterWithSharedBlobListenerAsync(
string hostId,
SharedBlobListener sharedBlobListener,
BlobServiceClient blobClient,
BlobServiceClient primaryBlobClient,
BlobServiceClient targetBlobClient,
BlobTriggerQueueWriter blobTriggerQueueWriter,
CancellationToken cancellationToken)
{
BlobTriggerExecutor triggerExecutor = new BlobTriggerExecutor(hostId, _functionDescriptor, _input, new BlobReceiptManager(blobClient),
BlobTriggerExecutor triggerExecutor = new BlobTriggerExecutor(hostId, _functionDescriptor, _input, new BlobReceiptManager(primaryBlobClient),
blobTriggerQueueWriter, _loggerFactory.CreateLogger<BlobListener>());

await sharedBlobListener.RegisterAsync(blobClient, _container, triggerExecutor, cancellationToken).ConfigureAwait(false);
await sharedBlobListener.RegisterAsync(
primaryBlobClient,
targetBlobClient,
_container,
triggerExecutor,
cancellationToken).ConfigureAwait(false);
}

private void RegisterWithSharedBlobQueueListenerAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
{
internal interface IBlobNotificationStrategy : ITaskSeriesCommand, IBlobWrittenWatcher
{
Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClient container, ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
Task RegisterAsync(
BlobServiceClient primaryBlobServiceClient,
BlobServiceClient targetBlobServiceClient,
BlobContainerClient container,
ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@ private class Logger
LoggerMessage.Define<string, string, int>(LogLevel.Debug, new EventId(300, nameof(ScanBlobLogs)),
"Log scan for recent blob updates in container '{containerName}' with PollId '{pollId}' found {blobCount} blobs.");

private static readonly Action<ILogger<BlobListener>, string, Exception> _loggingNotEnabledOnTargetAccount =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(4, nameof(LoggingNotEnabledOnTargetAccount)),
"Logging not enabled on target blob storage account, '{targetAccountUri}'");

public static void ScanBlobLogs(ILogger<BlobListener> logger, string containerName, string pollId, int blobCount) =>
_scanBlobLogs(logger, containerName, pollId, blobCount, null);

public static void LoggingNotEnabledOnTargetAccount(ILogger<BlobListener> logger, string targetAccountUri) =>
_loggingNotEnabledOnTargetAccount(logger, targetAccountUri, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ public PollLogsStrategy(IWebJobsExceptionHandler exceptionHandler, ILogger<BlobL
_exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler));
}

public async Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClient container, ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
public async Task RegisterAsync(
BlobServiceClient primaryBlobServiceClient,
BlobServiceClient targetBlobServiceClient,
BlobContainerClient container,
ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
Expand All @@ -76,10 +80,28 @@ public async Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContain

containerRegistrations.Add(triggerExecutor);

if (!_logListeners.ContainsKey(blobServiceClient))
if (targetBlobServiceClient == default)
{
BlobLogListener logListener = await BlobLogListener.CreateAsync(blobServiceClient, _logger, cancellationToken).ConfigureAwait(false);
_logListeners.Add(blobServiceClient, logListener);
// If no target client is specified, use the primary.
BlobLogListener logListener = await BlobLogListener.CreateAsync(primaryBlobServiceClient, _logger, cancellationToken).ConfigureAwait(false);
_logListeners.Add(primaryBlobServiceClient, logListener);
}
else if (!_logListeners.ContainsKey(targetBlobServiceClient))
{
try
{
BlobLogListener logListener = await BlobLogListener.CreateAsync(targetBlobServiceClient, _logger, cancellationToken).ConfigureAwait(false);
_logListeners.Add(targetBlobServiceClient, logListener);
}
// TODO: verify if this is the only permissions error code, or other possible permissions errors
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.AuthorizationPermissionMismatch)
{
Logger.LoggingNotEnabledOnTargetAccount(_logger, targetBlobServiceClient.Uri.AbsoluteUri);

// Fallback to primary client if target client does not have the permissions to be used.
BlobLogListener logListener = await BlobLogListener.CreateAsync(primaryBlobServiceClient, _logger, cancellationToken).ConfigureAwait(false);
_logListeners.Add(primaryBlobServiceClient, logListener);
}
}
}

Expand Down Expand Up @@ -256,5 +278,22 @@ private void ThrowIfDisposed()
throw new ObjectDisposedException(null);
}
}

private async Task<bool> CheckLoggingEnabledAsync(BlobServiceClient blobClient, CancellationToken cancellationToken)
{
BlobServiceProperties serviceProperties = await blobClient.GetPropertiesAsync(cancellationToken).ConfigureAwait(false);

// Retrieve the logging settings.
BlobAnalyticsLogging loggingProperties = serviceProperties.Logging;

if (!loggingProperties.Write)
{
// Log an error if logging is not enabled.
Logger.LoggingNotEnabledOnTargetAccount(_logger, blobClient.Uri.AbsoluteUri);
return false;
}
// Logging is enabled.
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,29 @@ public void Cancel()
_cancellationTokenSource.Cancel();
}

public async Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClient container, ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor, CancellationToken cancellationToken)
public async Task RegisterAsync(
BlobServiceClient primaryBlobServiceClient,
BlobServiceClient targetBlobServiceClient,
BlobContainerClient container,
ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
CancellationToken cancellationToken)
{
// Register and Execute are not concurrency-safe.
// Avoiding calling Register while Execute is running is the caller's responsibility.
ThrowIfDisposed();

// Register all in logPolling, there is no problem if we get 2 notifications of the new blob
await _pollLogStrategy.RegisterAsync(blobServiceClient, container, triggerExecutor, cancellationToken).ConfigureAwait(false);
await _pollLogStrategy.RegisterAsync(
primaryBlobServiceClient,
targetBlobServiceClient,
container,
triggerExecutor,
cancellationToken).ConfigureAwait(false);

if (!_scanInfo.TryGetValue(container, out ContainerScanInfo containerScanInfo))
{
// First, try to load serialized scanInfo for this container.
DateTime? latestStoredScan = await _blobScanInfoManager.LoadLatestScanAsync(blobServiceClient.AccountName, container.Name).ConfigureAwait(false);
DateTime? latestStoredScan = await _blobScanInfoManager.LoadLatestScanAsync(primaryBlobServiceClient.AccountName, container.Name).ConfigureAwait(false);

containerScanInfo = new ContainerScanInfo()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ public void Notify(BlobWithContainer<BlobBaseClient> blobWritten)
_blobWrittenNotifications.Enqueue(blobWritten);
}

public Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClient container, ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
public Task RegisterAsync(
BlobServiceClient primaryBlobServiceClient,
BlobServiceClient targetBlobServiceClient,
BlobContainerClient container,
ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
CancellationToken cancellationToken)
{
// Register and Execute are not concurrency-safe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ public IBlobWrittenWatcher BlobWritterWatcher
get { return _strategy; }
}

public Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClient container, ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
public Task RegisterAsync(
BlobServiceClient primaryBlobServiceClient,
BlobServiceClient targetBlobServiceClient,
BlobContainerClient container,
ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
CancellationToken cancellationToken)
{
if (_started)
Expand All @@ -53,7 +57,12 @@ public Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClie
"Registrations may not be added while the shared listener is running.");
}

return _strategy.RegisterAsync(blobServiceClient, container, triggerExecutor, cancellationToken);
return _strategy.RegisterAsync(
primaryBlobServiceClient,
targetBlobServiceClient,
container,
triggerExecutor,
cancellationToken);
}

public Task EnsureAllStartedAsync(CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,24 @@ public static TaskSeriesCommandResult Execute(this IBlobListenerStrategy strateg
return strategy.ExecuteAsync(CancellationToken.None).GetAwaiter().GetResult();
}

public static void Register(this IBlobListenerStrategy strategy, BlobServiceClient blobServiceClient, BlobContainerClient container,
public static void Register(
this IBlobListenerStrategy strategy,
BlobServiceClient primaryBlobServiceClient,
BlobServiceClient targetBlobServiceClient,
BlobContainerClient container,
ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor)
{
if (strategy == null)
{
throw new ArgumentNullException("strategy");
}

strategy.RegisterAsync(blobServiceClient, container, triggerExecutor, CancellationToken.None).GetAwaiter().GetResult();
strategy.RegisterAsync(
primaryBlobServiceClient,
targetBlobServiceClient,
container,
triggerExecutor,
CancellationToken.None).GetAwaiter().GetResult();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ public void ScanBlobScanLogHybridPollingStrategyTestBlobListener()
var container = _blobContainerMock.Object;
IBlobListenerStrategy product = new ScanBlobScanLogHybridPollingStrategy(new TestBlobScanInfoManager(), _exceptionHandler, _logger);
LambdaBlobTriggerExecutor executor = new LambdaBlobTriggerExecutor();
product.Register(_blobClientMock.Object, container, executor);
product.Register(
_blobClientMock.Object,
_blobClientMock.Object,
container,
executor);
product.Start();

RunExecuterWithExpectedBlobs(new List<string>(), product, executor);
Expand Down Expand Up @@ -163,7 +167,11 @@ public void TestBlobListenerWithContainerBiggerThanThreshold()
.GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic)
.SetValue(product, testScanBlobLimitPerPoll);

product.Register(_blobClientMock.Object, container, executor);
product.Register(
_blobClientMock.Object,
_blobClientMock.Object,
container,
executor);
product.Start();

// populate with 5 blobs
Expand Down Expand Up @@ -191,8 +199,16 @@ public void TestBlobListenerWithMultipleContainers()
.GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic)
.SetValue(product, testScanBlobLimitPerPoll);

product.Register(_blobClientMock.Object, firstContainer, executor);
product.Register(_blobClientMock.Object, secondContainer, executor);
product.Register(
_blobClientMock.Object,
_blobClientMock.Object,
firstContainer,
executor);
product.Register(
_blobClientMock.Object,
_blobClientMock.Object,
secondContainer,
executor);
product.Start();

// populate first container with 5 blobs > page size and second with 2 blobs < page size
Expand Down Expand Up @@ -232,7 +248,11 @@ public void BlobPolling_IgnoresClockSkew()
.GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic)
.SetValue(product, testScanBlobLimitPerPoll);

product.Register(_blobClientMock.Object, container, executor);
product.Register(
_blobClientMock.Object,
_blobClientMock.Object,
container,
executor);
product.Start();

List<string> expectedNames = new List<string>();
Expand Down Expand Up @@ -271,7 +291,11 @@ public void BlobPolling_IncludesPreviousBatch()
.GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic)
.SetValue(product, testScanBlobLimitPerPoll);

product.Register(_blobClientMock.Object, container, executor);
product.Register(
_blobClientMock.Object,
_blobClientMock.Object,
container,
executor);
product.Start();

List<string> expectedNames = new List<string>();
Expand Down Expand Up @@ -307,7 +331,12 @@ public async Task RegisterAsync_InitializesWithScanInfoManager()
await Task.Delay(10);

await scanInfoManager.UpdateLatestScanAsync(AccountName, ContainerName, DateTime.UtcNow);
await product.RegisterAsync(_blobClientMock.Object, container, executor, CancellationToken.None);
await product.RegisterAsync(
_blobClientMock.Object,
_blobClientMock.Object,
container,
executor,
CancellationToken.None);

// delay slightly so we guarantee a later timestamp
await Task.Delay(10);
Expand All @@ -334,8 +363,18 @@ public async Task ExecuteAsync_UpdatesScanInfoManager()
.GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic)
.SetValue(product, testScanBlobLimitPerPoll);

await product.RegisterAsync(_blobClientMock.Object, firstContainer, executor, CancellationToken.None);
await product.RegisterAsync(_blobClientMock.Object, secondContainer, executor, CancellationToken.None);
await product.RegisterAsync(
_blobClientMock.Object,
_blobClientMock.Object,
firstContainer,
executor,
CancellationToken.None);
await product.RegisterAsync(
_blobClientMock.Object,
_blobClientMock.Object,
secondContainer,
executor,
CancellationToken.None);

var firstExpectedNames = new List<string>();
for (int i = 0; i < 3; i++)
Expand Down Expand Up @@ -383,7 +422,12 @@ public async Task ExecuteAsync_UpdatesScanInfo_WithEarliestFailure()
.GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic)
.SetValue(product, testScanBlobLimitPerPoll);

await product.RegisterAsync(_blobClientMock.Object, container, executor, CancellationToken.None);
await product.RegisterAsync(
_blobClientMock.Object,
_blobClientMock.Object,
container,
executor,
CancellationToken.None);

// Induce a failure to make sure the timestamp is earlier than the failure.
var expectedNames = new List<string>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ public async Task TestBlobListener()
var container = blobServiceClient.GetBlobContainerClient(ContainerName);
IBlobListenerStrategy product = new ScanContainersStrategy();
LambdaBlobTriggerExecutor executor = new LambdaBlobTriggerExecutor();
product.Register(blobServiceClient, container, executor);
product.Register(
blobServiceClient,
blobServiceClient,
container,
executor);
product.Start();

executor.ExecuteLambda = (_) =>
Expand Down