Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 2 additions & 51 deletions backends/gcppubsub/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,16 @@ package gcppubsub

import (
"context"
"fmt"
"sync"
"time"

"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
sdk "github.com/streamdal/streamdal/sdks/go"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/streamdal/plumber/backends/gcppubsub/types"
"github.com/streamdal/plumber/util"

"github.com/streamdal/plumber/prometheus"
"github.com/streamdal/plumber/validate"
)
Expand All @@ -33,14 +28,6 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel
"backend": "gcp-pubsub",
})

// streamdal sdk BEGIN
sc, err := util.SetupStreamdalSDK(relayOpts, llog)
if err != nil {
return errors.Wrap(err, "kafka.Relay(): unable to create new streamdal client")
}
// defer sc.Close()
// streamdal sdk END

var m sync.Mutex

var readFunc = func(ctx context.Context, msg *pubsub.Message) {
Expand All @@ -53,42 +40,6 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel

prometheus.Incr("gcp-pubsub-relay-consumer", 1)

// streamdal sdk BEGIN
// If streamdal integration is enabled, process message via sdk
if sc != nil {
g.log.Debug("Processing message via streamdal SDK")

operationName := "relay"

if relayOpts != nil && relayOpts.GcpPubsub != nil && relayOpts.GcpPubsub.GetArgs() != nil {
if relayOpts.GcpPubsub.GetArgs().SubscriptionId == "" {
operationName = "relay-unknown-subid"
} else {
operationName = "relay-" + relayOpts.GcpPubsub.GetArgs().SubscriptionId
}
}

resp := sc.Process(ctx, &sdk.ProcessRequest{
ComponentName: "gcp-pubsub",
OperationType: sdk.OperationTypeConsumer,
OperationName: operationName,
Data: msg.Data,
})

if resp.Status == sdk.ExecStatusError {
wrappedErr := fmt.Errorf("unable to process message via streamdal: %v", resp.StatusMessage)

prometheus.IncrPromCounter("plumber_sdk_errors", 1)
util.WriteError(llog, errorCh, wrappedErr)

return
}

// Update msg value with processed data
msg.Data = resp.Data
}
// streamdal sdk END

g.log.Debug("Writing message to relay channel")

relayCh <- &types.RelayMessage{
Expand Down
36 changes: 0 additions & 36 deletions backends/kafka/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/streamdal/plumber/prometheus"
"github.com/streamdal/plumber/util"
"github.com/streamdal/plumber/validate"

sdk "github.com/streamdal/streamdal/sdks/go"
)

const (
Expand All @@ -36,14 +34,6 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh

defer reader.Close()

// streamdal sdk BEGIN
sc, err := util.SetupStreamdalSDK(relayOpts, k.log)
if err != nil {
return errors.Wrap(err, "kafka.Relay(): unable to create new streamdal client")
}
// defer sc.Close()
// streamdal sdk END

llog := k.log.WithFields(logrus.Fields{
"relay-id": relayOpts.XRelayId,
"backend": "kafka",
Expand Down Expand Up @@ -74,32 +64,6 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh

prometheus.Incr("kafka-relay-consumer", 1)

// streamdal sdk BEGIN
// If streamdal integration is enabled, process message via sdk
if sc != nil {
k.log.Debug("Processing message via streamdal SDK")

resp := sc.Process(ctx, &sdk.ProcessRequest{
ComponentName: "kafka",
OperationType: sdk.OperationTypeConsumer,
OperationName: "relay",
Data: msg.Value,
})

if resp.Status == sdk.ExecStatusError {
wrappedErr := fmt.Errorf("unable to process message via streamdal: %v", resp.StatusMessage)

prometheus.IncrPromCounter("plumber_sdk_errors", 1)
util.WriteError(llog, errorCh, wrappedErr)

continue
}

// Update msg value with processed data
msg.Value = resp.Data
}
// streamdal sdk END

k.log.Debugf("Writing Kafka message to relay channel: %s", msg.Value)

relayCh <- &types.RelayMessage{
Expand Down
154 changes: 0 additions & 154 deletions backends/streamdal/auth.go

This file was deleted.

63 changes: 0 additions & 63 deletions backends/streamdal/auth_test.go

This file was deleted.

Loading
Loading