Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,25 @@ namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Triggers;

/// <summary>
/// Factory used to create ITriggerBinding instances.
/// It's TryCreateAsync method is called by the runtime for all job parameters, giving it a chance to return a binding.
/// Please see <see href="https://github.com/Azure/azure-webjobs-sdk-extensions/wiki/Trigger-Binding-Extensions#binding-provider">Trigger Binding Extensions</see>
/// </summary>
class EdgeHubTriggerBindingProvider : ITriggerBindingProvider
public class EdgeHubTriggerBindingProvider : ITriggerBindingProvider
{
readonly INameResolver nameResolver;
readonly ConcurrentDictionary<string, IList<EdgeHubMessageProcessor>> receivers = new ConcurrentDictionary<string, IList<EdgeHubMessageProcessor>>();
ModuleClient moduleClient;

public EdgeHubTriggerBindingProvider(INameResolver nameResolver)
{
this.nameResolver = nameResolver;
}

public async Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
{
if (context == null)
Expand All @@ -33,13 +40,15 @@ public async Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext
return null;
}

var inputName = (this.nameResolver?.ResolveWholeString(attribute.InputName) ?? attribute.InputName).ToLowerInvariant();

await this.TrySetEventDefaultHandlerAsync();

var messageProcessor = new EdgeHubMessageProcessor();
var triggerBinding = new EdgeHubTriggerBinding(context.Parameter, messageProcessor);

this.receivers.AddOrUpdate(
attribute.InputName.ToLowerInvariant(),
inputName,
// The function used to generate a value for an absent.
// Creates a new List and adds the message processor
(k) => new List<EdgeHubMessageProcessor>()
Expand Down Expand Up @@ -70,8 +79,10 @@ async Task TrySetEventDefaultHandlerAsync()

async Task<MessageResponse> FunctionsMessageHandler(Message message, object userContext)
{
var inputName = message.InputName.ToLowerInvariant();
byte[] payload = message.GetBytes();
if (this.receivers.TryGetValue(message.InputName.ToLowerInvariant(), out IList<EdgeHubMessageProcessor> functionReceivers))

if (this.receivers.TryGetValue(inputName, out IList<EdgeHubMessageProcessor> functionReceivers))
{
foreach (EdgeHubMessageProcessor edgeHubTriggerBinding in functionReceivers)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.36.6" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.27" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.33" />
<PackageReference Include="System.Text.Encodings.Web" Version="4.7.2" />
<!--
Newtonsoft.Json < 13.0.1 has a vulnerability due to insecure defaults.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,26 @@ namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub.Config
/// Extension configuration provider used to register EdgeHub triggers and binders
/// </summary>
[Extension("EdgeHub")]
class EdgeHubExtensionConfigProvider : IExtensionConfigProvider
public class EdgeHubExtensionConfigProvider : IExtensionConfigProvider
{
readonly INameResolver nameResolver;

public EdgeHubExtensionConfigProvider(INameResolver nameResolver)
{
this.nameResolver = nameResolver;
}

public void Initialize(ExtensionConfigContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}

var bindingProvider = new EdgeHubTriggerBindingProvider();
var rule = context.AddBindingRule<EdgeHubTriggerAttribute>();
rule.AddConverter<Message, string>(ConvertMessageToString);
rule.AddConverter<Message, byte[]>(ConvertMessageToBytes);
rule.BindToTrigger<Message>(bindingProvider);
rule.BindToTrigger<Message>(new EdgeHubTriggerBindingProvider(this.nameResolver));

var rule2 = context.AddBindingRule<EdgeHubAttribute>();
rule2.BindToCollector<Message>(typeof(EdgeHubCollectorBuilder));
Expand Down