Commit 9d56910

MV placement for reducing RV<->RV connections (#2012)
1 parent fef020c commit 9d56910

File tree

9 files changed: +389 −73 lines changed


component/distributed_cache/distributed_cache.go

Lines changed: 39 additions & 11 deletions
@@ -109,17 +109,18 @@ type DistributedCacheOptions struct {
     RVNearfullThreshold uint64 `config:"rv-nearfull-threshold" yaml:"rv-nearfull-threshold,omitempty"`
     MaxCacheSize        uint64 `config:"max-cache-size" yaml:"max-cache-size,omitempty"`
 
-    MinNodes            uint32 `config:"min-nodes" yaml:"min-nodes,omitempty"`
-    MaxRVs              uint32 `config:"max-rvs" yaml:"max-rvs,omitempty"`
-    MVsPerRV            uint64 `config:"mvs-per-rv" yaml:"mvs-per-rv,omitempty"`
-    RebalancePercentage uint8  `config:"rebalance-percentage" yaml:"rebalance-percentage,omitempty"`
-    SafeDeletes         bool   `config:"safe-deletes" yaml:"safe-deletes,omitempty"`
-    CacheAccess         string `config:"cache-access" yaml:"cache-access,omitempty"`
-    IgnoreFD            bool   `config:"ignore-fd" yaml:"ignore-fd,omitempty"`
-    IgnoreUD            bool   `config:"ignore-ud" yaml:"ignore-ud,omitempty"`
-    ClustermapEpoch     uint64 `config:"clustermap-epoch" yaml:"clustermap-epoch,omitempty"`
-    ReadIOMode          string `config:"read-io-mode" yaml:"read-io-mode,omitempty"`
-    WriteIOMode         string `config:"write-io-mode" yaml:"write-io-mode,omitempty"`
+    MinNodes             uint32 `config:"min-nodes" yaml:"min-nodes,omitempty"`
+    MaxRVs               uint32 `config:"max-rvs" yaml:"max-rvs,omitempty"`
+    MVsPerRV             uint64 `config:"mvs-per-rv" yaml:"mvs-per-rv,omitempty"`
+    RebalancePercentage  uint8  `config:"rebalance-percentage" yaml:"rebalance-percentage,omitempty"`
+    SafeDeletes          bool   `config:"safe-deletes" yaml:"safe-deletes,omitempty"`
+    CacheAccess          string `config:"cache-access" yaml:"cache-access,omitempty"`
+    IgnoreFD             bool   `config:"ignore-fd" yaml:"ignore-fd,omitempty"`
+    IgnoreUD             bool   `config:"ignore-ud" yaml:"ignore-ud,omitempty"`
+    RingBasedMVPlacement bool   `config:"ring-based-mv-placement" yaml:"ring-based-mv-placement,omitempty"`
+    ClustermapEpoch      uint64 `config:"clustermap-epoch" yaml:"clustermap-epoch,omitempty"`
+    ReadIOMode           string `config:"read-io-mode" yaml:"read-io-mode,omitempty"`
+    WriteIOMode          string `config:"write-io-mode" yaml:"write-io-mode,omitempty"`
 }
 
 const (
@@ -141,6 +142,7 @@ const (
     dcacheDirContToken = "__DCDIRENT__"
     defaultIgnoreFD = true // By default ignore VM Fault Domain for MV placement decisions.
     defaultIgnoreUD = true // By default ignore VM Update Domain for MV placement decisions.
+    defaultRingBasedMVPlacement = true // By default use ring based MV placement (vs random).
 )
 
 // Verification to check satisfaction criteria with Component Interface
@@ -239,6 +241,7 @@ func (dc *DistributedCache) startClusterManager() string {
         CacheAccess: dc.cfg.CacheAccess,
         IgnoreFD: dc.cfg.IgnoreFD,
         IgnoreUD: dc.cfg.IgnoreUD,
+        RingBasedMVPlacement: dc.cfg.RingBasedMVPlacement,
         RvFullThreshold: dc.cfg.RVFullThreshold,
         RvNearfullThreshold: dc.cfg.RVNearfullThreshold,
     }
@@ -495,6 +498,16 @@ func (distributedCache *DistributedCache) Configure(_ bool) error {
             distributedCache.Name(), distributedCache.cfg.Replicas, cm.MinNumReplicas, cm.MaxNumReplicas)
     }
 
+    if !config.IsSet(compName + ".ring-based-mv-placement") {
+        distributedCache.cfg.RingBasedMVPlacement = defaultRingBasedMVPlacement
+    }
+    cm.RingBasedMVPlacement = distributedCache.cfg.RingBasedMVPlacement
+
+    if cm.RingBasedMVPlacement {
+        // Set this very high for ring based MV placement.
+        cm.MaxMVsPerRV = 100000
+    }
+
     if !config.IsSet(compName + ".heartbeat-duration") {
         distributedCache.cfg.HeartbeatDuration = defaultHeartBeatDurationInSecond
     }
@@ -522,6 +535,12 @@ func (distributedCache *DistributedCache) Configure(_ bool) error {
     if config.IsSet(compName + ".mvs-per-rv") {
         // If user sets mvs-per-rv in the config then that value MUST be honoured.
         cm.MVsPerRVLocked = true
+
+        // For now we don't allow mvs-per-rv config with ring-based-mv-placement.
+        if cm.RingBasedMVPlacement {
+            return fmt.Errorf("config error in %s: [cannot set mvs-per-rv when ring-based-mv-placement is true]",
+                distributedCache.Name())
+        }
     } else {
         common.Assert(distributedCache.cfg.MaxRVs > 0, distributedCache.cfg)
         common.Assert(distributedCache.cfg.Replicas > 0, distributedCache.cfg)
@@ -570,6 +589,12 @@ func (distributedCache *DistributedCache) Configure(_ bool) error {
             uint64(math.Ceil(float64(numMVs*int64(distributedCache.cfg.Replicas)) /
                 float64(distributedCache.cfg.MaxRVs)))
 
+        // For ring based MV placement, we don't want to limit MVsPerRV, set it very high.
+        if cm.RingBasedMVPlacement {
+            log.Info("DistributedCache::Configure : Forcing high MVsPerRV for RingBasedMVPlacement")
+            distributedCache.cfg.MVsPerRV = 10000
+        }
+
     log.Info("DistributedCache::Configure : cfg.MVsPerRV: %d, minMVs: %d, maxMVs: %d, replicas: %d, maxRVs: %d",
         distributedCache.cfg.MVsPerRV, minMVs, maxMVs, distributedCache.cfg.Replicas, distributedCache.cfg.MaxRVs)
 
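For context on the Configure() logic above: when mvs-per-rv is not set explicitly, MVsPerRV is derived from the ceil(numMVs * Replicas / MaxRVs) expression visible in the context lines, and ring based placement then overrides it with a very large value so slot counts never constrain placement. A standalone sketch of that arithmetic (the sample numbers are illustrative, not taken from this change):

package main

import (
	"fmt"
	"math"
)

func main() {
	// Illustrative numbers only: 1000 MVs, 3 replicas each, spread over 500 RVs.
	numMVs, replicas, maxRVs := 1000.0, 3.0, 500.0

	// Same shape as the expression in Configure(): each RV must host enough
	// MV replicas to cover numMVs*replicas placements across maxRVs RVs.
	mvsPerRV := uint64(math.Ceil(numMVs * replicas / maxRVs))

	fmt.Println(mvsPerRV) // 6

	// With ring based placement this cap is effectively disabled (forced to a
	// very large value), since ring position, not slot count, decides placement.
}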
@@ -1678,6 +1703,9 @@ func init() {
     ignoreUD := config.AddBoolFlag("ignore-ud", defaultIgnoreUD, "Ignore VM update domain for MV placement decisions")
     config.BindPFlag(compName+".ignore-ud", ignoreUD)
 
+    ringBasedMVPlacement := config.AddBoolFlag("ring-based-mv-placement", defaultRingBasedMVPlacement, "Use ring based MV placement algorithm")
+    config.BindPFlag(compName+".ring-based-mv-placement", ringBasedMVPlacement)
+
     readIOMode := config.AddStringFlag("read-io-mode", rpc.DirectIO, "IO mode for reading chunk files (direct/buffered)")
     config.BindPFlag(compName+".read-io-mode", readIOMode)
 
internal/dcache/cluster_manager/cluster_manager.go

Lines changed: 135 additions & 13 deletions
@@ -2209,8 +2209,9 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
     common.Assert(cm.IsValidRVMap(rvMap))
     common.Assert(cm.IsValidMvMap(existingMVMap, NumReplicas))
 
-    log.Debug("ClusterManager::updateMVList: TotalRVs: %d, NumReplicas: %d, MVsPerRVForNewMV: %d, MVsPerRVForFixMV: %d, ExistingMVs: %d",
-        len(rvMap), NumReplicas, cm.MVsPerRVForNewMV, cm.MVsPerRVForFixMV.Load(), len(existingMVMap))
+    log.Debug("ClusterManager::updateMVList: RingBasedMVPlacement: %v, TotalRVs: %d, NumReplicas: %d, MVsPerRVForNewMV: %d, MVsPerRVForFixMV: %d, ExistingMVs: %d",
+        cm.RingBasedMVPlacement, len(rvMap), NumReplicas, cm.MVsPerRVForNewMV,
+        cm.MVsPerRVForFixMV.Load(), len(existingMVMap))
 
     //
     //
@@ -2238,6 +2239,18 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
     // This continues till we do not have enough RVs (from distinct nodes) for creating
     // a new MV. This is the new-mv workflow.
     //
+    // Update: With the new ring based sliding window placement algorithm, the placement
+    // DOES NOT consider the slot count, instead it uses the ring position of the RVs
+    // for the placement, e.g., mv0 will be placed on rv0, rv1, rv2 (assuming NumReplicas=3),
+    // and mv1 will be placed on rv1, rv2, rv3 and so on. This placement prevents a
+    // deeply connected network between various nodes; in fact each RV is connected to only
+    // one upstream RV. This is particularly important for PutChunkDC where a heavily
+    // connected network can cause too many connections to/from each node. With ring based
+    // placement, each RV must have connections to only its upstream RV(s).
+    // TODO: With the ring based placement, we don't need to maintain the slots count, but it helps
+    // to know how many MVs an RV is part of, so we keep it for now. Later we can remove it
+    // if we make ring based placement the only placement algorithm.
+    //
 
     log.Debug("ClusterManager::updateMVList: Updating current MV list according to the latest RV list (%d RVs, %d MVs) [%s to run]",
         len(rvMap), len(existingMVMap), completeBy.Sub(time.Now()))
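To make the sliding window mapping described in the added comment concrete, here is a standalone sketch (not code from this commit; the cluster size and replica count are illustrative) of which RVs each MV lands on:

package main

import "fmt"

// ringPlacement returns the component RVs for MV "mv<mvSuffix>" under the
// sliding window scheme described above: start at rv<mvSuffix mod numRVs>
// and take the next numReplicas RVs around the ring.
func ringPlacement(mvSuffix, numRVs, numReplicas int) []string {
	rvs := make([]string, 0, numReplicas)
	for r := 0; r < numReplicas; r++ {
		rvs = append(rvs, fmt.Sprintf("rv%d", (mvSuffix+r)%numRVs))
	}
	return rvs
}

func main() {
	// Illustrative cluster: 6 RVs, 3 replicas per MV.
	for mv := 0; mv < 6; mv++ {
		fmt.Printf("mv%d -> %v\n", mv, ringPlacement(mv, 6, 3))
	}
	// mv0 -> [rv0 rv1 rv2]
	// mv1 -> [rv1 rv2 rv3]
	// ...
	// mv5 -> [rv5 rv0 rv1]
}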
@@ -2308,7 +2321,15 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
 
     //
     // From availableRVsMap, this will create equivalent availableRVsList, list of RVs that are available for
-    // placing MVs. The RVs are sorted by the number of slots available, so that the RVs with more slots are at
+    // placing MVs.
+    // The RVs are sorted according to the placement algorithm used.
+    //
+    // For RingBasedMVPlacement,
+    // the RVs are sorted by their rv names in numeric sort order (rv2 < rv10). This is needed
+    // by the ring based sliding window placement algorithm.
+    //
+    // For non-RingBasedMVPlacement,
+    // the RVs are sorted by the number of slots available, so that the RVs with more slots are at
     // the front so they are picked first for placing MVs, thus resulting in a more balanced distribution of MVs
     // across the RVs over time as nodes go down and come up.
     //
@@ -2340,6 +2361,11 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
         //
         usedSlots := uint32(cm.MVsPerRVForFixMV.Load()) - uint32(rv.slots)
 
+        //
+        // With the new ring based placement algorithm, we do not need to check for slots, but
+        // see deleteRVsFromAvailableMap() for how it uses slots=0 to mark an RV as deleted and not
+        // available for placement.
+        //
         if rv.slots == 0 || (newMV && usedSlots >= uint32(cm.MVsPerRVForNewMV)) {
             continue
         }
@@ -2368,10 +2394,24 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
     _ = numAvailableFDs
     _ = numAvailableUDs
 
-    // Sort the RVs by the number of slots available, in descending order.
-    sort.Slice(availableRVsList, func(i, j int) bool {
-        return availableRVsList[i].slots > availableRVsList[j].slots
-    })
+    // Sort the RVs by rvName in numeric sort order, so that rv2 comes before rv10.
+    if cm.RingBasedMVPlacement {
+        sort.Slice(availableRVsList, func(i, j int) bool {
+            var rvi, rvj int
+
+            _, err1 := fmt.Sscanf(availableRVsList[i].rvName, "rv%d", &rvi)
+            _ = err1
+            common.Assert(err1 == nil, err1, availableRVsList[i].rvName)
+            _, err1 = fmt.Sscanf(availableRVsList[j].rvName, "rv%d", &rvj)
+            common.Assert(err1 == nil, err1, availableRVsList[j].rvName)
+
+            return rvi < rvj
+        })
+    } else {
+        sort.Slice(availableRVsList, func(i, j int) bool {
+            return availableRVsList[i].slots > availableRVsList[j].slots
+        })
+    }
 
     log.Debug("ClusterManager::getAvailableRVsList: Available RVs: %d, nodes: %d, FD: %d, UD: %d",
         len(availableRVsList), numAvailableNodes, numAvailableFDs, numAvailableUDs)
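The numeric parse in the comparator matters because plain string comparison would order rv10 before rv2 and break the ring order. A standalone illustration (not from this commit):

package main

import (
	"fmt"
	"sort"
)

func main() {
	names := []string{"rv10", "rv2", "rv0", "rv11", "rv3"}

	// Lexicographic order puts rv10 and rv11 before rv2/rv3.
	lex := append([]string(nil), names...)
	sort.Strings(lex)
	fmt.Println(lex) // [rv0 rv10 rv11 rv2 rv3]

	// Numeric order on the "rv%d" suffix, as the ring requires.
	num := append([]string(nil), names...)
	sort.Slice(num, func(i, j int) bool {
		var a, b int
		fmt.Sscanf(num[i], "rv%d", &a)
		fmt.Sscanf(num[j], "rv%d", &b)
		return a < b
	})
	fmt.Println(num) // [rv0 rv2 rv3 rv10 rv11]
}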
@@ -2414,6 +2454,8 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
         rv.slots--
         log.Debug("ClusterManager::consumeRVSlot: Consumed slot for %s/%s, (used: %d, remaining: %d)",
             rvName, mvName, int(cm.MVsPerRVForFixMV.Load())-rv.slots, rv.slots)
+        // With the ring based placement, we should never exhaust slots for an RV.
+        common.Assert(!cm.RingBasedMVPlacement || rv.slots > 0, rvName, mvName, nodeId, availableRVsMap)
     } else {
         log.Warn("ClusterManager::consumeRVSlot: %s/%s has more than %d MV replicas placed!",
             rvName, mvName, cm.MVsPerRVForFixMV.Load())
@@ -2463,6 +2505,11 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
     // duly updated.
     //
     fixMV := func(mvName string, mv dcache.MirroredVolume) {
+        var mvSuffix int
+
+        _, err := fmt.Sscanf(mvName, "mv%d", &mvSuffix)
+        common.Assert(err == nil, mvName, err)
+
         //
         // Fix-mv must be run only for degraded MVs.
         // A degraded MV has one or more (but not all) component RVs as offline (which need to be replaced by
@@ -2594,11 +2641,11 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
         // - Does not come from any fault domain in excludeFaultDomains list.
         // - Does not come from any update domain in excludeUpdateDomains list.
         // - Has same or higher availableSpace.
+        // - Is the first suitable RV in the ring (for ring based sliding window placement).
         //
         // Caller creates availableRVsList which is a list of available RVs that can be used to replace the
-        // offline component RVs. This is a sorted list with more suitable RVs at the front, so that we are
-        // more likely to pick more suitable RVs first, thus resulting in a balanced distribution of MVs across
-        // the RVs. We then iterate over the availableRVsList list and pick the 1st suitable RV.
+        // offline component RVs.
+        // This is sorted differently based on RingBasedMVPlacement, see getAvailableRVsList() for details.
         // As we pick RVs we update availableRVsMap which also updates availableRVsList as it is a slice of
         // those pointers that availableRVsMap refers to.
         //
@@ -2698,6 +2745,15 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
 
         //
         // Iterate over the availableRVsList and pick the first suitable RV.
+        //
+        // If RingBasedMVPlacement is true:
+        // availableRVsList is sorted by rv names in ascending order, simulating a ring of RVs in
+        // numeric order. We always pick the component RVs from this ring in ascending order; the goal
+        // is to minimize RV<->RV connections, i.e., an RV needs to connect to only a few other RVs for
+        // PutChunkDC daisy chaining. We pick the starting index in the ring based on mvSuffix, as
+        // that's what the new-mv workflow does.
+        //
+        // If RingBasedMVPlacement is false:
         // availableRVsList is sorted by the number of slots available, so that the RVs with more slots
         // are at the front so they are picked first for placing MVs, thus resulting in a more balanced
         // distribution of MVs across the RVs over time as nodes go down and come up.
@@ -2706,7 +2762,13 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
         // should run very very fast, as we need to fix all the degraded MVs in a short time.
         // Avoid any string key'ed map lookups, as they are slow, and any thing else that's slow.
         //
-        for _, rv := range availableRVsList {
+        startIdx := mvSuffix % len(availableRVsList)
+        if !cm.RingBasedMVPlacement {
+            startIdx = 0
+        }
+
+        for idx := startIdx; idx < len(availableRVsList)+startIdx; idx++ {
+            rv := availableRVsList[idx%len(availableRVsList)]
             // Max slots for an RV is MVsPerRVForFixMV.
             common.Assert(rv.slots <= int(cm.MVsPerRVForFixMV.Load()), *rv, cm.MVsPerRVForFixMV.Load())
             common.Assert(rv.slots >= 0, *rv)
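The wraparound loop introduced here (and reused in the new-mv path further down) is the usual way to walk a ring starting at an arbitrary index; a minimal standalone illustration (not from this commit):

package main

import "fmt"

// walkRing visits every element of ring exactly once, starting at startIdx
// and wrapping around, which is the traversal pattern used for ring based
// MV placement (startIdx derived from the MV's numeric suffix).
func walkRing(ring []string, startIdx int) []string {
	visited := make([]string, 0, len(ring))
	for idx := startIdx; idx < len(ring)+startIdx; idx++ {
		visited = append(visited, ring[idx%len(ring)])
	}
	return visited
}

func main() {
	ring := []string{"rv0", "rv1", "rv2", "rv3", "rv4"}
	fmt.Println(walkRing(ring, 3)) // [rv3 rv4 rv0 rv1 rv2]
}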
@@ -2939,6 +3001,13 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
             udId: rvInfo.UDId,
             slots: int(cm.MVsPerRVForFixMV.Load()),
         }
+
+        //
+        // Note: With the new sliding window based placement, slots must be set very high so that they
+        // never exhaust.
+        //
+        common.Assert(!cm.RingBasedMVPlacement || availableRVsMap[rvName].slots >= 1000,
+            rvName, availableRVsMap[rvName].slots)
     }
 
     // Cannot have more available RVs than total RVs.
@@ -3233,38 +3302,66 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
 
     for {
         //
-        // We need at least NumReplicas RVs with free slots to create a new MV.
+        // If we don't have enough RVs to get NumReplicas unique RVs for a new MV, we cannot create any new MV.
         //
         if len(availableRVsList) < NumReplicas {
             log.Debug("ClusterManager::updateMVList: len(availableRVsList) [%d] < NumReplicas [%d]",
                 len(availableRVsList), NumReplicas)
             break
         }
 
+        //
+        // For RingBasedMVPlacement we can have at most as many MVs as the number of RVs since we make one
+        // MV for each RV.
+        //
+        // TODO: Add more MVs to ensure that each MV has reasonable data and thus replication can
+        //       finish fast. Many small MVs being replicated in parallel is better than one large
+        //       MV.
+        //
+        if cm.RingBasedMVPlacement && len(existingMVMap) >= len(availableRVsList) {
+            log.Debug("ClusterManager::updateMVList: len(existingMVMap) [%d] >= len(availableRVsList) [%d]",
+                len(existingMVMap), len(availableRVsList))
+            break
+        }
+
         // New MV's name, starting from index 0.
-        mvName := fmt.Sprintf("mv%d", len(existingMVMap))
+        mvSuffix := len(existingMVMap)
+        mvName := fmt.Sprintf("mv%d", mvSuffix)
 
         excludeNodes := make(map[int]struct{})
         excludeFaultDomains := make(map[int]struct{})
         excludeUpdateDomains := make(map[int]struct{})
 
         //
         // Iterate over the availableRVsList and pick the first suitable RV.
+        //
+        // For non-RingBasedMVPlacement:
         // For each MV we start from a random index in availableRVsList (and choose next NumReplicas suitable RVs).
         // This ensures that we use RVs uniformly instead of exhausting RVs from the beginning and leaving upto
        // NumReplicas-1 unused RVs at the end which cannot be used to create a new MV.
         //
+        // For RingBasedMVPlacement:
+        // We start from a fixed RV corresponding to the MV suffix.
+        //
         // Note: Since number of RVs can be very large (100K+) we need to be careful that this loop is very
         // efficient, avoid any string key'ed map lookups, as they are slow, and any thing else that's slow.
         // Note: This is O(number of MVs created) as it creates each new MV sequentially. Each MV creation involves
         // sending JoinMV RPC to all component RVs (in parallel), which will be hard to get below 1ms, so for
         // 20K MVs, it'll take ~20s to create all MVs, which should be fine.
         //
+
         startIdx := rand.Intn(len(availableRVsList))
         if numNewRVs > 0 {
             startIdx = rand.Intn(numNewRVs)
         }
 
+        if cm.RingBasedMVPlacement {
+            startIdx = mvSuffix % len(availableRVsList)
+        }
+
+        log.Debug("ClusterManager::updateMVList: Placing new MV %s, startIdx: %d, numNewRVs: %d, availableRVs: %d",
+            mvName, startIdx, numNewRVs, len(availableRVsList))
+
         for idx := startIdx; idx < len(availableRVsList)+startIdx; idx++ {
             rv := availableRVsList[idx%len(availableRVsList)]
             usedSlots := int(cm.MVsPerRVForFixMV.Load()) - rv.slots
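The effect of the fixed startIdx on connection fan-out can be seen by counting distinct RV->RV links in the PutChunkDC daisy chain. The standalone sketch below (not code from this commit; it assumes each MV forwards chunks along its replica chain rv[k] -> rv[k+1]) compares ring based and random placement:

package main

import (
	"fmt"
	"math/rand"
)

// chainEdges counts the distinct ordered RV->RV links used when every MV's
// replicas forward chunks down the chain replica[0] -> replica[1] -> ...
func chainEdges(placements [][]int) int {
	edges := make(map[[2]int]struct{})
	for _, replicas := range placements {
		for k := 0; k+1 < len(replicas); k++ {
			edges[[2]int{replicas[k], replicas[k+1]}] = struct{}{}
		}
	}
	return len(edges)
}

func main() {
	const numRVs, numMVs, replicas = 100, 100, 3

	// Ring based: mv i uses rv i, i+1, ..., i+replicas-1 (mod numRVs).
	ring := make([][]int, numMVs)
	for i := range ring {
		for r := 0; r < replicas; r++ {
			ring[i] = append(ring[i], (i+r)%numRVs)
		}
	}

	// Random: mv i uses replicas distinct random RVs.
	random := make([][]int, numMVs)
	for i := range random {
		random[i] = rand.Perm(numRVs)[:replicas]
	}

	// Ring placement needs only ~numRVs links (each RV forwards to one neighbour);
	// random placement approaches numMVs*(replicas-1) distinct links.
	fmt.Println("ring edges:  ", chainEdges(ring))
	fmt.Println("random edges:", chainEdges(random))
}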
@@ -3280,6 +3377,9 @@ func (cmi *ClusterManager) updateMVList(clusterMap *dcache.ClusterMap, completeB
                 //
                 // TODO: See if removing "full" RVs from availableRVsList is good for performance with lot of RVs.
                 //
+
+                // With the ring based placement, we should never exclude an RV due to slot exhaustion.
+                common.Assert(!cm.RingBasedMVPlacement, *rv, cm.MVsPerRVForNewMV, usedSlots)
                 continue
             }
 
@@ -4629,6 +4729,28 @@ func (cmi *ClusterManager) batchUpdateComponentRVState(msgBatch []*dcache.Compon
             log.Debug("ClusterManager::batchUpdateComponentRVState: %s/%s, state change (%s -> %s)",
                 rvName, mvName, currentState, rvNewState)
             successCount++
+        } else if currentState == dcache.StateInbandOffline && rvNewState == dcache.StateSyncing ||
+            currentState == dcache.StateInbandOffline && rvNewState == dcache.StateOnline {
+            //
+            // An RV can move to inband-offline from technically any state, so the following requested
+            // transitions decompose to:
+            // {StateOutOfSync -> StateSyncing} -> {StateInbandOffline -> StateSyncing}
+            // {StateSyncing   -> StateOnline } -> {StateInbandOffline -> StateOnline}
+            //
+            // This happens when some thread has submitted these transitions but due to some IO error
+            // when accessing those RVs some other thread marked those RVs as inband-offline and that
+            // transition got processed before this one.
+            // Since we cannot perform the originally requested transitions anymore, fail those requests.
+            //
+            // Note that currentState->rvNewState was not the originally requested transition, but we
+            // don't have the original request here, so we just log currentState.
+            //
+            msg.Err <- fmt.Errorf("%s/%s state change (<%s> -> %s) no longer valid",
+                rvName, mvName, currentState, rvNewState)
+            close(msg.Err)
+            msg.Err = nil
+            failureCount++
+            continue
         } else {
             //
             // Following transitions are reported when an inband PutChunk failure suggests an RV as offline.
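A compact way to read this new branch: the requested transition is applied against whatever the current state is at processing time, and if the RV has meanwhile been marked inband-offline the decomposed transition is no longer legal, so the request is failed back to its submitter. A hypothetical standalone sketch of that check (the state names mirror the dcache constants; the helper name is illustrative and not part of the codebase):

package main

import "fmt"

type RVState string

const (
	StateOnline        RVState = "online"
	StateSyncing       RVState = "syncing"
	StateOutOfSync     RVState = "out-of-sync"
	StateInbandOffline RVState = "inband-offline"
)

// staleTransition reports whether a requested target state can no longer be
// applied because the RV was marked inband-offline after the request was queued.
func staleTransition(current, requested RVState) bool {
	return current == StateInbandOffline &&
		(requested == StateSyncing || requested == StateOnline)
}

func main() {
	// The queued request was OutOfSync -> Syncing, but an IO error moved the RV
	// to inband-offline first, so the decomposed transition must be rejected.
	current, requested := StateInbandOffline, StateSyncing
	if staleTransition(current, requested) {
		fmt.Printf("state change (<%s> -> %s) no longer valid\n", current, requested)
	}
}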
