Commit d97d83c

Merge branch 'sbi2' into jtnystrom/compressed_input

2 parents: 88a50c6 + f20f493

29 files changed: +318 -107 lines changed

README.md

Lines changed: 12 additions & 5 deletions
```diff
@@ -14,13 +14,16 @@ and the fraction of reads assigned to each taxon.
 
 Slacken is based on Apache Spark and is thus a distributed application. It can run on a single machine, but can
 also scale to a cluster with hundreds or thousands of machines. It does not keep all data in RAM during processing, but
-processes data in batches.
+processes data in batches. On a 16-core PC, Slacken needs only 16 GB of RAM to classify with the genomes from the Kraken 2 standard library.
 
-We do not currently support translated mode (protein/AA sequence classification) but only nucleotide sequences. Also,
+Unfortunately, Slacken does not currently support translated mode (protein/AA sequence classification) but only nucleotide sequences. Also,
 Slacken has its own database format (Parquet based) and can not use pre-built Kraken 2 databases as they are.
 
 For more motivation and details, please see [our 2025 paper in NAR Genomics and Bioinformatics](https://academic.oup.com/nargab/article/7/2/lqaf076/8158581).
 
+**Users of version 1.x, please note the new command line syntax in version 2.0.** All commands and examples in this
+README and on the Wiki have been updated. [See the commands overview.](https://github.com/JNP-Solutions/Slacken/wiki/Slacken-commands-overview)
+
 Copyright (c) Johan Nyström-Persson 2019-2025.
 
 ## Contents
@@ -168,7 +171,7 @@ Here,
 * `--reads 100` is the threshold for including a taxon in the initial set (R100).
 * `-l /data/standard-224c` is required, and indicates where genomes for library building may be found.
 * `--bracken-length 150` specifies that Bracken weights for the given read length (150) should be generated. That can be slow, and
-also requires extra space, so we recommend omitting `--bracken-length` when Bracken is not needed.
+also requires extra space, so we recommend omitting `--bracken-length` when Bracken is not needed. When generating Bracken weights, we recommend giving Slacken at least 32 GB of RAM.
 
 When the command has finished, the following files will be generated:
 
@@ -404,13 +407,17 @@ These options may also be permanently configured by editing `slacken.sh`.
 
 Slacken can run on AWS EMR (Elastic MapReduce) and should also work similarly on other commercial cloud providers
 that support Apache Spark. In this scenario, data can be stored on AWS S3 and the computation can run on a mix of
-on-demand and spot (interruptible) instances. We refer the reader to the AWS EMR documentation for more details.
+on-demand and spot (interruptible) instances.
+
+A [tutorial on Slacken with AWS EMR](https://github.com/JNP-Solutions/Slacken/wiki/Classifying-metagenomic-samples-on-AWS-Elastic-MapReduce)
+is available. The tutorial shows how to use Slacken to classify samples using the public indexes on AWS S3.
 
 The cluster configuration we generally recommend is 4 GB RAM per CPU (but 2 GB per CPU may be enough for small workloads).
 For large workloads, the worker nodes should have fast physical hard drives, such as NVMe. On EMR Spark will automatically use
 these drives for temporary space. We have found the m7gd and m6gd machine families to work well.
 
-To run on AWS EMR, first, install the AWS CLI.
+The tutorial above shows how to run Slacken using the EMR GUI. You can also run it on EMR from the command line.
+To do this, first install the [AWS CLI](https://aws.amazon.com/cli/).
 Copy `slacken-aws.sh.template` to a new file, e.g. `slacken-aws.sh` and edit the file to configure
 some settings such as the S3 bucket to use for the Slacken jar. Then, create the AWS EMR cluster. You will receive a
 cluster ID, either from the web GUI or from the CLI. Set the `AWS_EMR_CLUSTER` environment variable to this id:
```

build.sbt

Lines changed: 20 additions & 1 deletion
```diff
@@ -2,7 +2,24 @@ name := "Slacken"
 
 version := "2.0.0"
 
-scalaVersion := "2.12.20"
+lazy val scala212 = "2.12.20"
+
+lazy val scala213 = "2.13.15"
+
+lazy val supportedScalaVersions = List(scala212, scala213)
+
+ThisBuild / scalaVersion := scala212
+
+lazy val root = (project in file(".")).
+  settings(
+    crossScalaVersions := supportedScalaVersions,
+    libraryDependencies ++= {
+      CrossVersion.partialVersion(scalaVersion.value) match {
+        case Some((2, 13)) => List("org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.0")
+        case _ => Nil
+      }
+    }
+  )
 
 val sparkVersion = "3.5.0"
 
@@ -22,6 +39,8 @@ libraryDependencies += "org.scalatest" %% "scalatest" % "latest.integration" % "
 
 libraryDependencies += "org.scalatestplus" %% "scalacheck-1-18" % "latest.integration" % "test"
 
+libraryDependencies += "org.scala-lang.modules" %% "scala-collection-compat" % "2.13.0"
+
 //The "provided" configuration prevents sbt-assembly from including spark in the packaged jar.
 libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
 
```
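The version-gated dependency above is standard sbt cross-building: `crossScalaVersions` lists the Scala versions to build against, and `CrossVersion.partialVersion` lets a setting differ per version. As an illustrative sketch only (not part of this commit), the same pattern can gate other settings, for example compiler flags whose spelling changed between 2.12 and 2.13:

```scala
// Illustrative sketch, not from this commit: the same partialVersion match used above
// for libraryDependencies can also select version-specific compiler flags.
scalacOptions ++= {
  CrossVersion.partialVersion(scalaVersion.value) match {
    case Some((2, 13)) => Seq("-Wunused:imports")       // 2.13 spelling
    case _             => Seq("-Ywarn-unused:imports")  // 2.12 spelling
  }
}
```

With `crossScalaVersions` in place, prefixing an sbt task with `+` (for example `+compile` or `+test`) runs it for every listed Scala version.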

src/main/scala/com/jnpersson/fastdoop/IndexedFastaReader.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
 
 import scala.io.Source
+import scala.collection.BufferedIterator
 
 /**
  * FAI (fasta index) record.
```

src/main/scala/com/jnpersson/kmers/HDFSUtil.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -53,7 +53,7 @@ object HDFSUtil {
     new Iterator[T] {
       def hasNext: Boolean = rit.hasNext
 
-      def next: T = rit.next
+      def next(): T = rit.next
     }
 
   private def files(path: String)(implicit spark: SparkSession): Iterator[LocatedFileStatus] = {
```
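The `next` to `next()` change recurs throughout this commit (see also InputReader.scala and MinSplitter.scala below): `scala.collection.Iterator` declares `next()` with an empty parameter list, and Scala 2.13 is stricter about overriding it with a parameterless `def`. A minimal standalone sketch of the same adapter shape, using a plain `java.util.Iterator` as the wrapped source (the wrapper name is hypothetical, not Slacken code):

```scala
// Minimal sketch, not Slacken code: wrapping a Java iterator as a Scala Iterator.
// Declaring the override as `def next()` (with parentheses) matches the signature
// declared in scala.collection.Iterator and compiles cleanly on both 2.12 and 2.13.
import java.util.{Iterator => JIterator}

class JavaIteratorWrapper[T](underlying: JIterator[T]) extends Iterator[T] {
  def hasNext: Boolean = underlying.hasNext
  def next(): T = underlying.next()
}
```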

src/main/scala/com/jnpersson/kmers/MinimizerCLIConf.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -66,16 +66,16 @@ trait MinimizerCLIConf {
   this: ScallopConf =>
 
   protected def defaultK = 35
-  val k = opt[Int](descr = s"Length of each k-mer", default = Some(defaultK))
+  val k = opt[Int](descr = "Length of each k-mer", default = Some(defaultK))
 
   protected def defaultMinimizerWidth = 10
-  val minimizerWidth = opt[Int](name = "m", descr = s"Width of minimizers",
+  val minimizerWidth = opt[Int](name = "m", descr = "Width of minimizers",
     default = Some(defaultMinimizerWidth))
 
   validate (k) { k =>
     if (minimizerWidth() > k) {
       Left("-m must be <= -k")
-    } else Right(Unit)
+    } else Right(())
   }
 
   protected def defaultOrdering: String = "lexicographic"
@@ -115,7 +115,7 @@ trait MinimizerCLIConf {
   def defaultMinimizerSpaces: Int = 0
 
   val minimizerSpaces = opt[Int](name = "spaces",
-    descr = s"Number of masked out nucleotides in minimizer (spaced seed)",
+    descr = "Number of masked out nucleotides in minimizer (spaced seed)",
     default = Some(defaultMinimizerSpaces))
 
   /** Apply a spaced seed mask to minimizer priorities */
```
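The `Right(Unit)` to `Right(())` fix is more than stylistic: in term position, `Unit` refers to the `Unit` companion object rather than the unit value `()`, and the old code only type-checked because the compiler discarded that object to satisfy the expected `Either[String, Unit]` (newer compilers tend to warn about this). A standalone sketch of the same validation shape, with hypothetical names:

```scala
// Minimal sketch, not Slacken code: an Either[String, Unit]-style validator like the
// one in MinimizerCLIConf. Returning Right(()) yields the unit value directly instead
// of relying on the compiler to discard the Unit companion object.
object ValidationSketch {
  def validateWidths(m: Int, k: Int): Either[String, Unit] =
    if (m > k) Left("-m must be <= -k")
    else Right(())

  def main(args: Array[String]): Unit = {
    println(validateWidths(10, 35)) // Right(())
    println(validateWidths(40, 35)) // Left(-m must be <= -k)
  }
}
```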

src/main/scala/com/jnpersson/kmers/SparkTool.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -33,7 +33,7 @@ private[jnpersson] abstract class SparkTool(appName: String) {
       enableHiveSupport().
       getOrCreate()
 
-    //BareLocalFileSystem bypasses the need for winutils.exe on Windows and does no harm on other OS's
+    //BareLocalFileSystem bypasses the need for winutils.exe on Windows and does no harm on other OS's
     //This affects access to file:/ paths (effectively local files)
     sp.sparkContext.hadoopConfiguration.
       setClass("fs.file.impl", classOf[BareLocalFileSystem], classOf[FileSystem])
@@ -58,6 +58,7 @@ object SparkTool {
   }
 }
 
+//noinspection TypeAnnotation
 trait HasInputReader {
   this: ScallopConf =>
 
@@ -69,7 +70,7 @@ trait HasInputReader {
  * CLI configuration for a Spark-based application.
  */
 //noinspection TypeAnnotation
-class SparkConfiguration(args: Array[String])(implicit val spark: SparkSession) extends ScallopConf(args) {
+class SparkConfiguration(args: Seq[String])(implicit val spark: SparkSession) extends ScallopConf(args) {
   protected val showAllOpts =
     args.contains("--detailed-help") //to make this value available during the option construction stage
 
```

src/main/scala/com/jnpersson/kmers/SplitterFormat.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -21,6 +21,7 @@ import com.jnpersson.kmers.minimizer._
 import org.apache.spark.sql.SparkSession
 
 import java.util.Properties
+import scala.collection.compat.immutable.ArraySeq
 
 /** Logic for persisting minimizer formats (ordering and parameters) to files.
  * @param P the type of MinimizerPriorities that is being managed.
```
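`scala.collection.compat.immutable.ArraySeq` comes from the `scala-collection-compat` dependency added in `build.sbt` above: on Scala 2.13 it aliases the standard library's immutable `ArraySeq`, while on 2.12 the compat library supplies an equivalent. A minimal sketch of that cross-version usage, assuming nothing about how SplitterFormat itself uses the import (the object name is hypothetical):

```scala
// Minimal sketch, not Slacken code: wrapping an Array as an immutable ArraySeq in a
// way that compiles on both Scala 2.12 and 2.13 via scala-collection-compat.
import scala.collection.compat.immutable.ArraySeq

object ArraySeqSketch {
  def main(args: Array[String]): Unit = {
    val raw = Array("ACGT", "GGCA", "TTAA")
    // unsafeWrapArray wraps without copying; the caller must not mutate `raw` afterwards
    val wrapped: ArraySeq[String] = ArraySeq.unsafeWrapArray(raw)
    println(wrapped.mkString(", "))
  }
}
```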

src/main/scala/com/jnpersson/kmers/input/FileInputs.scala

Lines changed: 6 additions & 4 deletions
```diff
@@ -27,6 +27,8 @@ import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.{collect_list, element_at, lit, monotonically_increasing_id, slice, substring}
 import org.apache.spark.sql.{Dataset, SparkSession}
 
+import scala.collection.parallel.immutable.ParVector
+import scala.collection.compat._
 
 /**
  * A set of input files that can be parsed into [[InputFragment]]
@@ -109,7 +111,7 @@ class FileInputs(val files: Seq[String], k: Int, inputGrouping: InputGrouping =
       case _ =>
         expandedFiles.map(forFile)
     }
-    val fs = readers.par.map(_.getInputFragments(withAmbiguous, sampleFraction)).seq
+    val fs = readers.to(ParVector).map(_.getInputFragments(withAmbiguous, sampleFraction)).seq
     spark.sparkContext.union(fs.map(_.rdd)).toDS()
   }
 
@@ -208,7 +210,7 @@ class FastqTextInput(file: String)(implicit spark: SparkSession) extends HadoopI
     }.rdd
 
   def getSequenceTitles: Dataset[SeqTitle] =
-    rdd.map(x => x(0)).toDS
+    rdd.map(x => x(0)).toDS()
 
   protected[input] def getFragments(): Dataset[InputFragment] =
     rdd.map(ar => {
@@ -239,7 +241,7 @@ class IndexedFastaInput(file: String, k: Int)(implicit spark: SparkSession)
     sc.newAPIHadoopFile(input, classOf[IndexedFastaFormat], classOf[Text], classOf[PartialSequence], conf).values
 
   def getSequenceTitles: Dataset[SeqTitle] =
-    rdd.map(_.getKey).toDS.distinct
+    rdd.map(_.getKey).toDS().distinct
 
   protected[input] def getFragments(): Dataset[InputFragment] = {
     val k = this.k
@@ -267,6 +269,6 @@ class IndexedFastaInput(file: String, k: Int)(implicit spark: SparkSession)
         val key = partialSeq.getKey.split(" ")(0)
         makeInputFragment(key, partialSeq.getSeqPosition, partialSeq.getBuffer, start, useEnd)
       }
-    }).toDS
+    }).toDS()
   }
 }
```
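The `readers.par` to `readers.to(ParVector)` change is the cross-version way to obtain a parallel collection: on Scala 2.13, parallel collections live in the separate `scala-parallel-collections` module (the dependency conditionally added in `build.sbt` above), and `scala.collection.compat._` provides the `.to(...)` conversion syntax on 2.12. A self-contained sketch of the same pattern, with dummy work items standing in for the input readers (the object name is hypothetical):

```scala
// Minimal sketch, not Slacken code: converting a sequential Seq to a ParVector so the
// per-element work runs in parallel, then returning to a sequential view with .seq.
// Mirrors the readers.to(ParVector).map(...).seq pattern in FileInputs.scala above.
import scala.collection.compat._
import scala.collection.parallel.immutable.ParVector

object ParVectorSketch {
  def main(args: Array[String]): Unit = {
    val inputs: Seq[Int] = 1 to 8
    // The map runs across the ParVector's worker threads; .seq gathers the results back
    val results = inputs.to(ParVector).map(n => n * n).seq
    println(results.mkString(", "))
  }
}
```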

src/main/scala/com/jnpersson/kmers/input/InputReader.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -106,7 +106,7 @@ class PairedInputReader(lhs: InputReader, rhs: InputReader)(implicit spark: Spar
   import spark.sqlContext.implicits._
   import PairedInputReader._
 
-  protected[input] def getFragments: Dataset[InputFragment] = {
+  protected[input] def getFragments(): Dataset[InputFragment] = {
     /* As we currently have no input format that correctly handles paired reads, joining the reads by
        header is the best we can do (and still inexpensive in the big picture).
        Otherwise, it is hard to guarantee that they would be paired up correctly.
```

src/main/scala/com/jnpersson/kmers/minimizer/MinSplitter.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -137,7 +137,7 @@ final case class MinSplitter[+P <: MinimizerPriorities](priorities: P, k: Int) {
     new Iterator[Supermer] {
       def hasNext: Boolean = window.hasNext
 
-      def next: Supermer = {
+      def next(): Supermer = {
         val p = window.next
 
         //TODO INVALID handling for computed priorities
@@ -184,7 +184,7 @@ final case class MinSplitter[+P <: MinimizerPriorities](priorities: P, k: Int) {
     new Iterator[Minimizer] {
       def hasNext: Boolean = window.hasNext
 
-      def next: Minimizer = {
+      def next(): Minimizer = {
         val p = window.next
 
         if (!matches.isValid(p)) {
```
