511 changes: 511 additions & 0 deletions NOTICE

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions go.mod
@@ -19,6 +19,7 @@ require (
github.com/prometheus/procfs v0.16.1
github.com/radovskyb/watcher v1.0.7
github.com/sevlyar/go-daemon v0.1.6
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/cobra v1.10.1
github.com/spf13/pflag v1.0.10
github.com/spf13/viper v1.21.0
@@ -37,6 +38,7 @@ require (
github.com/AzureAD/microsoft-authentication-library-for-go v1.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang-jwt/jwt/v5 v5.3.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
@@ -49,6 +51,7 @@ require (
github.com/spf13/afero v1.15.0 // indirect
github.com/spf13/cast v1.10.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/net v0.46.0 // indirect
7 changes: 7 additions & 0 deletions go.sum
@@ -31,6 +31,8 @@ github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/gapra-msft/cobra v1.4.1-0.20220411185530-5b83e8ba06dd h1:U3d5Jlb0ANsyxk2lnlhYh7/Ov4bZpIBUxJTsVuJM9G0=
github.com/gapra-msft/cobra v1.4.1-0.20220411185530-5b83e8ba06dd/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs=
github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
@@ -78,6 +80,8 @@ github.com/sagikazarmark/locafero v0.12.0 h1:/NQhBAkUb4+fH1jivKHWusDYFjMOOKU88ee
github.com/sagikazarmark/locafero v0.12.0/go.mod h1:sZh36u/YSZ918v0Io+U9ogLYQJ9tLLBmM4eneO6WwsI=
github.com/sevlyar/go-daemon v0.1.6 h1:EUh1MDjEM4BI109Jign0EaknA2izkOyi0LV3ro3QQGs=
github.com/sevlyar/go-daemon v0.1.6/go.mod h1:6dJpPatBT9eUwM5VCw9Bt6CdX9Tk6UWvhW3MebLDRKE=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I=
github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
@@ -96,6 +100,8 @@ github.com/vibhansa-msft/blobfilter v0.0.0-20250115104552-d9d40722be3e/go.mod h1
github.com/vibhansa-msft/tlru v0.0.0-20240410102558-9e708419e21f h1:KmQFbsVFi45PtwEWIXugkW0X9VSJ+rZtee/WCPG5unc=
github.com/vibhansa-msft/tlru v0.0.0-20240410102558-9e708419e21f/go.mod h1:7G2C64UXEWNr8oUzspzcrymxCjD9fKAKTGbL7zO2GW8=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
@@ -114,6 +120,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
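The new gopsutil dependency (with its transitive go-ole and wmi requirements, which are Windows-only WMI bindings) presumably backs the host memory readings that the GetNodeStats RPC reports. A minimal sketch of how the serving side might gather them; the mapping onto the response fields is an assumption, while mem.VirtualMemory() is the real gopsutil API:

package main

import (
	"fmt"

	"github.com/shirou/gopsutil/mem"
)

func main() {
	// VirtualMemory reads /proc/meminfo on Linux and uses WMI on Windows.
	vm, err := mem.VirtualMemory()
	if err != nil {
		panic(err)
	}
	// These plausibly feed GetNodeStatsResponse.MemUsedBytes, MemTotalBytes
	// and PercentMemUsed (hypothetical mapping, not confirmed by this diff).
	fmt.Printf("used=%d total=%d percent=%.2f%%\n", vm.Used, vm.Total, vm.UsedPercent)
}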
21 changes: 21 additions & 0 deletions internal/dcache/debug/callbacks.go
@@ -262,6 +262,27 @@ func readClusterSummaryHelpCallback(pFile *procFile) error {
return nil
}

// proc file: nodes-stats
// Get stats from all nodes in the cluster via RPCs and aggregate them into a NodesStats structure.
func readNodesStatsCallback(pFile *procFile) error {
nodesStats, err := rpc_client.GetNodesStats()
if err != nil {
log.Err("DebugFS::readNodesStatsCallback: failed to get nodes stats: %v", err)
return err
}

common.Assert(nodesStats != nil)

pFile.buf, err = json.MarshalIndent(nodesStats, "", " ")
if err != nil {
log.Err("DebugFS::readNodesStatsCallback: marshal failed: %v", err)
common.Assert(false, err)
return err
}

return nil
}

// Silence unused import errors for release builds.
func init() {
common.IsValidUUID("00000000-0000-0000-0000-000000000000")
4 changes: 4 additions & 0 deletions internal/dcache/debug/debug_fs.go
@@ -143,6 +143,10 @@ func init() {
"cluster-summary.help": &procFile{
refreshBuffer: readClusterSummaryHelpCallback,
}, // Help summary about fs=debug/cluster-summary.

"nodes-stats": &procFile{
refreshBuffer: readNodesStatsCallback,
}, // Get node level stats via RPC.
}

procDirList = make([]*internal.ObjAttr, 0, len(procFiles))
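For context, a minimal sketch of how the new proc file could be consumed through the mount; /mnt/blobfuse is a hypothetical mount point, and the fs=debug prefix follows the cluster-summary convention above:

package main

import (
	"fmt"
	"os"
)

func main() {
	// readNodesStatsCallback refreshes the buffer on read, so a plain
	// file read returns the pretty-printed NodesStats JSON.
	buf, err := os.ReadFile("/mnt/blobfuse/fs=debug/nodes-stats")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(buf))
}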
28 changes: 28 additions & 0 deletions internal/dcache/models.go
@@ -252,3 +252,31 @@ type MVsSummary struct {
Syncing int64 `json:"syncing"`
Offline int64 `json:"offline"`
}

// Stats for all nodes in the cluster.
type NodesStats struct {
Timestamp string `json:"timestamp"`
Count int64 `json:"count"`
Nodes []*NodeInfo `json:"nodes"`
Aggregate *NodesAggregate `json:"aggregate"` // Aggregated stats across all nodes.
Errors map[string]string `json:"errors,omitempty"` // nodeID -> error string
}

// Stats for a single node in the cluster.
type NodeInfo struct {
NodeID string `json:"node_id"`
HostName string `json:"hostname"`
IPAddress string `json:"ip_address"`
MemUsed string `json:"mem_used"`
MemTotal string `json:"mem_total"`
PercentMemUsed string `json:"percent_mem_used"`
}

// Aggregated stats across all nodes in the cluster.
type NodesAggregate struct {
MemUsedBytes int64 `json:"-"`
MemTotalBytes int64 `json:"-"`
MemUsed string `json:"mem_used"`
MemTotal string `json:"mem_total"`
PercentMemUsed string `json:"percent_mem_used"`
}
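Illustrative only: a sketch of the JSON these structs marshal to, with made-up values for a single-node cluster. It mirrors the json.MarshalIndent call in readNodesStatsCallback and assumes it runs inside this module, since the dcache package is internal:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/Azure/azure-storage-fuse/v2/internal/dcache"
)

func main() {
	ns := &dcache.NodesStats{
		Timestamp: "2025-01-01T00:00:00Z",
		Count:     1,
		Nodes: []*dcache.NodeInfo{{
			NodeID:         "00000000-0000-0000-0000-000000000001",
			HostName:       "node-0",
			IPAddress:      "10.0.0.4",
			MemUsed:        "6.01 GB",
			MemTotal:       "16.00 GB",
			PercentMemUsed: "37.56%",
		}},
		Aggregate: &dcache.NodesAggregate{
			MemUsed:        "6.01 GB",
			MemTotal:       "16.00 GB",
			PercentMemUsed: "37.56%",
		},
	}
	buf, _ := json.MarshalIndent(ns, "", "  ")
	fmt.Println(string(buf)) // Errors is absent here because of omitempty.
}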
12 changes: 11 additions & 1 deletion internal/dcache/rpc/client/client_test.go
@@ -53,12 +53,22 @@ func (suite *rpcClientTestSuite) TestNewRPCClientTimeout() {
nodeID := "test-node-id"
nodeAddress := "10.0.0.5:9090"

client, err := newRPCClient(nodeID, nodeAddress)
client, err := newRPCClient(nodeID, 0, nodeAddress)
suite.assert.Error(err)
suite.assert.Contains(err.Error(), "timeout")
suite.assert.Nil(client)
}

func (suite *rpcClientTestSuite) TestConvertBytesToReadable() {
suite.assert.Equal("512 B", bytesToReadable(512))
suite.assert.Equal("1.00 KB", bytesToReadable(1024))
suite.assert.Equal("1.00 MB", bytesToReadable(1048576))
suite.assert.Equal("1.00 GB", bytesToReadable(1073741824))
suite.assert.Equal("12.02 GB", bytesToReadable(12911104000))
suite.assert.Equal("1.00 TB", bytesToReadable(1099511627776))
suite.assert.Equal("1.00 PB", bytesToReadable(1125899906842624))
}

func TestRPCClientTestSuite(t *testing.T) {
suite.Run(t, new(rpcClientTestSuite))
}
108 changes: 108 additions & 0 deletions internal/dcache/rpc/client/functions.go
@@ -1530,6 +1530,114 @@ func GetLogs(ctx context.Context, targetNodeID, outDir string, numLogs, chunkSiz
return outPath, nil
}

// GetNodeStats gets stats from the target node.
func GetNodeStats(ctx context.Context, targetNodeID string, req *models.GetNodeStatsRequest) (*models.GetNodeStatsResponse, error) {
common.Assert(req != nil)
common.Assert(common.IsValidUUID(targetNodeID), targetNodeID)

// Caller must not set SenderNodeID, catch misbehaving callers.
common.Assert(len(req.SenderNodeID) == 0, req.SenderNodeID)
req.SenderNodeID = myNodeId

reqStr := req.String()
log.Debug("rpc_client::GetNodeStats: Sending GetNodeStats request to node %s: %s", targetNodeID, reqStr)

//
// We retry once after resetting bad connections.
//
for i := 0; i < 2; i++ {
// Get RPC client from the client pool.
client, err := cp.getRPCClient(targetNodeID)
if err != nil {
err = fmt.Errorf("rpc_client::GetNodeStats: Failed to get RPC client for node %s %s: %v [%w]",
targetNodeID, reqStr, err, NoFreeRPCClient)
log.Err("%v", err)
return nil, err
}

// Call the rpc method.
resp, err := client.svcClient.GetNodeStats(ctx, req)
if err != nil {
log.Err("rpc_client::GetNodeStats: GetNodeStats failed to node %s %s: %v",
targetNodeID, reqStr, err)

//
// Only possible errors:
// - Actual RPC error returned by the server.
// - Broken pipe means we attempted to write the RPC request after the blobfuse2 process stopped.
// - Connection closed by the server (maybe it restarted before it could respond).
//   In this case the request was sent before the blobfuse2 process stopped, but the
//   process stopped before it could respond.
// - Connection reset by the server (same as above, but the peer sends a TCP RST instead of FIN).
// Only read()/recv() can fail with this, write()/send() will fail with broken pipe.
// - TimedOut means the node is down or cannot be reached over the n/w.
//
// All other errors other than RPC error indicate some problem with the target node or the
// n/w, so we delete all existing connections to the node, prohibit new connections for a
// short period and then create new connections when needed.
//
// TODO: See if we need to optimize any of these cases, i.e., don't delete all connections.
//
common.Assert(rpc.IsRPCError(err) ||
rpc.IsBrokenPipe(err) ||
rpc.IsConnectionClosed(err) ||
rpc.IsConnectionReset(err) ||
rpc.IsTimedOut(err), err)

if rpc.IsBrokenPipe(err) || rpc.IsConnectionClosed(err) || rpc.IsConnectionReset(err) {
//
// A common reason for a first-time error is that we hold old connections and the
// blobfuse2 process or the node has since restarted, causing those connections to fail
// with broken pipe or connection closed/reset errors. So on the first failure we don't
// mark the node as negative, but if the retry also fails with a similar error, the
// blobfuse2 process is likely still down and we mark the node as negative.
//
cp.deleteAllRPCClients(client, i == 1 /* confirmedBadNode */, false /* isClientClosed */)
if i == 1 {
return nil, err
}
err1 := cp.waitForNodeClientPoolToDelete(client.nodeID, client.nodeIDInt)
if err1 != nil {
return nil, err
}
// Continue with newly created client.
continue
} else if rpc.IsTimedOut(err) {
cp.deleteAllRPCClients(client, true /* confirmedBadNode */, false /* isClientClosed */)
return nil, err
}

// Fall through to release the RPC client.
resp = nil
} else {
//
// The RPC call to the target node succeeded. If the node or the RV is marked negative or iffy,
// clear it now.
//
cp.removeNegativeNode(targetNodeID)
}

// Release RPC client back to the pool.
err1 := cp.releaseRPCClient(client)
if err1 != nil {
log.Err("rpc_client::GetNodeStats: Failed to release RPC client for node %s %v: %v",
targetNodeID, reqStr, err1)
// Assert, but do not fail the GetNodeStats call.
common.Assert(false, err1)
}

return resp, err
}

//
// We come here when we could not succeed even after resetting stale connections and retrying.
// This is unexpected, but can happen if the target node goes offline or restarts more than once in
// quick succession.
//
return nil, fmt.Errorf("rpc_client::GetNodeStats: Could not find a valid RPC client for node %s %s",
targetNodeID, reqStr)
}

// Cleanup closes all the RPC node client pools.
func Cleanup() error {
log.Info("rpc_client::Cleanup: Closing all node client pools")
104 changes: 104 additions & 0 deletions internal/dcache/rpc/client/utils.go
@@ -42,8 +42,10 @@ import (

"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal/dcache"
cm "github.com/Azure/azure-storage-fuse/v2/internal/dcache/clustermap"
"github.com/Azure/azure-storage-fuse/v2/internal/dcache/rpc"
"github.com/Azure/azure-storage-fuse/v2/internal/dcache/rpc/gen-go/dcache/models"
)

// CollectAllNodeLogs downloads log tarballs from every node in the current cluster into outDir.
@@ -139,3 +141,105 @@ func CollectAllNodeLogs(outDir string, numLogs int64) (map[string]string, error)

return results, allErr
}

// GetNodesStats collects stats from all nodes in the cluster via RPCs and
// aggregates them into a NodesStats structure.
func GetNodesStats() (*dcache.NodesStats, error) {
log.Debug("GetNodesStats: Starting nodes stats collection")

nodeMap := cm.GetAllNodes()
if len(nodeMap) == 0 {
common.Assert(false)
return nil, fmt.Errorf("GetNodesStats: no nodes found in cluster")
}

nodesStats := &dcache.NodesStats{
Timestamp: time.Now().UTC().Format(time.RFC3339),
Count: int64(len(nodeMap)),
Aggregate: &dcache.NodesAggregate{},
Errors: make(map[string]string),
}

var wg sync.WaitGroup
var mu sync.Mutex

//
// We can run stats collection in parallel across many nodes as it doesn't load any single
// node. It'll be limited by ingress network b/w and disk IO on the requesting node.
//
workerCount := min(1000, len(nodeMap))
jobs := make(chan string, workerCount)

// Workers
for i := 0; i < workerCount; i++ {
wg.Add(1)

go func() {
defer wg.Done()
for nodeID := range jobs {
nodeInfo := &dcache.NodeInfo{
NodeID: nodeID,
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := GetNodeStats(ctx, nodeID, &models.GetNodeStatsRequest{})
cancel()

if err != nil {
mu.Lock()
nodesStats.Errors[nodeID] = err.Error()
mu.Unlock()
continue
}

nodeInfo.HostName = resp.HostName
nodeInfo.IPAddress = resp.IpAddress
nodeInfo.MemUsed = bytesToReadable(resp.MemUsedBytes)
nodeInfo.MemTotal = bytesToReadable(resp.MemTotalBytes)
nodeInfo.PercentMemUsed = resp.PercentMemUsed

mu.Lock()
nodesStats.Aggregate.MemUsedBytes += resp.MemUsedBytes
nodesStats.Aggregate.MemTotalBytes += resp.MemTotalBytes
nodesStats.Nodes = append(nodesStats.Nodes, nodeInfo)
mu.Unlock()
}
}()
}

// Feed jobs
for nodeID := range nodeMap {
jobs <- nodeID
}
close(jobs)

// Wait for workers to finish
wg.Wait()

// Prepare aggregate stats
nodesStats.Aggregate.MemUsed = bytesToReadable(nodesStats.Aggregate.MemUsedBytes)
nodesStats.Aggregate.MemTotal = bytesToReadable(nodesStats.Aggregate.MemTotalBytes)
if nodesStats.Aggregate.MemTotalBytes > 0 {
percentUsed := (float64(nodesStats.Aggregate.MemUsedBytes) /
float64(nodesStats.Aggregate.MemTotalBytes)) * 100.0
nodesStats.Aggregate.PercentMemUsed = fmt.Sprintf("%.2f%%", percentUsed)
}

return nodesStats, nil
}

// Convert an int64 byte count to a human-readable string.
func bytesToReadable(bytes int64) string {
const unit = 1024
if bytes < unit {
return fmt.Sprintf("%d B", bytes)
}

div, exp := int64(unit), 0
for n := bytes / unit; n >= unit; n /= unit {
div *= unit
exp++
}

return fmt.Sprintf("%.2f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
}
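As a worked check of the loop above: for 12911104000 bytes, n starts at 12911104000/1024 = 12608500, shrinks to 12312 and then 12, leaving div = 1024³ = 1073741824 and exp = 2; 12911104000/1073741824 ≈ 12.02 and "KMGTPE"[2] is 'G', giving the "12.02 GB" expected by the unit test above.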