Skip to content

Commit 673822e

Browse files
committed
Add gRPC render metadata header
In headers, we now send the number of metrics gRPC render will stream. This can be beneficial for the receiver to know how many metrics it will receive over the stream.
1 parent 1d734b7 commit 673822e

File tree

1 file changed

+13
-0
lines changed

1 file changed

+13
-0
lines changed

carbonserver/render.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"go.uber.org/zap"
2020
"google.golang.org/grpc/codes"
21+
"google.golang.org/grpc/metadata"
2122
"google.golang.org/grpc/peer"
2223
"google.golang.org/grpc/status"
2324

@@ -646,6 +647,13 @@ func (listener *CarbonserverListener) streamMetrics(stream grpcv2.CarbonV2_Rende
646647
return
647648
}
648649

650+
const gRPCRenderMetricsCountHeaderKey = "metrics-count"
651+
652+
func sendRenderMetadataHeader(stream grpcv2.CarbonV2_RenderServer, filesCount int) error {
653+
header := metadata.Pairs(gRPCRenderMetricsCountHeaderKey, strconv.Itoa(filesCount))
654+
return stream.SendHeader(header)
655+
}
656+
649657
// Render implements Render rpc of CarbonV2 gRPC service
650658
func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, stream grpcv2.CarbonV2_RenderServer) (rpcErr error) {
651659
t0 := time.Now()
@@ -726,6 +734,10 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
726734
metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
727735
tle.MetricGlobMapLength = len(metricGlobMap)
728736
filesCount := countFilesInExpandedGlobs(expandedGlobs)
737+
err = sendRenderMetadataHeader(stream, filesCount)
738+
if err != nil {
739+
return nil, err
740+
}
729741
prepareChan := make(chan response, getStreamingChannelSize(filesCount))
730742
go func() {
731743
prepareT0 := time.Now()
@@ -768,6 +780,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
768780
case res != nil:
769781
atomic.AddUint64(&listener.metrics.QueryCacheHit, 1)
770782
cachedResponses := res.([]response)
783+
err = sendRenderMetadataHeader(stream, len(cachedResponses))
771784
responseChanToStream = make(chan response, getStreamingChannelSize(len(cachedResponses)))
772785
go func() {
773786
for _, r := range cachedResponses {

0 commit comments

Comments
 (0)