From 7e9b12416e3b5a582f2ed8188af89afafd93a01f Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Tue, 18 Mar 2025 16:24:01 +0000 Subject: [PATCH] examples: Add streaming example --- examples/streaming/.gitignore | 1 + examples/streaming/README.md | 38 ++ examples/streaming/main.go | 87 ++++ examples/streaming/plugin/.gitignore | 1 + examples/streaming/plugin/plugin.go | 82 ++++ examples/streaming/proto/streamer.pb.go | 478 +++++++++++++++++++ examples/streaming/proto/streamer.proto | 34 ++ examples/streaming/proto/streamer_grpc.pb.go | 248 ++++++++++ examples/streaming/shared/client.go | 77 +++ examples/streaming/shared/interface.go | 14 + examples/streaming/shared/server.go | 90 ++++ 11 files changed, 1150 insertions(+) create mode 100644 examples/streaming/.gitignore create mode 100644 examples/streaming/README.md create mode 100644 examples/streaming/main.go create mode 100644 examples/streaming/plugin/.gitignore create mode 100644 examples/streaming/plugin/plugin.go create mode 100644 examples/streaming/proto/streamer.pb.go create mode 100644 examples/streaming/proto/streamer.proto create mode 100644 examples/streaming/proto/streamer_grpc.pb.go create mode 100644 examples/streaming/shared/client.go create mode 100644 examples/streaming/shared/interface.go create mode 100644 examples/streaming/shared/server.go diff --git a/examples/streaming/.gitignore b/examples/streaming/.gitignore new file mode 100644 index 00000000..20f11a55 --- /dev/null +++ b/examples/streaming/.gitignore @@ -0,0 +1 @@ +myfile diff --git a/examples/streaming/README.md b/examples/streaming/README.md new file mode 100644 index 00000000..07a99ab6 --- /dev/null +++ b/examples/streaming/README.md @@ -0,0 +1,38 @@ +# gRPC streaming Example + +This example builds a plugin & client which can stream larger amount of data +between them while staying below reasonable message size limits of the gRPC +protocol. + +## To execute + +Build the plugin + +``` +go build -o ./plugin/streamer ./plugin +``` + +launch client + +``` +go run main.go myfile +``` + +## To re-generate protobuf definitions + +Install protobuf tooling + +``` +brew install protobuf +``` +``` +go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.1 +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3.0 +``` + +generate files + +``` +cd proto +protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative streamer.proto +``` \ No newline at end of file diff --git a/examples/streaming/main.go b/examples/streaming/main.go new file mode 100644 index 00000000..7bfac5ad --- /dev/null +++ b/examples/streaming/main.go @@ -0,0 +1,87 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/exec" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/go-plugin/examples/streaming/shared" + "google.golang.org/grpc" +) + +func main() { + if len(os.Args) != 2 { + log.Fatal("expected path to file as an argument") + } + path := os.Args[1] + + logger := hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + Output: os.Stderr, + JSONFormat: true, + }) + + msgSizeLimit := 1000 + chunkSize := 10 + + client := plugin.NewClient(&plugin.ClientConfig{ + HandshakeConfig: plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "BASIC_PLUGIN", + MagicCookieValue: "hello", + }, + Plugins: map[string]plugin.Plugin{ + "streamer": &shared.StreamerPlugin{}, + }, + Cmd: exec.Command("./plugin/streamer"), + AllowedProtocols: []plugin.Protocol{ + plugin.ProtocolGRPC, + }, + Logger: logger, + GRPCDialOptions: []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSizeLimit)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(msgSizeLimit)), + }, + }) + defer client.Kill() + + logger.Debug("launching a client") + + rpcClient, err := client.Client() + if err != nil { + log.Fatal(err) + } + + raw, err := rpcClient.Dispense("streamer") + if err != nil { + log.Fatal(err) + } + + ctx := context.Background() + + streamer := raw.(shared.Streamer) + err = streamer.Configure(ctx, path, int64(chunkSize)) + if err != nil { + log.Fatal(err) + } + + err = streamer.Write(ctx, []byte("Lorem ipsum dolor sit amet")) + if err != nil { + log.Fatal(err) + } + + logger.Debug("writing finished") + + b, err := streamer.Read(ctx) + if err != nil { + log.Fatal(err) + } + logger.Debug(fmt.Sprintf("received %d bytes", len(b)), "bytes", string(b)) +} diff --git a/examples/streaming/plugin/.gitignore b/examples/streaming/plugin/.gitignore new file mode 100644 index 00000000..c0f62896 --- /dev/null +++ b/examples/streaming/plugin/.gitignore @@ -0,0 +1 @@ +streamer diff --git a/examples/streaming/plugin/plugin.go b/examples/streaming/plugin/plugin.go new file mode 100644 index 00000000..15e80c82 --- /dev/null +++ b/examples/streaming/plugin/plugin.go @@ -0,0 +1,82 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package main + +import ( + "context" + "io" + "os" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/go-plugin/examples/streaming/shared" +) + +type FileStreamer struct { + logger hclog.Logger + path string +} + +func (fs *FileStreamer) Configure(ctx context.Context, path string, _ int64) error { + fs.path = path + return nil +} + +func (fs *FileStreamer) Read(ctx context.Context) ([]byte, error) { + fs.logger.Debug("FileStreamer: Read", "path", fs.path) + f, err := os.OpenFile(fs.path, os.O_RDONLY, 0644) + if err != nil { + return nil, err + } + defer f.Close() + return io.ReadAll(f) +} + +func (fs *FileStreamer) Write(ctx context.Context, b []byte) error { + fs.logger.Debug("FileStreamer: Write", "path", fs.path) + f, err := os.OpenFile(fs.path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return err + } + defer f.Close() + + n, err := f.Write(b) + if err != nil { + return err + } + fs.logger.Debug("FileStreamer: Write finished", "bytes written", n) + return nil +} + +var handshakeConfig = plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "BASIC_PLUGIN", + MagicCookieValue: "hello", +} + +func main() { + logger := hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + Output: os.Stderr, + JSONFormat: true, + }) + + streamer := &FileStreamer{ + logger: logger, + } + var pluginMap = map[string]plugin.Plugin{ + "streamer": &shared.StreamerPlugin{ + Impl: streamer, + }, + } + + logger.Debug("plugin launched, about to be served") + + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: handshakeConfig, + Plugins: pluginMap, + GRPCServer: plugin.DefaultGRPCServer, + Logger: logger, + }) +} diff --git a/examples/streaming/proto/streamer.pb.go b/examples/streaming/proto/streamer.pb.go new file mode 100644 index 00000000..c5e08962 --- /dev/null +++ b/examples/streaming/proto/streamer.pb.go @@ -0,0 +1,478 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.1 +// protoc v5.29.3 +// source: streamer.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Read struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Read) Reset() { + *x = Read{} + mi := &file_streamer_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Read) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Read) ProtoMessage() {} + +func (x *Read) ProtoReflect() protoreflect.Message { + mi := &file_streamer_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Read.ProtoReflect.Descriptor instead. +func (*Read) Descriptor() ([]byte, []int) { + return file_streamer_proto_rawDescGZIP(), []int{0} +} + +type Write struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Write) Reset() { + *x = Write{} + mi := &file_streamer_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Write) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Write) ProtoMessage() {} + +func (x *Write) ProtoReflect() protoreflect.Message { + mi := &file_streamer_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Write.ProtoReflect.Descriptor instead. +func (*Write) Descriptor() ([]byte, []int) { + return file_streamer_proto_rawDescGZIP(), []int{1} +} + +type Configure struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Configure) Reset() { + *x = Configure{} + mi := &file_streamer_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Configure) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Configure) ProtoMessage() {} + +func (x *Configure) ProtoReflect() protoreflect.Message { + mi := &file_streamer_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Configure.ProtoReflect.Descriptor instead. +func (*Configure) Descriptor() ([]byte, []int) { + return file_streamer_proto_rawDescGZIP(), []int{2} +} + +type Read_Request struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Read_Request) Reset() { + *x = Read_Request{} + mi := &file_streamer_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Read_Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Read_Request) ProtoMessage() {} + +func (x *Read_Request) ProtoReflect() protoreflect.Message { + mi := &file_streamer_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Read_Request.ProtoReflect.Descriptor instead. +func (*Read_Request) Descriptor() ([]byte, []int) { + return file_streamer_proto_rawDescGZIP(), []int{0, 0} +} + +type Read_ResponseChunk struct { + state protoimpl.MessageState `protogen:"open.v1"` + ReadBytes []byte `protobuf:"bytes,1,opt,name=read_bytes,json=readBytes,proto3" json:"read_bytes,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Read_ResponseChunk) Reset() { + *x = Read_ResponseChunk{} + mi := &file_streamer_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Read_ResponseChunk) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Read_ResponseChunk) ProtoMessage() {} + +func (x *Read_ResponseChunk) ProtoReflect() protoreflect.Message { + mi := &file_streamer_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Read_ResponseChunk.ProtoReflect.Descriptor instead. +func (*Read_ResponseChunk) Descriptor() ([]byte, []int) { + return file_streamer_proto_rawDescGZIP(), []int{0, 1} +} + +func (x *Read_ResponseChunk) GetReadBytes() []byte { + if x != nil { + return x.ReadBytes + } + return nil +} + +type Write_RequestChunk struct { + state protoimpl.MessageState `protogen:"open.v1"` + BytesToWrite []byte `protobuf:"bytes,1,opt,name=bytes_to_write,json=bytesToWrite,proto3" json:"bytes_to_write,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Write_RequestChunk) Reset() { + *x = Write_RequestChunk{} + mi := &file_streamer_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Write_RequestChunk) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Write_RequestChunk) ProtoMessage() {} + +func (x *Write_RequestChunk) ProtoReflect() protoreflect.Message { + mi := &file_streamer_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Write_RequestChunk.ProtoReflect.Descriptor instead. +func (*Write_RequestChunk) Descriptor() ([]byte, []int) { + return file_streamer_proto_rawDescGZIP(), []int{1, 0} +} + +func (x *Write_RequestChunk) GetBytesToWrite() []byte { + if x != nil { + return x.BytesToWrite + } + return nil +} + +type Write_Response struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Write_Response) Reset() { + *x = Write_Response{} + mi := &file_streamer_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Write_Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Write_Response) ProtoMessage() {} + +func (x *Write_Response) ProtoReflect() protoreflect.Message { + mi := &file_streamer_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Write_Response.ProtoReflect.Descriptor instead. +func (*Write_Response) Descriptor() ([]byte, []int) { + return file_streamer_proto_rawDescGZIP(), []int{1, 1} +} + +type Configure_Request struct { + state protoimpl.MessageState `protogen:"open.v1"` + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + ChunkSize int64 `protobuf:"varint,2,opt,name=chunk_size,json=chunkSize,proto3" json:"chunk_size,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Configure_Request) Reset() { + *x = Configure_Request{} + mi := &file_streamer_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Configure_Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Configure_Request) ProtoMessage() {} + +func (x *Configure_Request) ProtoReflect() protoreflect.Message { + mi := &file_streamer_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Configure_Request.ProtoReflect.Descriptor instead. +func (*Configure_Request) Descriptor() ([]byte, []int) { + return file_streamer_proto_rawDescGZIP(), []int{2, 0} +} + +func (x *Configure_Request) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +func (x *Configure_Request) GetChunkSize() int64 { + if x != nil { + return x.ChunkSize + } + return 0 +} + +type Configure_Response struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Configure_Response) Reset() { + *x = Configure_Response{} + mi := &file_streamer_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Configure_Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Configure_Response) ProtoMessage() {} + +func (x *Configure_Response) ProtoReflect() protoreflect.Message { + mi := &file_streamer_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Configure_Response.ProtoReflect.Descriptor instead. +func (*Configure_Response) Descriptor() ([]byte, []int) { + return file_streamer_proto_rawDescGZIP(), []int{2, 1} +} + +var File_streamer_proto protoreflect.FileDescriptor + +var file_streamer_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x41, 0x0a, 0x04, 0x52, 0x65, 0x61, 0x64, 0x1a, + 0x09, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, 0x0a, 0x0d, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x12, 0x1d, 0x0a, 0x0a, 0x72, + 0x65, 0x61, 0x64, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x09, 0x72, 0x65, 0x61, 0x64, 0x42, 0x79, 0x74, 0x65, 0x73, 0x22, 0x49, 0x0a, 0x05, 0x57, 0x72, + 0x69, 0x74, 0x65, 0x1a, 0x34, 0x0a, 0x0c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x43, 0x68, + 0x75, 0x6e, 0x6b, 0x12, 0x24, 0x0a, 0x0e, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x74, 0x6f, 0x5f, + 0x77, 0x72, 0x69, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x62, 0x79, 0x74, + 0x65, 0x73, 0x54, 0x6f, 0x57, 0x72, 0x69, 0x74, 0x65, 0x1a, 0x0a, 0x0a, 0x08, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, + 0x72, 0x65, 0x1a, 0x3c, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, + 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, + 0x68, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x53, 0x69, 0x7a, 0x65, + 0x1a, 0x0a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xca, 0x01, 0x0a, + 0x0f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x40, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x12, 0x18, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x38, 0x0a, 0x04, 0x52, 0x65, 0x61, 0x64, 0x12, 0x13, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x30, 0x01, 0x12, 0x3b, 0x0a, 0x05, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, + 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x43, 0x68, 0x75, 0x6e, 0x6b, + 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_streamer_proto_rawDescOnce sync.Once + file_streamer_proto_rawDescData = file_streamer_proto_rawDesc +) + +func file_streamer_proto_rawDescGZIP() []byte { + file_streamer_proto_rawDescOnce.Do(func() { + file_streamer_proto_rawDescData = protoimpl.X.CompressGZIP(file_streamer_proto_rawDescData) + }) + return file_streamer_proto_rawDescData +} + +var file_streamer_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_streamer_proto_goTypes = []any{ + (*Read)(nil), // 0: proto.Read + (*Write)(nil), // 1: proto.Write + (*Configure)(nil), // 2: proto.Configure + (*Read_Request)(nil), // 3: proto.Read.Request + (*Read_ResponseChunk)(nil), // 4: proto.Read.ResponseChunk + (*Write_RequestChunk)(nil), // 5: proto.Write.RequestChunk + (*Write_Response)(nil), // 6: proto.Write.Response + (*Configure_Request)(nil), // 7: proto.Configure.Request + (*Configure_Response)(nil), // 8: proto.Configure.Response +} +var file_streamer_proto_depIdxs = []int32{ + 7, // 0: proto.StreamerService.Configure:input_type -> proto.Configure.Request + 3, // 1: proto.StreamerService.Read:input_type -> proto.Read.Request + 5, // 2: proto.StreamerService.Write:input_type -> proto.Write.RequestChunk + 8, // 3: proto.StreamerService.Configure:output_type -> proto.Configure.Response + 4, // 4: proto.StreamerService.Read:output_type -> proto.Read.ResponseChunk + 6, // 5: proto.StreamerService.Write:output_type -> proto.Write.Response + 3, // [3:6] is the sub-list for method output_type + 0, // [0:3] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_streamer_proto_init() } +func file_streamer_proto_init() { + if File_streamer_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_streamer_proto_rawDesc, + NumEnums: 0, + NumMessages: 9, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_streamer_proto_goTypes, + DependencyIndexes: file_streamer_proto_depIdxs, + MessageInfos: file_streamer_proto_msgTypes, + }.Build() + File_streamer_proto = out.File + file_streamer_proto_rawDesc = nil + file_streamer_proto_goTypes = nil + file_streamer_proto_depIdxs = nil +} diff --git a/examples/streaming/proto/streamer.proto b/examples/streaming/proto/streamer.proto new file mode 100644 index 00000000..d10f9df2 --- /dev/null +++ b/examples/streaming/proto/streamer.proto @@ -0,0 +1,34 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +syntax = "proto3"; +package proto; +option go_package = "./proto"; + +message Read { + message Request {} + message ResponseChunk { + bytes read_bytes = 1; + } +} + +message Write { + message RequestChunk { + bytes bytes_to_write = 1; + } + message Response {} +} + +message Configure { + message Request { + string path = 1; + int64 chunk_size = 2; + } + message Response {} +} + +service StreamerService { + rpc Configure(Configure.Request) returns (Configure.Response); + rpc Read(Read.Request) returns (stream Read.ResponseChunk); + rpc Write(stream Write.RequestChunk) returns (Write.Response); +} diff --git a/examples/streaming/proto/streamer_grpc.pb.go b/examples/streaming/proto/streamer_grpc.pb.go new file mode 100644 index 00000000..cdd2af85 --- /dev/null +++ b/examples/streaming/proto/streamer_grpc.pb.go @@ -0,0 +1,248 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.29.3 +// source: streamer.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + StreamerService_Configure_FullMethodName = "/proto.StreamerService/Configure" + StreamerService_Read_FullMethodName = "/proto.StreamerService/Read" + StreamerService_Write_FullMethodName = "/proto.StreamerService/Write" +) + +// StreamerServiceClient is the client API for StreamerService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type StreamerServiceClient interface { + Configure(ctx context.Context, in *Configure_Request, opts ...grpc.CallOption) (*Configure_Response, error) + Read(ctx context.Context, in *Read_Request, opts ...grpc.CallOption) (StreamerService_ReadClient, error) + Write(ctx context.Context, opts ...grpc.CallOption) (StreamerService_WriteClient, error) +} + +type streamerServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewStreamerServiceClient(cc grpc.ClientConnInterface) StreamerServiceClient { + return &streamerServiceClient{cc} +} + +func (c *streamerServiceClient) Configure(ctx context.Context, in *Configure_Request, opts ...grpc.CallOption) (*Configure_Response, error) { + out := new(Configure_Response) + err := c.cc.Invoke(ctx, StreamerService_Configure_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *streamerServiceClient) Read(ctx context.Context, in *Read_Request, opts ...grpc.CallOption) (StreamerService_ReadClient, error) { + stream, err := c.cc.NewStream(ctx, &StreamerService_ServiceDesc.Streams[0], StreamerService_Read_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &streamerServiceReadClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type StreamerService_ReadClient interface { + Recv() (*Read_ResponseChunk, error) + grpc.ClientStream +} + +type streamerServiceReadClient struct { + grpc.ClientStream +} + +func (x *streamerServiceReadClient) Recv() (*Read_ResponseChunk, error) { + m := new(Read_ResponseChunk) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *streamerServiceClient) Write(ctx context.Context, opts ...grpc.CallOption) (StreamerService_WriteClient, error) { + stream, err := c.cc.NewStream(ctx, &StreamerService_ServiceDesc.Streams[1], StreamerService_Write_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &streamerServiceWriteClient{stream} + return x, nil +} + +type StreamerService_WriteClient interface { + Send(*Write_RequestChunk) error + CloseAndRecv() (*Write_Response, error) + grpc.ClientStream +} + +type streamerServiceWriteClient struct { + grpc.ClientStream +} + +func (x *streamerServiceWriteClient) Send(m *Write_RequestChunk) error { + return x.ClientStream.SendMsg(m) +} + +func (x *streamerServiceWriteClient) CloseAndRecv() (*Write_Response, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(Write_Response) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// StreamerServiceServer is the server API for StreamerService service. +// All implementations must embed UnimplementedStreamerServiceServer +// for forward compatibility +type StreamerServiceServer interface { + Configure(context.Context, *Configure_Request) (*Configure_Response, error) + Read(*Read_Request, StreamerService_ReadServer) error + Write(StreamerService_WriteServer) error + mustEmbedUnimplementedStreamerServiceServer() +} + +// UnimplementedStreamerServiceServer must be embedded to have forward compatible implementations. +type UnimplementedStreamerServiceServer struct { +} + +func (UnimplementedStreamerServiceServer) Configure(context.Context, *Configure_Request) (*Configure_Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Configure not implemented") +} +func (UnimplementedStreamerServiceServer) Read(*Read_Request, StreamerService_ReadServer) error { + return status.Errorf(codes.Unimplemented, "method Read not implemented") +} +func (UnimplementedStreamerServiceServer) Write(StreamerService_WriteServer) error { + return status.Errorf(codes.Unimplemented, "method Write not implemented") +} +func (UnimplementedStreamerServiceServer) mustEmbedUnimplementedStreamerServiceServer() {} + +// UnsafeStreamerServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to StreamerServiceServer will +// result in compilation errors. +type UnsafeStreamerServiceServer interface { + mustEmbedUnimplementedStreamerServiceServer() +} + +func RegisterStreamerServiceServer(s grpc.ServiceRegistrar, srv StreamerServiceServer) { + s.RegisterService(&StreamerService_ServiceDesc, srv) +} + +func _StreamerService_Configure_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Configure_Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StreamerServiceServer).Configure(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: StreamerService_Configure_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StreamerServiceServer).Configure(ctx, req.(*Configure_Request)) + } + return interceptor(ctx, in, info, handler) +} + +func _StreamerService_Read_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Read_Request) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(StreamerServiceServer).Read(m, &streamerServiceReadServer{stream}) +} + +type StreamerService_ReadServer interface { + Send(*Read_ResponseChunk) error + grpc.ServerStream +} + +type streamerServiceReadServer struct { + grpc.ServerStream +} + +func (x *streamerServiceReadServer) Send(m *Read_ResponseChunk) error { + return x.ServerStream.SendMsg(m) +} + +func _StreamerService_Write_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(StreamerServiceServer).Write(&streamerServiceWriteServer{stream}) +} + +type StreamerService_WriteServer interface { + SendAndClose(*Write_Response) error + Recv() (*Write_RequestChunk, error) + grpc.ServerStream +} + +type streamerServiceWriteServer struct { + grpc.ServerStream +} + +func (x *streamerServiceWriteServer) SendAndClose(m *Write_Response) error { + return x.ServerStream.SendMsg(m) +} + +func (x *streamerServiceWriteServer) Recv() (*Write_RequestChunk, error) { + m := new(Write_RequestChunk) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// StreamerService_ServiceDesc is the grpc.ServiceDesc for StreamerService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var StreamerService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "proto.StreamerService", + HandlerType: (*StreamerServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Configure", + Handler: _StreamerService_Configure_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Read", + Handler: _StreamerService_Read_Handler, + ServerStreams: true, + }, + { + StreamName: "Write", + Handler: _StreamerService_Write_Handler, + ClientStreams: true, + }, + }, + Metadata: "streamer.proto", +} diff --git a/examples/streaming/shared/client.go b/examples/streaming/shared/client.go new file mode 100644 index 00000000..fd97d7ca --- /dev/null +++ b/examples/streaming/shared/client.go @@ -0,0 +1,77 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package shared + +import ( + "bytes" + "context" + "io" + + "github.com/hashicorp/go-plugin/examples/streaming/proto" +) + +type StreamerGRPCClient struct { + client proto.StreamerServiceClient + chunkByteSize int +} + +var _ Streamer = &StreamerGRPCClient{} + +func (g *StreamerGRPCClient) Configure(ctx context.Context, path string, chunkSize int64) error { + g.chunkByteSize = int(chunkSize) + _, err := g.client.Configure(ctx, &proto.Configure_Request{ + Path: path, + ChunkSize: chunkSize, + }) + return err +} + +func (g *StreamerGRPCClient) Read(ctx context.Context) ([]byte, error) { + readClient, err := g.client.Read(ctx, &proto.Read_Request{}) + if err != nil { + return nil, err + } + + // receive all byte chunks + var buf bytes.Buffer + for { + resp, err := readClient.Recv() + if err == io.EOF { + break + } else if err != nil { + return buf.Bytes(), err + } + + _, err = buf.Write(resp.ReadBytes) + if err != nil { + return buf.Bytes(), err + } + } + + return buf.Bytes(), nil +} + +func (g *StreamerGRPCClient) Write(ctx context.Context, b []byte) error { + writeClient, err := g.client.Write(ctx) + if err != nil { + return err + } + + buf := bytes.NewBuffer(b) + for chunkBytes := buf.Next(g.chunkByteSize); len(chunkBytes) > 0; chunkBytes = buf.Next(g.chunkByteSize) { + err = writeClient.Send(&proto.Write_RequestChunk{ + BytesToWrite: chunkBytes, + }) + if err != nil { + return err + } + } + + _, err = writeClient.CloseAndRecv() + if err != nil && err != io.EOF { + return err + } + + return nil +} diff --git a/examples/streaming/shared/interface.go b/examples/streaming/shared/interface.go new file mode 100644 index 00000000..91ffd8ca --- /dev/null +++ b/examples/streaming/shared/interface.go @@ -0,0 +1,14 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package shared + +import ( + "context" +) + +type Streamer interface { + Configure(ctx context.Context, path string, chunkSize int64) error + Read(ctx context.Context) ([]byte, error) + Write(ctx context.Context, b []byte) error +} diff --git a/examples/streaming/shared/server.go b/examples/streaming/shared/server.go new file mode 100644 index 00000000..13790dc1 --- /dev/null +++ b/examples/streaming/shared/server.go @@ -0,0 +1,90 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package shared + +import ( + "bytes" + "context" + "io" + + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/go-plugin/examples/streaming/proto" + "google.golang.org/grpc" +) + +type StreamerGRPCServer struct { + Impl Streamer + + chunkSize int + proto.UnimplementedStreamerServiceServer +} + +func (s *StreamerGRPCServer) Configure(ctx context.Context, req *proto.Configure_Request) (*proto.Configure_Response, error) { + s.chunkSize = int(req.ChunkSize) + return &proto.Configure_Response{}, s.Impl.Configure(ctx, req.Path, req.ChunkSize) +} + +func (s *StreamerGRPCServer) Read(req *proto.Read_Request, srv proto.StreamerService_ReadServer) error { + b, err := s.Impl.Read(srv.Context()) + if err != nil { + return err + } + + // send it by chunks + buf := bytes.NewBuffer(b) + for nextChunk := buf.Next(s.chunkSize); len(nextChunk) > 0; nextChunk = buf.Next(s.chunkSize) { + err = srv.Send(&proto.Read_ResponseChunk{ + ReadBytes: nextChunk, + }) + if err != nil { + return err + } + } + + return nil +} + +func (s *StreamerGRPCServer) Write(srv proto.StreamerService_WriteServer) error { + var buf bytes.Buffer + // receive all byte chunks + for { + writeReq, err := srv.Recv() + if err == io.EOF { + break + } else if err != nil { + return err + } + _, err = buf.Write(writeReq.BytesToWrite) + if err != nil { + return err + } + } + + err := s.Impl.Write(srv.Context(), buf.Bytes()) + if err != nil { + return err + } + + return nil +} + +var _ plugin.GRPCPlugin = &StreamerPlugin{} + +type StreamerPlugin struct { + plugin.NetRPCUnsupportedPlugin + Impl Streamer +} + +func (p *StreamerPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + proto.RegisterStreamerServiceServer(s, &StreamerGRPCServer{ + Impl: p.Impl, + }) + return nil +} + +func (p *StreamerPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { + return &StreamerGRPCClient{ + client: proto.NewStreamerServiceClient(c), + }, nil +}