diff --git a/core/src/main/scala/flatgraph/misc/DedupTable.scala b/core/src/main/scala/flatgraph/misc/DedupTable.scala
new file mode 100644
index 00000000..a72702a8
--- /dev/null
+++ b/core/src/main/scala/flatgraph/misc/DedupTable.scala
@@ -0,0 +1,90 @@
+package flatgraph.misc
+
+import java.util
+
+/** A basic LinkedHashSet-like structure, based on an open hashmap. Somewhat surprisingly, it actually pays to store part of the hash in the
+  * upper bits of the index into the values array
+  *
+  * This is a fastpath to avoid some unneeded expensive is-equal checks. (expensive because they imply following an object reference)
+  *
+  * The thing is arranged such that a zero value in pos signifies an empty slot
+  */
+private[flatgraph] class DedupTable {
+  var capacity = 1024
+  var size = 0
+  var strs = new Array[String](capacity)
+  var pos = new Array[Int](capacity)
+
+  def insert(str: String): Int = {
+    val cap = capacity
+    val mask = cap - 1
+    if (str == null) return -1
+    val h = strengthenHash(str.hashCode)
+    var p = h & mask
+    val needle = h & ~mask
+    while (true) {
+      val idx0 = pos(p)
+      val idx = (idx0 & mask) - 1
+      if (idx == -1) {
+        val dst = size
+        size = size + 1
+        strs(dst) = str
+        pos(p) = (dst + 1) | needle
+        if (size + (size >> 1) > capacity) grow()
+        return dst
+      } else if ((idx0 & ~mask) == needle && str.equals(strs(idx))) return idx
+      p = p + 1
+      if (p == cap) {
+        // overflow -- rare, so better branch than branchless
+        p = 0
+      }
+    }
+    ???
+  }
+
+  def insertRehash(str: String): Unit = {
+    val cap = capacity
+    val mask = cap - 1
+    val h = strengthenHash(str.hashCode)
+    var p = h & mask
+    val needle = h & ~mask
+    while (true) {
+      val idx0 = pos(p)
+      val idx = (idx0 & mask) - 1
+      if (idx == -1) {
+        val dst = size
+        size = size + 1
+        pos(p) = (dst + 1) | needle
+        return
+      }
+      p = p + 1
+      if (p == cap) {
+        // overflow -- rare, so better branch than branchless
+        p = 0
+      }
+    }
+    ???
+  }
+
+  def grow(): Unit = {
+    val oldsize = size
+    size = 0
+    capacity = capacity * 2
+    pos = new Array[Int](capacity)
+    strs = util.Arrays.copyOf(strs, capacity)
+    for (idx <- Range(0, oldsize)) insertRehash(strs(idx))
+  }
+
+  def strengthenHash(hash0: Int): Int = {
+    // using the simple murmur 32 bit mixing to strengthen the hash
+    var hash = hash0 ^ (hash0 >>> 16)
+    hash *= 0x85ebca6b
+    hash ^= hash >>> 13
+    hash
+    // murmur does a bit more, but we don't need that.
+    // hash *= 0xc2b2ae35
+    // hash ^= hash >>> 16
+    // hash & (capacity - 1)
+  }
+
+}
diff --git a/core/src/main/scala/flatgraph/storage/Serialization.scala b/core/src/main/scala/flatgraph/storage/Serialization.scala
index b9d3128f..30470457 100644
--- a/core/src/main/scala/flatgraph/storage/Serialization.scala
+++ b/core/src/main/scala/flatgraph/storage/Serialization.scala
@@ -24,7 +24,7 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
   val writeQueue = mutable.ArrayDeque[concurrent.Future[Any]]()
   val jobQueue = mutable.ArrayBuffer[() => (OutlineStorage, Array[Byte])]()
   val stringQueue = mutable.ArrayDeque[(OutlineStorage, Array[String])]()
-  val stringpool = mutable.LinkedHashMap[String, Int]()
+  val stringpool = new flatgraph.misc.DedupTable
   def submitCompress(block: => (OutlineStorage, ByteBuffer)): Unit = {
     compressQueue.addOne(executor.submit((() => block)))
   }
@@ -46,12 +46,6 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
     }
   }
 
-  // NOT threadsafe!
-  private def insertString(stringPool: mutable.LinkedHashMap[String, Int])(s: String): Int = {
-    if (s == null) -1
-    else stringPool.getOrElseUpdate(s, stringPool.size)
-  }
-
   private[flatgraph] def encodeAny(item: Any, outlineStorage: OutlineStorage = new OutlineStorage, delta: Int = -1): OutlineStorage = {
     item match {
       case _: DefaultValue => null
@@ -161,7 +155,7 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
       writeItem(item, bytes)
     }
     val (item, strings) = stringQueue.removeHead()
-    val indices = strings.map(insertString(stringpool))
+    val indices = strings.map(stringpool.insert)
     submitCompress {
       val bytes = new Array[Byte](4 * strings.length)
       ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(indices)
@@ -172,7 +166,7 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
     val poolLenBytes = new Array[Byte](4 * stringpool.size)
     val poolLenBuffer = ByteBuffer.wrap(poolLenBytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer()
     val poolBytes = new ByteArrayOutputStream()
-    for (s <- stringpool.keysIterator) {
+    for (s <- stringpool.strs.iterator.take(stringpool.size)) {
       val bytes = s.getBytes(StandardCharsets.UTF_8)
       poolBytes.write(bytes)
       poolLenBuffer.put(bytes.length)