diff --git a/src/main/scala/com/kwartile/lib/cc/ConnectedComponent.scala b/src/main/scala/com/kwartile/lib/cc/ConnectedComponent.scala index 8acc51b..b97f0a9 100644 --- a/src/main/scala/com/kwartile/lib/cc/ConnectedComponent.scala +++ b/src/main/scala/com/kwartile/lib/cc/ConnectedComponent.scala @@ -39,7 +39,7 @@ object ConnectedComponent extends Serializable { * @param nodePairs on which to apply Small Star operations * @return new nodePairs after the operation and conncectivy change count */ - private def smallStar(nodePairs: RDD[(Long, Long)]): (RDD[(Long, Long)], Int) = { + private def smallStar(nodePairs: RDD[(Long, Long)]): (RDD[(Long, Long)], Long) = { /** * generate RDD of (self, List(neighbors)) where self > neighbors @@ -99,7 +99,7 @@ object ConnectedComponent extends Serializable { val (v, l) = iter.toSeq.unzip val sum = l.foldLeft(0)(_ + _) Iterator(sum) - }).sum.toInt + }).sum.toLong val newNodePairs = newNodePairsWithChangeCount.map(x => x._1).flatMap(x => x) newNodePairsWithChangeCount.unpersist(false) @@ -111,7 +111,7 @@ object ConnectedComponent extends Serializable { * @param nodePairs on which to apply Large Star operations * @return new nodePairs after the operation and conncectivy change count */ - private def largeStar(nodePairs: RDD[(Long, Long)]): (RDD[(Long, Long)], Int) = { + private def largeStar(nodePairs: RDD[(Long, Long)]): (RDD[(Long, Long)], Long) = { /** * generate RDD of (self, List(neighbors)) @@ -165,7 +165,7 @@ object ConnectedComponent extends Serializable { val (v, l) = iter.toSeq.unzip val sum = l.foldLeft(0)(_ + _) Iterator(sum) - }).sum.toInt + }).sum.toLong /** * Sum all the changeCounts @@ -229,7 +229,7 @@ object ConnectedComponent extends Serializable { @tailrec private def alternatingAlgo(nodePairs: RDD[(Long, Long)], - largeStarConnectivityChangeCount: Int, smallStarConnectivityChangeCount: Int, didConverge: Boolean, + largeStarConnectivityChangeCount: Long, smallStarConnectivityChangeCount: Long, didConverge: Boolean, currIterationCount: Int, maxIterationCount: Int): (RDD[(Long, Long)], Boolean, Int) = { val iterationCount = currIterationCount + 1 @@ -269,7 +269,7 @@ object ConnectedComponent extends Serializable { buildPairs(aClique) }).flatMap(x=>x) - val (cc, didConverge, iterCount) = alternatingAlgo(nodePairs, 9999999, 9999999, false, 0, maxIterationCount) + val (cc, didConverge, iterCount) = alternatingAlgo(nodePairs, 9999999L, 9999999L, false, 0, maxIterationCount) if (didConverge) { (cc, didConverge, iterCount)