Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package io.reactivex.mantis.network.push;

import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -37,12 +41,23 @@ public class ConsistentHashingRouter<K, V> extends Router<KeyValuePair<K, V>> {
private static long validCacheAgeMSec = 5000;
private HashFunction hashFunction;
private AtomicReference<SnapshotCache<SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>>>> cachedRingRef = new AtomicReference<>();
private final MetricGroupId metricGroup = new MetricGroupId("ConsistentHashingRouter");
private final Metrics metrics;
private final Counter collisionsCounter;

public ConsistentHashingRouter(String name,
Func1<KeyValuePair<K, V>, byte[]> dataEncoder,
HashFunction hashFunction) {
super("ConsistentHashingRouter_" + name, dataEncoder);
this.hashFunction = hashFunction;

Metrics metrics = new Metrics.Builder()
.id(metricGroup)
.addCounter("numHashCollisions")
.build();

this.metrics = MetricsRegistry.getInstance().registerAndGet(metrics);
this.collisionsCounter = this.metrics.getCounter("numHashCollisions");
}

@Override
Expand Down Expand Up @@ -98,6 +113,7 @@ private AsyncConnection<KeyValuePair<K, V>> lookupConnection(long hash, SortedMa

private void computeRing(Set<AsyncConnection<KeyValuePair<K, V>>> connections) {
SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> ring = new TreeMap<Long, AsyncConnection<KeyValuePair<K, V>>>();
Map<Long, List<String>> collisions = new HashMap<>();
for (AsyncConnection<KeyValuePair<K, V>> connection : connections) {
for (int i = 0; i < connectionRepetitionOnRing; i++) {
// hash node on ring
Expand All @@ -108,12 +124,21 @@ private void computeRing(Set<AsyncConnection<KeyValuePair<K, V>>> connections) {
byte[] connectionBytes = (connectionId + "-" + i).getBytes();
long hash = hashFunction.computeHash(connectionBytes);
if (ring.containsKey(hash)) {
logger.error("Hash collision when computing ring. {} hashed to a value already in the ring.", connectionId + "-" + i);
this.collisionsCounter.increment();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might still be nice to have logging for which connections are duplicates. Could we maybe track at the connection level (instead of the partition level) and emit one log line with all of the duplicate connections (not virtual nodes in the hash ring)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a debug log that should come out in a structure like:

{
  <hash>: [
    0: <first connection with hash>
    1..N: <colliding connections>
  ]
}

I chose a debug log that we'll have to explicitly opt into because I expect the structure to be huge and potentially cause this pausing like behavior we've been seeing today.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh and toString() contains all the interesting bits of info we'd need, taken from AsyncConnection.java:

    @Override
    public String toString() {
        return "AsyncConnection [host=" + host + ", port=" + port
            + ", groupId=" + groupId + ", slotId=" + slotId + ", id=" + id
            + "]";
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I think toString will be called by the logger lib so I don't need to call it myself

if(!collisions.containsKey(hash)) {
collisions.put(hash, new ArrayList<>());
collisions.get(hash).add(ring.get(hash).getSlotId());
}

collisions.get(hash).add(connection.toString());
}
ring.put(hash, connection);
}
}
cachedRingRef.set(new SnapshotCache<SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>>>(ring));
if(!collisions.isEmpty()) {
logger.debug("hash collisions were found while recomputing ring: {}", collisions);
}
}

private SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> hashConnections(Set<AsyncConnection<KeyValuePair<K, V>>> connections) {
Expand Down
Loading