From e56d177d5b7a51f9c4a5a4c876476007302680e3 Mon Sep 17 00:00:00 2001 From: Johan Nystrom Date: Thu, 19 Jun 2025 20:26:45 +0900 Subject: [PATCH 01/10] Support .gz and .bz2 compression for fastq reads using Spark's window function (to group line) and text reader (to handle compression transparently). First draft. --- .../kmers/input/FastdoopInputs.scala | 23 +++++++++- .../jnpersson/kmers/input/FastqReader.scala | 42 +++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/com/jnpersson/kmers/input/FastqReader.scala diff --git a/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala b/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala index e6c6b0437..7e80c8d99 100644 --- a/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala +++ b/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala @@ -59,9 +59,14 @@ class FileInputs(val files: Seq[String], k: Int, maxReadLength: Int, inputGroupi * obtain an appropriate InputReader for a single file. */ def forFile(file: String): InputReader = { - if (file.toLowerCase.endsWith("fq") || file.toLowerCase.endsWith("fastq")) { + val lower = file.toLowerCase + if (lower.endsWith("fq") || lower.endsWith("fastq")) { println(s"Assuming fastq format for $file, max length $maxReadLength") new FastqShortInput(file, k, maxReadLength) + } else if (lower.endsWith(".fq.gz") || lower.endsWith(".fastq.gz") || + lower.endsWith(".fq.bz2") || lower.endsWith(".fastq.bz2")) { + println(s"Assuming compressed fastq format for $file") + new FastqTextInput(file, k) } else { //Assume fasta format val faiPath = file + ".fai" @@ -102,7 +107,7 @@ class FileInputs(val files: Seq[String], k: Int, maxReadLength: Int, inputGroupi case _ => expandedFiles.map(forFile) } - val fs = readers.map(_.getInputFragments(withAmbiguous, sampleFraction)) + val fs = readers.par.map(_.getInputFragments(withAmbiguous, sampleFraction)).seq spark.sparkContext.union(fs.map(_.rdd)).toDS() } @@ -141,6 +146,10 @@ final case class FragmentParser(k: Int) { case qrec: QRecord => makeInputFragment(qrec.getKey.split(" ")(0), FIRST_LOCATION, qrec.getBuffer, qrec.getStartValue, qrec.getEndValue) + case ar: Array[String] => + val id = ar(0).split(" ")(0) + val nucleotides = ar(1) + InputFragment(id, FIRST_LOCATION, nucleotides, None) case partialSeq: PartialSequence => val kmers = partialSeq.getBytesToProcess val start = partialSeq.getStartValue @@ -234,6 +243,16 @@ class FastqShortInput(file: String, k: Int, maxReadLength: Int) rdd.map(_.getKey).toDS.distinct } +class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends HadoopInputReader[Array[String]](file, k) { + import spark.sqlContext.implicits._ + + override protected def loadFile(input: String): RDD[Array[String]] = + FastqReader.read(input).rdd + + override def getSequenceTitles: Dataset[SeqTitle] = + rdd.map(x => x(0)).toDS +} + /** * Input reader for FASTA files containing potentially long sequences, with a .fai index * FAI indexes can be created with tools such as seqkit. diff --git a/src/main/scala/com/jnpersson/kmers/input/FastqReader.scala b/src/main/scala/com/jnpersson/kmers/input/FastqReader.scala new file mode 100644 index 000000000..6f93164cf --- /dev/null +++ b/src/main/scala/com/jnpersson/kmers/input/FastqReader.scala @@ -0,0 +1,42 @@ +/* + * This file is part of Slacken. Copyright (c) 2019-2025 Johan Nyström-Persson. 
+ * + * Slacken is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Slacken is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Slacken. If not, see . + */ + +package com.jnpersson.kmers.input + +import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions.{collect_list, floor, lit, monotonically_increasing_id, row_number, slice} + +/** A reader for fastq files that uses Spark's text file input support for transparent decompression. + * This allows support for .gz and .bz2 codecs, among others. The downside is that there will be poor + * parallelism, as each input file will have to be processed in a single partition. + */ +object FastqReader { + def read(file: String)(implicit spark: SparkSession): Dataset[Array[String]] = { + import spark.implicits._ + + spark.read.text(file). + withColumn("file", lit(file)). + withColumn("rowId", monotonically_increasing_id()). + withColumn("recId", //group every 4 rows and give them the same recId + floor( + (row_number().over(Window.partitionBy("file").orderBy("rowId")) - 1) / 4) //monotonically_incr starts at 1 + ). + groupBy("file", "recId").agg(collect_list($"value").as("value")). + select(slice($"value", 1, 2)).as[Array[String]] //Currently preserves only the header and the nucleotide string + } +} From 3e75dc16a285fd6f42af26df24a9c64fb5d32297 Mon Sep 17 00:00:00 2001 From: Johan Nystrom Date: Fri, 20 Jun 2025 14:12:40 +0900 Subject: [PATCH 02/10] Streamline FastdoopInputs architecture. Remove FragmentParser and FastqReader classes. --- .../kmers/input/FastdoopInputs.scala | 133 ++++++++++-------- .../jnpersson/kmers/input/FastqReader.scala | 42 ------ 2 files changed, 76 insertions(+), 99 deletions(-) delete mode 100644 src/main/scala/com/jnpersson/kmers/input/FastqReader.scala diff --git a/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala b/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala index 7e80c8d99..8d91cdf47 100644 --- a/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala +++ b/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala @@ -23,6 +23,8 @@ import com.jnpersson.kmers.minimizer.InputFragment import org.apache.hadoop.conf.{Configuration => HConfiguration} import org.apache.hadoop.io.Text import org.apache.spark.rdd.RDD +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions.{collect_list, floor, lit, monotonically_increasing_id, row_number, slice} import org.apache.spark.sql.{Dataset, SparkSession} @@ -120,59 +122,14 @@ class FileInputs(val files: Seq[String], k: Int, maxReadLength: Int, inputGroupi } } +object HadoopInputReader { + val FIRST_LOCATION = 1 -/** - * Parser for Fastdoop records. - * Splits longer sequences into fragments of a controlled maximum length, optionally sampling them. 
- * - * @param k length of k-mers - */ -final case class FragmentParser(k: Int) { def makeInputFragment(header: SeqTitle, location: SeqLocation, buffer: Array[Byte], start: Int, end: Int): InputFragment = { val nucleotides = new String(buffer, start, end - start + 1) InputFragment(header, location, nucleotides, None) } - - val FIRST_LOCATION = 1 - - /** Convert a record of a supported type to InputFragment, making any necessary transformations on the way - * to guarantee preservation of all k-mers. - */ - def toFragment(record: AnyRef): InputFragment = record match { - case rec: Record => - makeInputFragment(rec.getKey.split(" ")(0), FIRST_LOCATION, rec.getBuffer, - rec.getStartValue, rec.getEndValue) - case qrec: QRecord => - makeInputFragment(qrec.getKey.split(" ")(0), FIRST_LOCATION, qrec.getBuffer, - qrec.getStartValue, qrec.getEndValue) - case ar: Array[String] => - val id = ar(0).split(" ")(0) - val nucleotides = ar(1) - InputFragment(id, FIRST_LOCATION, nucleotides, None) - case partialSeq: PartialSequence => - val kmers = partialSeq.getBytesToProcess - val start = partialSeq.getStartValue - if (kmers == 0) { - return InputFragment("", 0, "", None) - } - - val extensionPart = new String(partialSeq.getBuffer, start + kmers, k - 1) - val newlines = extensionPart.count(_ == '\n') - - //Newlines will be removed eventually, however we have to compensate for them here - //to include all k-mers properly - //Note: we assume that the extra part does not contain newlines itself - - //Although this code is general, for more than one newline in this part (as the case may be for a large k), - //deeper changes to Fastdoop may be needed. - //This value is 0-based inclusive of end - val end = start + kmers - 1 + (k - 1) + newlines - val useEnd = if (end > partialSeq.getEndValue) partialSeq.getEndValue else end - - val key = partialSeq.getKey.split(" ")(0) - makeInputFragment(key, partialSeq.getSeqPosition, partialSeq.getBuffer, start, useEnd) - } } /** @@ -190,12 +147,8 @@ abstract class HadoopInputReader[R <: AnyRef](file: String, k: Int)(implicit spa protected def loadFile(input: String): RDD[R] protected def rdd: RDD[R] = loadFile(file) - protected[kmers] val parser = FragmentParser(k) - protected[input] def getFragments(): Dataset[InputFragment] = { - val p = parser - rdd.map(p.toFragment).toDS - } + protected[input] def getFragments(): Dataset[InputFragment] } /** @@ -209,16 +162,24 @@ abstract class HadoopInputReader[R <: AnyRef](file: String, k: Int)(implicit spa class FastaShortInput(file: String, k: Int, maxReadLength: Int) (implicit spark: SparkSession) extends HadoopInputReader[Record](file, k) { import spark.sqlContext.implicits._ + import HadoopInputReader._ private val bufsiz = maxReadLength + // sequence data 1000 //ID string and separator characters - conf.set("look_ahead_buffer_size", bufsiz.toString) + conf.set("look_ahead_buffer_size", bufsiz.toString) //fastdoop parameter protected def loadFile(input: String): RDD[Record] = sc.newAPIHadoopFile(input, classOf[FASTAshortInputFileFormat], classOf[Text], classOf[Record], conf).values def getSequenceTitles: Dataset[SeqTitle] = rdd.map(_.getKey).toDS().distinct + + protected[input] def getFragments(): Dataset[InputFragment] = + rdd.map(rec => + makeInputFragment(rec.getKey.split(" ")(0), FIRST_LOCATION, rec.getBuffer, + rec.getStartValue, rec.getEndValue) + ).toDS + } /** @@ -231,26 +192,53 @@ class FastaShortInput(file: String, k: Int, maxReadLength: Int) class FastqShortInput(file: String, k: Int, maxReadLength: Int) (implicit 
spark: SparkSession) extends HadoopInputReader[QRecord](file, k) { import spark.sqlContext.implicits._ + import HadoopInputReader._ private val bufsiz = maxReadLength * 2 + // sequence and quality data 1000 //ID string and separator characters - conf.set("look_ahead_buffer_size", bufsiz.toString) + conf.set("look_ahead_buffer_size", bufsiz.toString) //fastdoop parameter protected def loadFile(input: String): RDD[QRecord] = sc.newAPIHadoopFile(input, classOf[FASTQInputFileFormat], classOf[Text], classOf[QRecord], conf).values def getSequenceTitles: Dataset[SeqTitle] = rdd.map(_.getKey).toDS.distinct + + protected[input] def getFragments(): Dataset[InputFragment] = + rdd.map(rec => + makeInputFragment(rec.getKey.split(" ")(0), FIRST_LOCATION, rec.getBuffer, + rec.getStartValue, rec.getEndValue) + ).toDS } class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends HadoopInputReader[Array[String]](file, k) { - import spark.sqlContext.implicits._ - override protected def loadFile(input: String): RDD[Array[String]] = - FastqReader.read(input).rdd + import spark.sqlContext.implicits._ + import HadoopInputReader._ + + override protected def loadFile(input: String): RDD[Array[String]] = { + import spark.implicits._ + + spark.read.text(file). + withColumn("file", lit(file)). + withColumn("rowId", monotonically_increasing_id()). + withColumn("recId", //group every 4 rows and give them the same recId + floor( + (row_number().over(Window.partitionBy("file").orderBy("rowId")) - 1) / 4) //monotonically_incr starts at 1 + ). + groupBy("file", "recId").agg(collect_list($"value").as("value")). + select(slice($"value", 1, 2)).as[Array[String]] //Currently preserves only the header and the nucleotide string + }.rdd override def getSequenceTitles: Dataset[SeqTitle] = rdd.map(x => x(0)).toDS + + protected[input] def getFragments(): Dataset[InputFragment] = + rdd.map(ar => { + val id = ar(0).split(" ")(0) + val nucleotides = ar(1) + InputFragment(id, FIRST_LOCATION, nucleotides, None) + }).toDS } /** @@ -263,11 +251,42 @@ class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends */ class IndexedFastaInput(file: String, k: Int)(implicit spark: SparkSession) extends HadoopInputReader[PartialSequence](file, k) { + import spark.sqlContext.implicits._ + import HadoopInputReader._ protected def loadFile(input: String): RDD[PartialSequence] = sc.newAPIHadoopFile(input, classOf[IndexedFastaFormat], classOf[Text], classOf[PartialSequence], conf).values def getSequenceTitles: Dataset[SeqTitle] = rdd.map(_.getKey).toDS.distinct + + protected[input] def getFragments(): Dataset[InputFragment] = { + val k = this.k + + rdd.map(partialSeq => { + val kmers = partialSeq.getBytesToProcess + val start = partialSeq.getStartValue + if (kmers == 0) { + InputFragment("", 0, "", None) + } else { + + val extensionPart = new String(partialSeq.getBuffer, start + kmers, k - 1) + val newlines = extensionPart.count(_ == '\n') + + //Newlines will be removed eventually, however we have to compensate for them here + //to include all k-mers properly + //Note: we assume that the extra part does not contain newlines itself + + //Although this code is general, for more than one newline in this part (as the case may be for a large k), + //deeper changes to Fastdoop may be needed. 
+ //This value is 0-based inclusive of end + val end = start + kmers - 1 + (k - 1) + newlines + val useEnd = if (end > partialSeq.getEndValue) partialSeq.getEndValue else end + + val key = partialSeq.getKey.split(" ")(0) + makeInputFragment(key, partialSeq.getSeqPosition, partialSeq.getBuffer, start, useEnd) + } + }).toDS + } } diff --git a/src/main/scala/com/jnpersson/kmers/input/FastqReader.scala b/src/main/scala/com/jnpersson/kmers/input/FastqReader.scala deleted file mode 100644 index 6f93164cf..000000000 --- a/src/main/scala/com/jnpersson/kmers/input/FastqReader.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * This file is part of Slacken. Copyright (c) 2019-2025 Johan Nyström-Persson. - * - * Slacken is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Slacken is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Slacken. If not, see . - */ - -package com.jnpersson.kmers.input - -import org.apache.spark.sql.{Dataset, SparkSession} -import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.functions.{collect_list, floor, lit, monotonically_increasing_id, row_number, slice} - -/** A reader for fastq files that uses Spark's text file input support for transparent decompression. - * This allows support for .gz and .bz2 codecs, among others. The downside is that there will be poor - * parallelism, as each input file will have to be processed in a single partition. - */ -object FastqReader { - def read(file: String)(implicit spark: SparkSession): Dataset[Array[String]] = { - import spark.implicits._ - - spark.read.text(file). - withColumn("file", lit(file)). - withColumn("rowId", monotonically_increasing_id()). - withColumn("recId", //group every 4 rows and give them the same recId - floor( - (row_number().over(Window.partitionBy("file").orderBy("rowId")) - 1) / 4) //monotonically_incr starts at 1 - ). - groupBy("file", "recId").agg(collect_list($"value").as("value")). 
- select(slice($"value", 1, 2)).as[Array[String]] //Currently preserves only the header and the nucleotide string - } -} From 809736af9741e95106d2d94eb6f11856597b1501 Mon Sep 17 00:00:00 2001 From: Johan Nystrom Date: Sat, 21 Jun 2025 11:05:11 +0900 Subject: [PATCH 03/10] FastaTextInput (work in progress) to support compressed fasta files --- .../kmers/input/FastdoopInputs.scala | 66 +++++++++++++++---- 1 file changed, 52 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala b/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala index 8d91cdf47..8bcf95b65 100644 --- a/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala +++ b/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala @@ -62,23 +62,24 @@ class FileInputs(val files: Seq[String], k: Int, maxReadLength: Int, inputGroupi */ def forFile(file: String): InputReader = { val lower = file.toLowerCase + val faiPath = file + ".fai" + if (lower.endsWith("fq") || lower.endsWith("fastq")) { - println(s"Assuming fastq format for $file, max length $maxReadLength") - new FastqShortInput(file, k, maxReadLength) + println(s"Assuming fastq format for $file") + new FastqTextInput(file, k) } else if (lower.endsWith(".fq.gz") || lower.endsWith(".fastq.gz") || lower.endsWith(".fq.bz2") || lower.endsWith(".fastq.bz2")) { println(s"Assuming compressed fastq format for $file") new FastqTextInput(file, k) + } else if (HDFSUtil.fileExists(faiPath)) { + println(s"$faiPath found. Using indexed fasta format for $file") + new IndexedFastaInput(file, k) + } else if (lower.endsWith(".gz") || lower.endsWith(".bz2") ) { + println(s"Assuming compressed fasta format for $file") + new FastaTextInput(file, k) } else { - //Assume fasta format - val faiPath = file + ".fai" - if (HDFSUtil.fileExists(faiPath)) { - println(s"$faiPath found. Using indexed fasta format for $file") - new IndexedFastaInput(file, k) - } else { - println(s"$faiPath not found. Assuming simple fasta format for $file, max length $maxReadLength") - new FastaShortInput(file, k, maxReadLength) - } + println(s"$faiPath not found. Assuming simple fasta format for $file") + new FastaTextInput(file, k) } } @@ -211,12 +212,49 @@ class FastqShortInput(file: String, k: Int, maxReadLength: Int) ).toDS } -class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends HadoopInputReader[Array[String]](file, k) { +/** + * Reader for fasta records that are potentially multiline, but small enough to fit into a single string. + * Huge sequences are best processed with the [[IndexedFastaFormat]] instead (.fna files) + * Supports compression via Spark's text reader. + */ +class FastaTextInput(file: String, k: Int)(implicit spark: SparkSession) extends HadoopInputReader[Array[String]](file, k) { + import spark.sqlContext.implicits._ + import HadoopInputReader._ + + protected def loadFile(input: String): RDD[Array[String]] = { + import spark.implicits._ + spark.read.option("lineSep", ">").text(file).as[String]. 
//allows multi-line fasta records to be separated cleanly + flatMap(x => { + val spl = x.split("[\n\r]+") + if (spl.length >= 2) { + Some(spl) + } else { + None + } + }).rdd + } + + protected[input] def getFragments(): Dataset[InputFragment] = + rdd.map(x => { + val headerLine = x(0) + val id = headerLine.split(" ")(0) + val nucleotides = x.drop(1).mkString("") + InputFragment(id, FIRST_LOCATION, nucleotides, None) + }).toDS() + + def getSequenceTitles: Dataset[SeqTitle] = + rdd.map(x => x(0)).toDS +} +/** + * Reader for fastq records. Supports compression via Spark's text input layer. + * Supports compression via Spark's text reader. + */ +class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends HadoopInputReader[Array[String]](file, k) { import spark.sqlContext.implicits._ import HadoopInputReader._ - override protected def loadFile(input: String): RDD[Array[String]] = { + protected def loadFile(input: String): RDD[Array[String]] = { import spark.implicits._ spark.read.text(file). @@ -230,7 +268,7 @@ class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends select(slice($"value", 1, 2)).as[Array[String]] //Currently preserves only the header and the nucleotide string }.rdd - override def getSequenceTitles: Dataset[SeqTitle] = + def getSequenceTitles: Dataset[SeqTitle] = rdd.map(x => x(0)).toDS protected[input] def getFragments(): Dataset[InputFragment] = From 77b2a0d067336fe121e4215681ade2d4317b0ba0 Mon Sep 17 00:00:00 2001 From: Johan Nystrom Date: Sun, 22 Jun 2025 12:28:14 +0900 Subject: [PATCH 04/10] Bugfixes for window logic in FastqTextInput. Bugfix for lineSeparators in InputReaderProps (windows newline sequence is \r\n, not \n\r) --- .../com/jnpersson/kmers/input/FastdoopInputs.scala | 13 ++++++------- .../com/jnpersson/kmers/InputReaderProps.scala | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala b/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala index 8bcf95b65..aa246e5de 100644 --- a/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala +++ b/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.{Configuration => HConfiguration} import org.apache.hadoop.io.Text import org.apache.spark.rdd.RDD import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.functions.{collect_list, floor, lit, monotonically_increasing_id, row_number, slice} +import org.apache.spark.sql.functions.{collect_list, element_at, lit, monotonically_increasing_id, slice} import org.apache.spark.sql.{Dataset, SparkSession} @@ -260,12 +260,11 @@ class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends spark.read.text(file). withColumn("file", lit(file)). withColumn("rowId", monotonically_increasing_id()). - withColumn("recId", //group every 4 rows and give them the same recId - floor( - (row_number().over(Window.partitionBy("file").orderBy("rowId")) - 1) / 4) //monotonically_incr starts at 1 + withColumn("values", //group every 4 rows and give them the same recId + collect_list($"value").over(Window.partitionBy("file").orderBy("rowId").rowsBetween(Window.currentRow, 3)) ). - groupBy("file", "recId").agg(collect_list($"value").as("value")). - select(slice($"value", 1, 2)).as[Array[String]] //Currently preserves only the header and the nucleotide string + where(element_at($"values", 3) === "+"). 
+ select(slice($"values", 1, 2)).as[Array[String]] //Currently preserves only the header and the nucleotide string }.rdd def getSequenceTitles: Dataset[SeqTitle] = @@ -273,7 +272,7 @@ class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends protected[input] def getFragments(): Dataset[InputFragment] = rdd.map(ar => { - val id = ar(0).split(" ")(0) + val id = ar(0).split(" ")(0).substring(1) //remove leading @ val nucleotides = ar(1) InputFragment(id, FIRST_LOCATION, nucleotides, None) }).toDS diff --git a/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala b/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala index 15abeb334..872e0ced3 100644 --- a/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala +++ b/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala @@ -89,7 +89,7 @@ class InputReaderProps extends AnyFunSuite with SparkSessionTestWrapper with Sca } yield (record, fragment) def lineSeparators: Gen[String] = - Gen.oneOf(List("\n", "\n\r")) + Gen.oneOf(List("\n", "\r\n", "\r")) def fastaFiles(seqGen: Gen[NTSeq]): Gen[SeqRecordFile] = for { From 2df9b7a165906c7d6b9e57d79ed118be9d19bc32 Mon Sep 17 00:00:00 2001 From: Johan Nystrom Date: Sun, 22 Jun 2025 13:13:13 +0900 Subject: [PATCH 05/10] Remove most of Fastdoop since spark's text reader now fulfils the purpose of reading fasta and fastq files, with the exception of indexed long (fna, fna.fai) sequences. Rename FastdoopInputs to FileInputs. --- .../fastdoop/FASTAlongInputFileFormat.java | 49 -- .../fastdoop/FASTAshortInputFileFormat.java | 60 -- .../fastdoop/FASTQInputFileFormat.java | 74 --- .../fastdoop/FASTQReadsRecordReader.java | 555 ------------------ .../fastdoop/LongReadsRecordReader.java | 295 ---------- .../java/com/jnpersson/fastdoop/QRecord.java | 136 ----- .../java/com/jnpersson/fastdoop/Record.java | 93 --- .../fastdoop/ShortReadsRecordReader.java | 384 ------------ .../java/com/jnpersson/fastdoop/Utils.java | 72 --- ...{FastdoopInputs.scala => FileInputs.scala} | 63 +- 10 files changed, 1 insertion(+), 1780 deletions(-) delete mode 100644 src/main/java/com/jnpersson/fastdoop/FASTAlongInputFileFormat.java delete mode 100644 src/main/java/com/jnpersson/fastdoop/FASTAshortInputFileFormat.java delete mode 100644 src/main/java/com/jnpersson/fastdoop/FASTQInputFileFormat.java delete mode 100644 src/main/java/com/jnpersson/fastdoop/FASTQReadsRecordReader.java delete mode 100644 src/main/java/com/jnpersson/fastdoop/LongReadsRecordReader.java delete mode 100644 src/main/java/com/jnpersson/fastdoop/QRecord.java delete mode 100644 src/main/java/com/jnpersson/fastdoop/Record.java delete mode 100644 src/main/java/com/jnpersson/fastdoop/ShortReadsRecordReader.java delete mode 100644 src/main/java/com/jnpersson/fastdoop/Utils.java rename src/main/scala/com/jnpersson/kmers/input/{FastdoopInputs.scala => FileInputs.scala} (80%) diff --git a/src/main/java/com/jnpersson/fastdoop/FASTAlongInputFileFormat.java b/src/main/java/com/jnpersson/fastdoop/FASTAlongInputFileFormat.java deleted file mode 100644 index 1e63911d9..000000000 --- a/src/main/java/com/jnpersson/fastdoop/FASTAlongInputFileFormat.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.jnpersson.fastdoop; - -import java.io.IOException; - -import org.apache.hadoop.io.Text; - -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; - -/** - * A {@code FileInputFormat} for reading FASTA files containing - * sequences of arbitrary length. - * - * @author Gianluca Roscigno - * - * @version 1.0 - * - * @see FileInputFormat - */ - -public class FASTAlongInputFileFormat extends FileInputFormat { - - - @Override - public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new LongReadsRecordReader(); - } -} \ No newline at end of file diff --git a/src/main/java/com/jnpersson/fastdoop/FASTAshortInputFileFormat.java b/src/main/java/com/jnpersson/fastdoop/FASTAshortInputFileFormat.java deleted file mode 100644 index ffbffdf73..000000000 --- a/src/main/java/com/jnpersson/fastdoop/FASTAshortInputFileFormat.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.jnpersson.fastdoop; - -import java.io.IOException; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; - -/** - * A {@code FileInputFormat} for reading FASTA files containing - * short sequences. - * - * @author Gianluca Roscigno - * - * @version 1.0 - * - * @see FileInputFormat - */ -public class FASTAshortInputFileFormat extends FileInputFormat { - - @Override - public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - - return new ShortReadsRecordReader() { - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - try { - return super.nextKeyValue(); - } catch (ArrayIndexOutOfBoundsException aiob) { - System.err.println( - "Error detected while reading fasta format. Try increasing read max. 
length with --maxlen.\n" + - "Alternatively, the file may be corrupted."); - throw aiob; - } - } - }; - } -} \ No newline at end of file diff --git a/src/main/java/com/jnpersson/fastdoop/FASTQInputFileFormat.java b/src/main/java/com/jnpersson/fastdoop/FASTQInputFileFormat.java deleted file mode 100644 index 879639d5e..000000000 --- a/src/main/java/com/jnpersson/fastdoop/FASTQInputFileFormat.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.jnpersson.fastdoop; - -import java.io.IOException; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; - -/** - * A {@code FileInputFormat} for reading FASTQ files. - * - * @author Gianluca Roscigno - * - * @version 1.0 - * - * @see FileInputFormat - */ -public class FASTQInputFileFormat extends FileInputFormat { - - @Override - public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - - return new FASTQReadsRecordReader() { - - @Override - public void initialize(InputSplit genericSplit, TaskAttemptContext context) - throws IOException, InterruptedException { - try { - super.initialize(genericSplit, context); - } catch (ArrayIndexOutOfBoundsException aiob) { - System.err.println( - "Error detected while reading fastq format. Try increasing read max. length with --maxlen.\n" + - "Alternatively, the file may be corrupted."); - throw aiob; - } - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - try { - return super.nextKeyValue(); - } catch (ArrayIndexOutOfBoundsException aiob) { - System.err.println( - "Error detected while reading fastq format. Try increasing read max. length with --maxlen.\n" + - "Alternatively, the file may be corrupted."); - throw aiob; - } - } - }; - - } - -} \ No newline at end of file diff --git a/src/main/java/com/jnpersson/fastdoop/FASTQReadsRecordReader.java b/src/main/java/com/jnpersson/fastdoop/FASTQReadsRecordReader.java deleted file mode 100644 index 0403de4c7..000000000 --- a/src/main/java/com/jnpersson/fastdoop/FASTQReadsRecordReader.java +++ /dev/null @@ -1,555 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.jnpersson.fastdoop; - -import java.io.EOFException; -import java.io.IOException; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.RecordReader; - -/** - * This class reads {@literal } pairs from an {@code InputSplit}. - * The input file is in FASTQ format. - * A FASTA record has a header line that is the key, the data line, an - * optional single line header string and a quality line. - * - * Example: - * {@literal @}SRR034939.184 090406_HWI-EAS68_9096_FC400PR_PE_1_1_10 length=100 - * CCACCTCCTGGGTTCAAGGGGTTCTCTTGCCTCAGCTNNNNNNNNNNNNGGNNNNNNNNNTNNNN - * +SRR034939.184 090406_HWI-EAS68_9096_FC400PR_PE_1_1_10 length=100 - * HDHFHHHHHHFFAFF6?{@literal <}:{@literal <}HHHHHHHHHEDHHHF##!!!!!!!!!!!!##!!!!!!!!!#!!!! - * ... - * - * @author Gianluca Roscigno - * - * @version 1.0 - * - * @see InputSplit - */ - -public class FASTQReadsRecordReader extends RecordReader { - - private FSDataInputStream inputFile; - - private long startByte; - - private long fileLength; - - private Text currKey; - - private QRecord currRecord; - - /* - * Used to buffer the content of the input split - */ - private byte[] myInputSplitBuffer; - - /* - * Auxiliary buffer used to store the ending buffer of this input split and - * the initial bytes of the next split - */ - private byte[] borderBuffer; - - /* - * Marks the current position in the input split buffer - */ - private int posBuffer; - - /* - * Stores the size of the input split buffer - */ - private int sizeBuffer; - /* - * True, if we processed the entire input split buffer. False, otherwise - */ - private boolean endMyInputSplit = false; - - boolean isLastSplit = false; - - public FASTQReadsRecordReader() { - super(); - } - - @Override - public void initialize(InputSplit genericSplit, TaskAttemptContext context) - throws IOException, InterruptedException {// Called once at - // initialization. - - posBuffer = 0; - Configuration job = context.getConfiguration(); - - int look_ahead_buffer_size = context.getConfiguration().getInt("look_ahead_buffer_size", 4096); - - /* - * We open the file corresponding to the input split and - * start processing it - */ - FileSplit split = (FileSplit) genericSplit; - Path path = split.getPath(); - fileLength = path.getFileSystem(job).getContentSummary(path).getLength(); - startByte = split.getStart(); - inputFile = path.getFileSystem(job).open(path); - Utils.safeSeek(inputFile, startByte); - - currKey = new Text("null"); - currRecord = new QRecord(); - - /* - * We read the whole content of the split in memory using - * myInputSplitBuffer. 
Plus, we read in the memory the first - * KV_BUFFER_SIZE of the next split - */ - myInputSplitBuffer = new byte[(int) split.getLength()]; - currRecord.setBuffer(myInputSplitBuffer); - - borderBuffer = new byte[look_ahead_buffer_size]; - - sizeBuffer = inputFile.read(startByte, myInputSplitBuffer, 0, myInputSplitBuffer.length); - Utils.safeSeek(inputFile,startByte + sizeBuffer); - - boolean isEOF = (sizeBuffer < 0 || startByte + sizeBuffer >= fileLength); - - if (isEOF) { - isLastSplit = true; - int newLineCount = 0; - int k = 1; - - while (myInputSplitBuffer[myInputSplitBuffer.length - k++] == '\n') - newLineCount++; - - byte[] tempBuffer = new byte[(int) split.getLength() - newLineCount]; - System.arraycopy(myInputSplitBuffer, 0, tempBuffer, 0, myInputSplitBuffer.length - newLineCount); - myInputSplitBuffer = tempBuffer; - } - - for (int i = 0; i < sizeBuffer; i++) { - if (myInputSplitBuffer[i] == '@') { - if (i == 0) { - posBuffer = i + 1; - break; - } - if (myInputSplitBuffer[i - 1] == '\n') { - posBuffer = i + 1; - break; - } - - } - } - - /* - * We skip the first header of the split - */ - int j = posBuffer; - - while (myInputSplitBuffer[j] != '\n') { - j++; - } - - if (myInputSplitBuffer[j + 1] == '@') - posBuffer = j + 2; - - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (endMyInputSplit) { - return false; - } - - boolean nextsplitKey = false; - boolean nextsplitValue = false; - boolean nextsplitQuality = false; - boolean nextsplitSecondHeader = false; - - currRecord.setStartKey(posBuffer); - - /* - * We look for the next short sequence my moving posBuffer until a - * newline character is found. - * End of split is implicitly managed through - * ArrayIndexOutOfBoundsException handling - */ - - try { - while (myInputSplitBuffer[posBuffer] != '\n') { - posBuffer++; - } - } catch (ArrayIndexOutOfBoundsException e) { - /* - * If we reached the end of the split while scanning a sequence, we - * use nextsplitKey to remember that more characters have to be - * fetched from the next split for retrieving the key - */ - if (!isLastSplit) { - endMyInputSplit = true; - nextsplitKey = true; - } else { - return false; - } - } - - currRecord.setEndKey(posBuffer - 1); - - if (!endMyInputSplit) { - /* - * Assuming there are more characters from the current split to - * process, we move forward the pointer - * until the symbol '+' is found - * - * posBuffer + 1 can potentially overrun the buffer end, since the exception above is not thrown - * if the final character of the split is a \n. Check the offset accordingly. 
- */ - currRecord.setStartValue(Utils.trimToEnd(myInputSplitBuffer, posBuffer + 1)); - - try { - posBuffer = posBuffer + 2; - - while (myInputSplitBuffer[posBuffer] != '+') { - posBuffer++; - } - - currRecord.setEndValue(posBuffer - 2); - posBuffer++; - - } catch (ArrayIndexOutOfBoundsException e) { - - if (isLastSplit) { - return false; - } - - /* - * If we reached the end of the split while scanning a sequence, - * we use nextsplitValue to remember that more characters have - * to be fetched from the next split for retrieving the value - */ - - endMyInputSplit = true; - nextsplitValue = true; - int c = 0; - - if (posBuffer > myInputSplitBuffer.length) { - posBuffer = myInputSplitBuffer.length; - } - - for (int i = posBuffer - 1; i >= 0; i--) { - if (((char) myInputSplitBuffer[i]) != '\n') - break; - - c++; - } - - currRecord.setEndValue(posBuffer - 1 - c); - - } - - } - - if (!endMyInputSplit) { - - //The exception above would not be thrown if the final character of the split is a +. - //Check the offset accordingly. - currRecord.setStartKey2(Utils.trimToEnd(myInputSplitBuffer, posBuffer)); - - try { - - try { - while (myInputSplitBuffer[posBuffer] != '\n') { - posBuffer++; - } - } catch (ArrayIndexOutOfBoundsException e) { - - if (isLastSplit) { - return false; - } - /* - * If we reached the end of the split while scanning a - * sequence, - * we use nextsplitQuality to remember that more characters - * have - * to be fetched from the next split for retrieving (and - * discarding) - * the quality linevalue - */ - endMyInputSplit = true; - nextsplitSecondHeader = true; - nextsplitQuality = true; - } - currRecord.setEndKey2(posBuffer - 1); - - if (!endMyInputSplit) { - - //The exception above would not be thrown if the final character of the split is a newline. - //Check the offset accordingly. 
- currRecord.setStartQuality(Utils.trimToEnd(myInputSplitBuffer, posBuffer + 1)); - currRecord.setEndQuality(currRecord.getStartQuality() + currRecord.getEndValue() - currRecord.getStartValue()); - posBuffer = (currRecord.getEndQuality() + 3); - - if (myInputSplitBuffer.length <= currRecord.getEndQuality()) { - - currRecord.setEndQuality(myInputSplitBuffer.length - 1); - posBuffer = (myInputSplitBuffer.length - 1); - - throw new ArrayIndexOutOfBoundsException(); - } else { - if (posBuffer > (myInputSplitBuffer.length - 1)) { - endMyInputSplit = true; - return true; - } - } - } - - } catch (ArrayIndexOutOfBoundsException e) { - if (isLastSplit) { - return false; - } - - endMyInputSplit = true; - nextsplitQuality = true; - } - - } - - /* - * The end of the split has been reached - */ - if (endMyInputSplit) { - - /* - * If there is another split after this one and we still need to - * retrieve the - * key of the current record, we switch to borderbuffer to fetch all - * the remaining characters - */ - - if (nextsplitKey) { - currRecord.setBuffer(borderBuffer); - int j = posBuffer - currRecord.getStartKey(); - System.arraycopy(myInputSplitBuffer, currRecord.getStartKey(), borderBuffer, 0, j); - - posBuffer = j; - - currRecord.setStartKey(0); - nextsplitValue = true; - - byte b; - - try { - - while ((b = inputFile.readByte()) != '\n') - borderBuffer[j++] = b; - - } catch (EOFException e) { - nextsplitValue = false; - } - - if (!nextsplitValue) { - return false; - } - - currRecord.setEndKey(j - 1); - - } - - /* - * If there is another split after this one and we still need to - * retrieve the value of the current record, we switch to - * borderbuffer to fetch all the remaining characters - */ - if (nextsplitValue) { - - if (!nextsplitKey) { - - currRecord.setBuffer(borderBuffer); - - int j = currRecord.getEndKey() + 1 - currRecord.getStartKey(); - System.arraycopy(myInputSplitBuffer, currRecord.getStartKey(), borderBuffer, 0, j); - - currRecord.setStartKey(0); - currRecord.setEndKey(j - 1); - - int start = currRecord.getStartValue(); - currRecord.setStartValue(j); - - if ((currRecord.getEndValue() + 1 - start) > 0) - System.arraycopy(myInputSplitBuffer, start, borderBuffer, j, (currRecord.getEndValue() + 1 - start)); - - if ((currRecord.getEndValue() - start) < 0) { - posBuffer = j; - } else { - posBuffer = j + currRecord.getEndValue() - start; - } - - currRecord.setEndValue(posBuffer); - - posBuffer++; - - } else { - posBuffer = currRecord.getEndKey() + 1; - currRecord.setStartValue(posBuffer); - } - - byte b; - - try { - while ((b = inputFile.readByte()) != '+') - if (b != '\n') - borderBuffer[posBuffer++] = b; - - } catch (EOFException e) {} - - currRecord.setEndValue(posBuffer - 1); - - posBuffer++; - currRecord.setStartKey2(posBuffer); - - try { - - while ((b = inputFile.readByte()) != '\n') - borderBuffer[posBuffer++] = b; - - currRecord.setEndKey2(posBuffer - 1); - - currRecord.setStartQuality(posBuffer); - - while ((b = inputFile.readByte()) != '\n') - borderBuffer[posBuffer++] = b; - - } catch (EOFException e) { - // End file. 
- } - - currRecord.setEndQuality(posBuffer - 1); - - } - - /* - * If there is another split after this one and we still need to - * retrieve the quality line of the current record, we switch to - * borderbuffer to fetch all the remaining characters - */ - if (nextsplitQuality) { - - currRecord.setBuffer(borderBuffer); - - // copy key - int j = currRecord.getEndKey() + 1 - currRecord.getStartKey(); - System.arraycopy(myInputSplitBuffer, currRecord.getStartKey(), borderBuffer, 0, j); - - currRecord.setStartKey(0); - currRecord.setEndKey(j - 1); - - // copy value - int v = currRecord.getEndValue() + 1 - currRecord.getStartValue(); - System.arraycopy(myInputSplitBuffer, currRecord.getStartValue(), borderBuffer, j, v); - - currRecord.setStartValue(j); - currRecord.setEndValue(j + v - 1); - - byte b; - - if (nextsplitSecondHeader) { - int start = currRecord.getStartKey2(); - currRecord.setStartKey2(currRecord.getEndValue() + 1); - posBuffer = currRecord.getStartKey2(); - - if ((currRecord.getEndKey2() + 1 - start) > 0) - System.arraycopy(myInputSplitBuffer, start, borderBuffer, currRecord.getStartKey2(), - (currRecord.getEndKey2() + 1 - start)); - - posBuffer = currRecord.getStartKey2() + (currRecord.getEndKey2() - start); - - currRecord.setEndKey2(posBuffer); - posBuffer++; - - while ((b = inputFile.readByte()) != '\n') - borderBuffer[posBuffer++] = b; - - - currRecord.setEndKey2(posBuffer - 1); - currRecord.setStartQuality(posBuffer); - - } else { - - int s = currRecord.getEndKey2() + 1 - currRecord.getStartKey2(); - System.arraycopy(myInputSplitBuffer, currRecord.getStartKey2(), borderBuffer, currRecord.getEndValue() + 1, - s); - currRecord.setStartKey2(currRecord.getEndValue() + 1); - currRecord.setEndKey2(currRecord.getStartKey2() + s - 1); - - int start = currRecord.getStartQuality(); - currRecord.setStartQuality(currRecord.getEndKey2() + 1); - posBuffer = currRecord.getStartQuality(); - - if ((currRecord.getEndQuality() + 1 - start) > 0) - System.arraycopy(myInputSplitBuffer, start, borderBuffer, currRecord.getStartQuality(), - (currRecord.getEndQuality() + 1 - start)); - - posBuffer = currRecord.getStartQuality() + (currRecord.getEndQuality() - start); - - currRecord.setEndQuality(posBuffer); - posBuffer++; - } - - try { - - while ((b = inputFile.readByte()) != '\n') - borderBuffer[posBuffer++] = b; - - } catch (EOFException e) {} - - currRecord.setEndQuality(posBuffer - 1); - - } - - } - - return true; - - } - - @Override - public void close() throws IOException { - - if (inputFile != null) - inputFile.close(); - } - - @Override - public Text getCurrentKey() throws IOException, InterruptedException { - return currKey; - } - - @Override - public QRecord getCurrentValue() throws IOException, InterruptedException { - return currRecord; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return sizeBuffer > 0 ? posBuffer / sizeBuffer : 1; - - } - -} \ No newline at end of file diff --git a/src/main/java/com/jnpersson/fastdoop/LongReadsRecordReader.java b/src/main/java/com/jnpersson/fastdoop/LongReadsRecordReader.java deleted file mode 100644 index 2320c3e67..000000000 --- a/src/main/java/com/jnpersson/fastdoop/LongReadsRecordReader.java +++ /dev/null @@ -1,295 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * This file has been modified from the original version for use in Discount. - */ - -package com.jnpersson.fastdoop; - -import java.io.IOException; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; - - -import org.apache.hadoop.mapreduce.RecordReader; - -/** - * This class reads {@literal } pairs from an {@code InputSplit}. - * The input file is in FASTA format and contains a single long sequence. - * A FASTA record has a header line that is the key, and data lines - * that are the value. - * {@literal >header...} - * data - * ... - * - * - * Example: - * {@literal >Seq1} - * TAATCCCAAATGATTATATCCTTCTCCGATCGCTAGCTATACCTTCCAGGCGATGAACTTAGACGGAATCCACTTTGCTA - * CAACGCGATGACTCAACCGCCATGGTGGTACTAGTCGCGGAAAAGAAAGAGTAAACGCCAACGGGCTAGACACACTAATC - * CTCCGTCCCCAACAGGTATGATACCGTTGGCTTCACTTCTACTACATTCGTAATCTCTTTGTCAGTCCTCCCGTACGTTG - * GCAAAGGTTCACTGGAAAAATTGCCGACGCACAGGTGCCGGGCCGTGAATAGGGCCAGATGAACAAGGAAATAATCACCA - * CCGAGGTGTGACATGCCCTCTCGGGCAACCACTCTTCCTCATACCCCCTCTGGGCTAACTCGGAGCAAAGAACTTGGTAA - * ... - * - * @author Gianluca Roscigno - * - * @version 1.0 - * - * @see InputSplit - */ -public class LongReadsRecordReader extends RecordReader { - - private FSDataInputStream inputFile; - - private long startByte; - - private Text currKey; - - private PartialSequence currValue; - - /* - * True, if we processed the entire input split buffer. False, otherwise - */ - private boolean endMyInputSplit; - - private int k; - - /* - * Number of bases per line, from FAI index - */ - private int lineBases; - - /* - * Line width, from FAI index - */ - private int lineWidth; - - /* - * Offset sequence start in file (0-based bytes) - */ - private long seqOffset; - - public LongReadsRecordReader() { - super(); - - } - - /** - * Read a fai file (such as the one generated by 'seqkit faidx') if it exists, - * and populate seqOffset, lineBases, lineWidth, and currValue.header. 
- * @param path Path to the fai file - * @param job - * @return True if and only if the file existed and was read - * @throws IOException - */ - private boolean readFastaIndex(Path path, Configuration job) throws IOException { - FileSystem fs = path.getFileSystem(job); - if (!fs.exists(path)) { - return false; - } - - long length = fs.getContentSummary(path).getLength(); - byte[] buffer = new byte[(int) length]; - FSDataInputStream file = path.getFileSystem(job).open(path); - file.readFully(0, buffer); - String lines = new String(buffer); - String firstLine = lines.split("\n")[0]; - String[] firstSplit = firstLine.split("\t"); - - seqOffset = Long.parseLong(firstSplit[2]); - lineBases = Integer.parseInt(firstSplit[3]); - lineWidth = Integer.parseInt(firstSplit[4]); - currValue.setHeader(firstSplit[0]); - return true; - } - - @Override - public void initialize(InputSplit genericSplit, TaskAttemptContext context) - throws IOException, InterruptedException { - - Configuration job = context.getConfiguration(); - - /* - * k determines how many bytes of the next input split (if any) - * should be retrieved together with the bytes of the current - * input split. - */ - k = context.getConfiguration().getInt("k", 10); - - /* - * We open the file corresponding to the input split and - * start processing it - */ - FileSplit split = (FileSplit) genericSplit; - Path path = split.getPath(); - startByte = split.getStart(); - inputFile = path.getFileSystem(job).open(path); - - currKey = new Text("null"); - currValue = new PartialSequence(); - - Path faiPath = new Path(path.toString() + ".fai"); - if (!readFastaIndex(faiPath, job)) { - //If the fai file is successfully read, we will assign the sequence ID to the header. - //Otherwise, we set the file name as the header. - currValue.setHeader(path.getName()); - } - - /* - * We read the whole content of the split in memory using - * myInputSplitBuffer. 
Plus, we read in the memory the first - * k+2 characters of the next split - */ - - int inputSplitSize = (int) split.getLength(); - int otherbytesToReads = k + 2; - - byte[] myInputSplitBuffer = new byte[(inputSplitSize + otherbytesToReads)]; - currValue.setBuffer(myInputSplitBuffer); - - int sizeBuffer1 = inputFile.read(startByte, myInputSplitBuffer, 0, inputSplitSize); - - if (sizeBuffer1 <= 0) { - endMyInputSplit = true; - return; - } else - endMyInputSplit = false; - - int sizeBuffer2 = inputFile.read((startByte + sizeBuffer1), myInputSplitBuffer, sizeBuffer1, otherbytesToReads); - - boolean lastInputSplit = false; - - /* - * If there are no characters to read from the next split, then - * this is the last split - */ - if (sizeBuffer2 <= 0) { - lastInputSplit = true; - sizeBuffer2 = 0; - } - - int posBuffer = 0; - - /* - * If we are processing the first split of the HDFS file, then we need - * to discard the comment line - */ - if (startByte == 0) { - - for (int i = 0; i < sizeBuffer1; i++) { - - posBuffer++; - - if (myInputSplitBuffer[posBuffer - 1] == '\n') - break; - - } - - } - - /* - * If the split we are processing is not the last one, then we need - * to process its whole content - */ - if (!lastInputSplit) { - currValue.setBytesToProcess(sizeBuffer1 - posBuffer); - - if (sizeBuffer2 < (k - 1)) { - currValue.setBytesToProcess(currValue.getBytesToProcess() - ((k - 1) - sizeBuffer2)); - } - - } else { - /* - * If the split we are processing is the last one, we trim - * all the ending '\n' characters - */ - - int c = 0; - - for (int i = sizeBuffer1 - 1; i >= 0; i--) { - if (myInputSplitBuffer[i] != '\n') - break; - - c++; - } - - currValue.setBytesToProcess((sizeBuffer1 - posBuffer) - k + 1 - c); - if (currValue.getBytesToProcess() <= 0) { - endMyInputSplit = true; - } - - } - - if (lineWidth > 0 && lineBases > 0) { - //We read the .fai file successfully and can compute the position in the sequence. - if (startByte == 0) { - currValue.setSeqPosition(1); - } else { - long seqStartPosition = startByte - seqOffset; - long row = seqStartPosition / lineWidth; - long pos = row * lineBases + (seqStartPosition % lineWidth); - //Convert from 0-based to 1-based sequence position - currValue.setSeqPosition(pos + 1); - } - } else { - currValue.setSeqPosition(-1); - } - - currValue.setStartValue(posBuffer); - currValue.setEndValue(sizeBuffer1 + sizeBuffer2 - 1); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - - if (endMyInputSplit == true) - return false; - - endMyInputSplit = true; - return true; - - } - - @Override - public void close() throws IOException {// Close the record reader. - if (inputFile != null) - inputFile.close(); - } - - @Override - public Text getCurrentKey() throws IOException, InterruptedException { - return currKey; - } - - @Override - public PartialSequence getCurrentValue() throws IOException, InterruptedException { - return currValue; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return endMyInputSplit ? 1 : 0; - } - -} \ No newline at end of file diff --git a/src/main/java/com/jnpersson/fastdoop/QRecord.java b/src/main/java/com/jnpersson/fastdoop/QRecord.java deleted file mode 100644 index 085572e3f..000000000 --- a/src/main/java/com/jnpersson/fastdoop/QRecord.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.jnpersson.fastdoop; - -import java.io.Serializable; - -/** - * Utility class used to represent as a record a sequence existing - * in a FASTQ file. - * - * @author Gianluca Roscigno - * - * @version 1.0 - */ -public class QRecord implements Serializable { - - private static final long serialVersionUID = -7555239567456193078L; - - private byte[] buffer; - private int startKey, endKey; - private int startValue, endValue; - private int startKey2, endKey2; - private int startQuality, endQuality; - - public String getKey() { - return new String(buffer, startKey, (endKey - startKey + 1)); - } - - public String getValue() { - return new String(buffer, startValue, (endValue - startValue + 1)); - } - - public String getKey2() { - return new String(buffer, startKey2, (endKey2 - startKey2 + 1)); - } - - public String getQuality() { - return new String(buffer, startQuality, (endQuality - startQuality + 1)); - } - - @Override - public String toString() { - - return "@" + this.getKey() + "\n" + this.getValue() + "\n+" + this.getKey2() + "\n" + this.getQuality(); - - } - - public byte[] getBuffer() { - return buffer; - } - - public int getStartValue() { - return startValue; - } - - public int getEndValue() { - return endValue; - } - - public void setBuffer(byte[] buffer) { - this.buffer = buffer; - } - - public int getStartKey() { - return startKey; - } - - public void setStartKey(int startKey) { - this.startKey = startKey; - } - - public int getEndKey() { - return endKey; - } - - public void setEndKey(int endKey) { - this.endKey = endKey; - } - - public void setStartValue(int startValue) { - this.startValue = startValue; - } - - public void setEndValue(int endValue) { - this.endValue = endValue; - } - - public int getStartKey2() { - return startKey2; - } - - public void setStartKey2(int startKey2) { - this.startKey2 = startKey2; - } - - public int getEndKey2() { - return endKey2; - } - - public void setEndKey2(int endKey2) { - this.endKey2 = endKey2; - } - - public int getStartQuality() { - return startQuality; - } - - public void setStartQuality(int startQuality) { - this.startQuality = startQuality; - } - - public int getEndQuality() { - return endQuality; - } - - public void setEndQuality(int endQuality) { - this.endQuality = endQuality; - } - -} \ No newline at end of file diff --git a/src/main/java/com/jnpersson/fastdoop/Record.java b/src/main/java/com/jnpersson/fastdoop/Record.java deleted file mode 100644 index fab12c672..000000000 --- a/src/main/java/com/jnpersson/fastdoop/Record.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.jnpersson.fastdoop; - -import java.io.Serializable; - -/** - * Utility class used to represent as a record a sequence existing - * in a FASTA file. - * - * @author Gianluca Roscigno - * - * @version 1.0 - */ - -public class Record implements Serializable { - - private static final long serialVersionUID = 8015377043208271607L; - - private byte[] buffer; - private int startKey, endKey; - private int startValue, endValue; - - public String getKey() { - return new String(buffer, startKey, (endKey - startKey + 1)); - } - - public String getValue() { - return new String(buffer, startValue, (endValue - startValue + 1)); - } - - @Override - public String toString() { - return ">" + this.getKey() + "\n" + this.getValue(); - } - - public byte[] getBuffer() { - return buffer; - } - - public void setBuffer(byte[] buffer) { - this.buffer = buffer; - } - - public int getStartKey() { - return startKey; - } - - public void setStartKey(int startKey) { - this.startKey = startKey; - } - - public int getEndKey() { - return endKey; - } - - public void setEndKey(int endKey) { - this.endKey = endKey; - } - - public int getStartValue() { - return startValue; - } - - public void setStartValue(int startValue) { - this.startValue = startValue; - } - - public int getEndValue() { - return endValue; - } - - public void setEndValue(int endValue) { - this.endValue = endValue; - } - -} diff --git a/src/main/java/com/jnpersson/fastdoop/ShortReadsRecordReader.java b/src/main/java/com/jnpersson/fastdoop/ShortReadsRecordReader.java deleted file mode 100644 index 37ddb6858..000000000 --- a/src/main/java/com/jnpersson/fastdoop/ShortReadsRecordReader.java +++ /dev/null @@ -1,384 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * This file has been modified by Johan Nyström-Persson from the - * original version for use in Discount. 
- */ - -package com.jnpersson.fastdoop; - -import java.io.EOFException; -import java.io.IOException; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.RecordReader; - -/** - * This class reads {@literal } pairs from an {@code InputSplit}. - * The input file is in FASTA format. - * A FASTA record has a header line that is the key, and data lines - * that are the value. - * {@literal >header...} - * data - * ... - * - * - * Example: - * {@literal >Seq1} - * TAATCCCAAATGATTATATCCTTCTCCGATCGCTAGCTATACCTTCCAGGCGATGAACTTAGACGGAATCCACTTTGCTA - * CAACGCGATGACTCAACCGCCATGGTGGTACTAGTCGCGGAAAAGAAAGAGTAAACGCCAACGGGCTAGACACACTAATC - * CTCCGTCCCCAACAGGTATGATACCGTTGGCTTCACTTCTA - * {@literal >Seq2} - * CTACATTCGTAATCTCTTTGTCAGTCCTCCCGTACGTTGGCAAAGGTTCACTGGAAAAATTGCCGACGCACAGGTGCCGG - * GCCGTGAATAGGGCCAGATGAACAAGGAAATAATCACCACCGAGGTGTGACATGCCCTCTCGGGCAACCACTCTTCCTCA - * TACCCCCTCTGGGCTAACTCGGAGCAAAGAACTTGGTAA - * ... - * - * @author Gianluca Roscigno - * - * @version 1.0 - * - * @see InputSplit - */ - -public class ShortReadsRecordReader extends RecordReader { - - private FSDataInputStream inputFile; - - private long startByte; - - private boolean hasReadToEOF; - - private Text currKey; - - private Record currValue; - - /* - * Used to buffer the content of the input split - */ - private byte[] myInputSplitBuffer; - - private int look_ahead_buffer_size = 0; - - /* - * Auxiliary buffer used to store the ending buffer of this input split and - * the initial bytes of the next split - */ - private byte[] borderBuffer; - - /* - * Marks the current position in the input split buffer - */ - private int posBuffer; - - /* - * Stores the size of the input split buffer - */ - private int sizeBuffer; - - /* - * True, if we processed the entire input split buffer. False, otherwise - */ - private boolean endMyInputSplit = false; - - public ShortReadsRecordReader() { - super(); - } - - @Override - public void initialize(InputSplit genericSplit, TaskAttemptContext context) - throws IOException, InterruptedException { - - posBuffer = 0; - Configuration job = context.getConfiguration(); - - look_ahead_buffer_size = context.getConfiguration().getInt("look_ahead_buffer_size", 2048); - - /* - * We open the file corresponding to the input split and - * start processing it - */ - FileSplit split = (FileSplit) genericSplit; - Path path = split.getPath(); - long fileLength = path.getFileSystem(job).getContentSummary(path).getLength(); - startByte = split.getStart(); - inputFile = path.getFileSystem(job).open(path); - Utils.safeSeek(inputFile, startByte); - - currKey = new Text("null"); - currValue = new Record(); - - /* - * We read the whole content of the split in memory using - * myInputSplitBuffer. 
Plus, we read in the memory the first - * KV_BUFFER_SIZE of the next split - */ - - myInputSplitBuffer = new byte[(int) split.getLength()]; - currValue.setBuffer(myInputSplitBuffer); - - sizeBuffer = inputFile.read(startByte, myInputSplitBuffer, 0, myInputSplitBuffer.length); - - if (sizeBuffer <= 0) { - endMyInputSplit = true; - return; - } - - hasReadToEOF = (sizeBuffer < 0 || startByte + sizeBuffer >= fileLength); - - Utils.safeSeek(inputFile, startByte + sizeBuffer); - - /* - * We move the starting pointer past the first occurrence of the '>' - * symbol as we assume these characters - * will be processed together with the previous split - */ - for (int i = 0; i < sizeBuffer; i++) { - if (myInputSplitBuffer[i] == '>') { - posBuffer = i + 1; - break; - } - } - - if (posBuffer == 0) { - endMyInputSplit = true; - } - - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - - if (endMyInputSplit) - return false; - - if (borderBuffer == null) { - borderBuffer = new byte[look_ahead_buffer_size]; - } - - boolean nextsplitKey = false; - boolean nextsplitValue = false; - - currValue.setStartKey(posBuffer); - - /* - * We look for the next short sequence my moving posBuffer until a - * newline character is found. - * End of split is implicitly managed through - * ArrayIndexOutOfBoundsException handling - */ - try { - while (myInputSplitBuffer[posBuffer] != '\n') { - posBuffer++; - } - } catch (ArrayIndexOutOfBoundsException e) { - - /* - * If we reached the end of the split while scanning a sequence, we - * use nextsplitKey to remember that more characters have to be - * fetched from the next split for retrieving the key - */ - endMyInputSplit = true; - nextsplitKey = true; - } - - currValue.setEndKey(posBuffer - 1); - - if (!endMyInputSplit) { - /* - * Assuming there are more characters from the current split to - * process, we move forward the pointer - * until the symbol '>' is found - * - * posBuffer + 1 can potentially overrun the end of the buffer, since the exception above - * would not be thrown if the final character of the split is a \n. Check the offset accordingly. 
- */ - currValue.setStartValue(Utils.trimToEnd(myInputSplitBuffer, posBuffer + 1)); - - try { - while (myInputSplitBuffer[posBuffer] != '>') { - posBuffer++; - } - - currValue.setEndValue(posBuffer - 2); - posBuffer++; - - } catch (ArrayIndexOutOfBoundsException e) { - /* - * If we reached the end of the split while scanning a sequence, - * we use nextsplitValue to remember that more characters have - * to be fetched from the next split for retrieving the value - */ - endMyInputSplit = true; - nextsplitValue = true; - currValue.setEndValue(posBuffer - 1); - - } - - } - - /* - * The end of the split has been reached - */ - if (endMyInputSplit) { - - /* - * First, we check if we reached the end of the HDFS file (not of - * the split) - */ - if (hasReadToEOF) { - int c = 0; - - for (int i = posBuffer - 1; i >= 0; i--) { - if ( myInputSplitBuffer[i] != '\n') - break; - - c++; - } - - currValue.setEndValue(posBuffer - 1 - c); - - return true; - } - - /* - * If there is another split after this one and we still need to - * retrieve the - * key of the current record, we switch to borderbuffer to fetch all - * the remaining characters - */ - if (nextsplitKey) { - - currValue.setBuffer(borderBuffer); - - int j = posBuffer - currValue.getStartKey(); - - System.arraycopy(myInputSplitBuffer, currValue.getStartKey(), borderBuffer, 0, j); - - posBuffer = j; - - currValue.setStartKey(0); - nextsplitValue = true; - - byte b; - - try { - - while ((b = inputFile.readByte()) != '\n') - borderBuffer[j++] = b; - - } catch (EOFException e) {} - - if (!nextsplitValue) - return false; - - currValue.setEndKey(j - 1); - } - - /* - * If there is another split after this one and we still need to - * retrieve the value of the current record, we switch to - * borderbuffer to fetch all the remaining characters - */ - if (nextsplitValue) { - - if (!nextsplitKey) { - - currValue.setBuffer(borderBuffer); - - int j = currValue.getEndKey() + 1 - currValue.getStartKey(); - System.arraycopy(myInputSplitBuffer, currValue.getStartKey(), borderBuffer, 0, j); - - currValue.setStartKey(0); - currValue.setEndKey(j - 1); - - int start = currValue.getStartValue(); - currValue.setStartValue(j); - - if ((currValue.getEndValue() + 1 - start) > 0) // TODO VERIFICARE - System.arraycopy(myInputSplitBuffer, start, borderBuffer, j, (currValue.getEndValue() + 1 - start)); - posBuffer = j + currValue.getEndValue() + 1 - start; - - currValue.setEndValue(posBuffer); - - } else { - posBuffer = currValue.getEndKey() + 1; - currValue.setStartValue(posBuffer); - } - - byte b = 'a'; - - try { - - while ((b = inputFile.readByte()) != '>') - borderBuffer[posBuffer++] = b; - - } catch (EOFException e) {} - - if (b == '>') - currValue.setEndValue(posBuffer - 2); - else { - - int c = 0; - - for (int i = posBuffer - 1; i >= 0; i--) { - - if (borderBuffer[i] != '\n') - break; - - c++; - } - - currValue.setEndValue(posBuffer - 1 - c); - - } - - } - } - - return true; - - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return sizeBuffer > 0 ? 
posBuffer / sizeBuffer : 1; - } - - @Override - public void close() throws IOException { - - if (inputFile != null) - inputFile.close(); - } - - @Override - public Text getCurrentKey() throws IOException, InterruptedException { - return currKey; - } - - @Override - public Record getCurrentValue() throws IOException, InterruptedException { - return currValue; - } - -} diff --git a/src/main/java/com/jnpersson/fastdoop/Utils.java b/src/main/java/com/jnpersson/fastdoop/Utils.java deleted file mode 100644 index ad66dbf5e..000000000 --- a/src/main/java/com/jnpersson/fastdoop/Utils.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.jnpersson.fastdoop; - -import org.apache.hadoop.fs.FSDataInputStream; - -import java.io.EOFException; -import java.io.IOException; - -class Utils { - - /** - * From FSUtils in HUDI https://github.com/apache/hudi - *

- * GCS has a different behavior for detecting EOF during seek(). - * - * @param inputStream FSDataInputStream - * @return true if the inputstream or the wrapped one is of type GoogleHadoopFSInputStream - */ - public static boolean isGCSInputStream(FSDataInputStream inputStream) { - return inputStream.getClass().getCanonicalName().equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream") - || inputStream.getWrappedStream().getClass().getCanonicalName() - .equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream"); - } - - - /** - * From FSUtils in HUDI https://github.com/apache/hudi - * - * Handles difference in seek behavior for GCS and non-GCS input stream - * @param inputStream Input Stream - * @param pos Position to seek - * @throws IOException - */ - public static void safeSeek(FSDataInputStream inputStream, long pos) throws IOException { - try { - inputStream.seek(pos); - } catch (EOFException e) { - if (isGCSInputStream(inputStream)) { - inputStream.seek(pos - 1); - } else { - throw e; - } - } - } - - /** - * Adjust an offset into a buffer such that the offset does not overrun the buffer end. - * @param buffer - * @param offset - * @return - */ - public static int trimToEnd(byte[] buffer, int offset) { - return (offset <= buffer.length - 1) ? offset : (buffer.length - 1); - } -} \ No newline at end of file diff --git a/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala b/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala similarity index 80% rename from src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala rename to src/main/scala/com/jnpersson/kmers/input/FileInputs.scala index aa246e5de..709a2088e 100644 --- a/src/main/scala/com/jnpersson/kmers/input/FastdoopInputs.scala +++ b/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala @@ -17,7 +17,7 @@ package com.jnpersson.kmers.input -import com.jnpersson.fastdoop.{FASTAshortInputFileFormat, FASTQInputFileFormat, IndexedFastaFormat, PartialSequence, QRecord, Record} +import com.jnpersson.fastdoop.{IndexedFastaFormat, PartialSequence} import com.jnpersson.kmers.{HDFSUtil, SeqLocation, SeqTitle} import com.jnpersson.kmers.minimizer.InputFragment import org.apache.hadoop.conf.{Configuration => HConfiguration} @@ -152,66 +152,6 @@ abstract class HadoopInputReader[R <: AnyRef](file: String, k: Int)(implicit spa protected[input] def getFragments(): Dataset[InputFragment] } -/** - * Input reader for FASTA sequences of a fixed maximum length. 
- * Uses [[FASTAshortInputFileFormat]] - * @param file the file to read - * @param k length of k-mers - * @param maxReadLength maximum length of a single read - * @param file2 second file for paired-end reads - */ -class FastaShortInput(file: String, k: Int, maxReadLength: Int) - (implicit spark: SparkSession) extends HadoopInputReader[Record](file, k) { - import spark.sqlContext.implicits._ - import HadoopInputReader._ - - private val bufsiz = maxReadLength + // sequence data - 1000 //ID string and separator characters - conf.set("look_ahead_buffer_size", bufsiz.toString) //fastdoop parameter - - protected def loadFile(input: String): RDD[Record] = - sc.newAPIHadoopFile(input, classOf[FASTAshortInputFileFormat], classOf[Text], classOf[Record], conf).values - - def getSequenceTitles: Dataset[SeqTitle] = - rdd.map(_.getKey).toDS().distinct - - protected[input] def getFragments(): Dataset[InputFragment] = - rdd.map(rec => - makeInputFragment(rec.getKey.split(" ")(0), FIRST_LOCATION, rec.getBuffer, - rec.getStartValue, rec.getEndValue) - ).toDS - -} - -/** - * Input reader for FASTQ short reads. Uses [[FASTQInputFileFormat]] - * @param file the file to read - * @param k length of k-mers - * @param maxReadLength maximum length of a single read - * @param file2 second file for paired-end reads - */ -class FastqShortInput(file: String, k: Int, maxReadLength: Int) - (implicit spark: SparkSession) extends HadoopInputReader[QRecord](file, k) { - import spark.sqlContext.implicits._ - import HadoopInputReader._ - - private val bufsiz = maxReadLength * 2 + // sequence and quality data - 1000 //ID string and separator characters - conf.set("look_ahead_buffer_size", bufsiz.toString) //fastdoop parameter - - protected def loadFile(input: String): RDD[QRecord] = - sc.newAPIHadoopFile(input, classOf[FASTQInputFileFormat], classOf[Text], classOf[QRecord], conf).values - - def getSequenceTitles: Dataset[SeqTitle] = - rdd.map(_.getKey).toDS.distinct - - protected[input] def getFragments(): Dataset[InputFragment] = - rdd.map(rec => - makeInputFragment(rec.getKey.split(" ")(0), FIRST_LOCATION, rec.getBuffer, - rec.getStartValue, rec.getEndValue) - ).toDS -} - /** * Reader for fasta records that are potentially multiline, but small enough to fit into a single string. * Huge sequences are best processed with the [[IndexedFastaFormat]] instead (.fna files) @@ -248,7 +188,6 @@ class FastaTextInput(file: String, k: Int)(implicit spark: SparkSession) extends /** * Reader for fastq records. Supports compression via Spark's text input layer. - * Supports compression via Spark's text reader. 
*/ class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends HadoopInputReader[Array[String]](file, k) { import spark.sqlContext.implicits._ From 357a0f5441b103d7f6ffcd9eb2f2a82edd752a43 Mon Sep 17 00:00:00 2001 From: Johan Nystrom Date: Sun, 22 Jun 2025 13:17:55 +0900 Subject: [PATCH 06/10] Remove unused parameter maxSequenceLength from FileInputs and related places --- src/main/scala/com/jnpersson/kmers/SparkTool.scala | 7 +------ src/main/scala/com/jnpersson/kmers/input/FileInputs.scala | 6 ++---- src/main/scala/com/jnpersson/slacken/Slacken.scala | 1 - src/test/scala/com/jnpersson/kmers/InputReaderProps.scala | 3 +-- src/test/scala/com/jnpersson/slacken/Testing.scala | 2 +- 5 files changed, 5 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/jnpersson/kmers/SparkTool.scala b/src/main/scala/com/jnpersson/kmers/SparkTool.scala index fcb8c2828..bab3cc104 100644 --- a/src/main/scala/com/jnpersson/kmers/SparkTool.scala +++ b/src/main/scala/com/jnpersson/kmers/SparkTool.scala @@ -67,13 +67,8 @@ class SparkConfiguration(args: Array[String])(implicit val spark: SparkSession) val partitions = opt[Int](descr = "Number of shuffle partitions/parquet buckets for indexes (default 200)", default = Some(200)) - protected def defaultMaxSequenceLength = 10000000 //10M bps - val maxSequenceLength = opt[Int](name = "maxlen", - descr = s"Maximum length of a single short sequence/read (default $defaultMaxSequenceLength)", - default = Some(defaultMaxSequenceLength)) - def inputReader(files: Seq[String], k: Int, grouping: InputGrouping)(implicit spark: SparkSession) = - new FileInputs(files, k, maxSequenceLength(), grouping) + new FileInputs(files, k, grouping) def finishSetup(): this.type = { verify() diff --git a/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala b/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala index 709a2088e..f4b3dec12 100644 --- a/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala +++ b/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala @@ -33,18 +33,17 @@ import org.apache.spark.sql.{Dataset, SparkSession} * * @param files files to read. A name of the format @list.txt will be parsed as a list of files. * @param k length of k-mers - * @param maxReadLength max length of short sequences * @param inputGrouping whether input files are paired-end reads. If so, they are expected to appear in sequence, so that * the first file is a _1, the second a _2, the third a _1, etc. * @param spark the SparkSession */ -class FileInputs(val files: Seq[String], k: Int, maxReadLength: Int, inputGrouping: InputGrouping = Ungrouped)(implicit spark: SparkSession) { +class FileInputs(val files: Seq[String], k: Int, inputGrouping: InputGrouping = Ungrouped)(implicit spark: SparkSession) { protected val conf = new HConfiguration(spark.sparkContext.hadoopConfiguration) import spark.sqlContext.implicits._ /** Clone this Inputs with a different value of k. 
*/ def withK(newK: Int): FileInputs = - new FileInputs(files, newK, maxReadLength, inputGrouping) + new FileInputs(files, newK, inputGrouping) private val expandedFiles = files.toList.flatMap(f => { if (f.startsWith("@")) { @@ -140,7 +139,6 @@ object HadoopInputReader { * @param k length of k-mers */ abstract class HadoopInputReader[R <: AnyRef](file: String, k: Int)(implicit spark: SparkSession) extends InputReader { - import spark.sqlContext.implicits._ protected val conf = new HConfiguration(sc.hadoopConfiguration) //Fastdoop parameter for correct overlap between partial sequences diff --git a/src/main/scala/com/jnpersson/slacken/Slacken.scala b/src/main/scala/com/jnpersson/slacken/Slacken.scala index f67bacb5e..687bf56c9 100644 --- a/src/main/scala/com/jnpersson/slacken/Slacken.scala +++ b/src/main/scala/com/jnpersson/slacken/Slacken.scala @@ -46,7 +46,6 @@ class SlackenConf(args: Array[String])(implicit spark: SparkSession) extends Spa override def defaultMinimizerWidth: Int = 31 override def defaultMinimizerSpaces: Int = 7 override def defaultOrdering: String = "xor" - override def defaultMaxSequenceLength: Int = 100000000 //100M bps override def defaultXORMask: Long = DEFAULT_TOGGLE_MASK override def canonicalMinimizers: Boolean = true diff --git a/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala b/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala index 872e0ced3..c02680a93 100644 --- a/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala +++ b/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala @@ -34,7 +34,6 @@ class InputReaderProps extends AnyFunSuite with SparkSessionTestWrapper with Sca implicit val sp = spark val k = 35 - val maxReadLength = 100000 // Write a new temporary file with content def generateFile(content: String, extension: String): String = { @@ -45,7 +44,7 @@ class InputReaderProps extends AnyFunSuite with SparkSessionTestWrapper with Sca //Read files using InputReader def readFiles(files: Seq[String]): FileInputs = - new FileInputs(files, k, maxReadLength) + new FileInputs(files, k) //Delete a temporary file def removeFile(file: String): Unit = { diff --git a/src/test/scala/com/jnpersson/slacken/Testing.scala b/src/test/scala/com/jnpersson/slacken/Testing.scala index ab36d214a..0f1d41b28 100644 --- a/src/test/scala/com/jnpersson/slacken/Testing.scala +++ b/src/test/scala/com/jnpersson/slacken/Testing.scala @@ -171,7 +171,7 @@ object TestData { val numberOf35Mers = Map(526997 -> 2902850, 455631 -> 3565872, 9606 -> 639784) def inputs(k: Int)(implicit spark: SparkSession) = - new FileInputs(List("testData/slacken/slacken_tinydata.fna"), k, 10000000) + new FileInputs(List("testData/slacken/slacken_tinydata.fna"), k) def library(k: Int)(implicit spark: SparkSession): GenomeLibrary = GenomeLibrary(inputs(k), "testData/slacken/seqid2taxid.map") From 272e8ba839afe7a72dacf2401b77d411e528b737 Mon Sep 17 00:00:00 2001 From: Johan Nystrom Date: Sun, 22 Jun 2025 13:27:55 +0900 Subject: [PATCH 07/10] Remove the unused parameter k from some InputReaders --- .../jnpersson/kmers/input/FileInputs.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala b/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala index f4b3dec12..2feac0b1a 100644 --- a/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala +++ b/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala @@ -65,20 +65,20 @@ class FileInputs(val files: Seq[String], k: Int, inputGrouping: 
InputGrouping = if (lower.endsWith("fq") || lower.endsWith("fastq")) { println(s"Assuming fastq format for $file") - new FastqTextInput(file, k) + new FastqTextInput(file) } else if (lower.endsWith(".fq.gz") || lower.endsWith(".fastq.gz") || lower.endsWith(".fq.bz2") || lower.endsWith(".fastq.bz2")) { println(s"Assuming compressed fastq format for $file") - new FastqTextInput(file, k) + new FastqTextInput(file) } else if (HDFSUtil.fileExists(faiPath)) { println(s"$faiPath found. Using indexed fasta format for $file") new IndexedFastaInput(file, k) } else if (lower.endsWith(".gz") || lower.endsWith(".bz2") ) { println(s"Assuming compressed fasta format for $file") - new FastaTextInput(file, k) + new FastaTextInput(file) } else { println(s"$faiPath not found. Assuming simple fasta format for $file") - new FastaTextInput(file, k) + new FastaTextInput(file) } } @@ -138,12 +138,9 @@ object HadoopInputReader { * @param file the file to read * @param k length of k-mers */ -abstract class HadoopInputReader[R <: AnyRef](file: String, k: Int)(implicit spark: SparkSession) extends InputReader { +abstract class HadoopInputReader[R <: AnyRef](file: String)(implicit spark: SparkSession) extends InputReader { protected val conf = new HConfiguration(sc.hadoopConfiguration) - //Fastdoop parameter for correct overlap between partial sequences - conf.set("k", k.toString) - protected def loadFile(input: String): RDD[R] protected def rdd: RDD[R] = loadFile(file) @@ -155,7 +152,7 @@ abstract class HadoopInputReader[R <: AnyRef](file: String, k: Int)(implicit spa * Huge sequences are best processed with the [[IndexedFastaFormat]] instead (.fna files) * Supports compression via Spark's text reader. */ -class FastaTextInput(file: String, k: Int)(implicit spark: SparkSession) extends HadoopInputReader[Array[String]](file, k) { +class FastaTextInput(file: String)(implicit spark: SparkSession) extends HadoopInputReader[Array[String]](file) { import spark.sqlContext.implicits._ import HadoopInputReader._ @@ -187,7 +184,7 @@ class FastaTextInput(file: String, k: Int)(implicit spark: SparkSession) extends /** * Reader for fastq records. Supports compression via Spark's text input layer. 
*/ -class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends HadoopInputReader[Array[String]](file, k) { +class FastqTextInput(file: String)(implicit spark: SparkSession) extends HadoopInputReader[Array[String]](file) { import spark.sqlContext.implicits._ import HadoopInputReader._ @@ -224,11 +221,14 @@ class FastqTextInput(file: String, k: Int)(implicit spark: SparkSession) extends * @param k length of k-mers */ class IndexedFastaInput(file: String, k: Int)(implicit spark: SparkSession) - extends HadoopInputReader[PartialSequence](file, k) { + extends HadoopInputReader[PartialSequence](file) { import spark.sqlContext.implicits._ import HadoopInputReader._ + //Fastdoop parameter for correct overlap between partial sequences + conf.set("k", k.toString) + protected def loadFile(input: String): RDD[PartialSequence] = sc.newAPIHadoopFile(input, classOf[IndexedFastaFormat], classOf[Text], classOf[PartialSequence], conf).values From 848463c17a730f45903c6bbfdc6a02d22bd404f9 Mon Sep 17 00:00:00 2001 From: Johan Nystrom Date: Sun, 22 Jun 2025 16:08:34 +0900 Subject: [PATCH 08/10] Make fastq reader more robust, in the presence of @ and + characters in quality scores --- .../scala/com/jnpersson/kmers/input/FileInputs.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala b/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala index 2feac0b1a..9e35c3261 100644 --- a/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala +++ b/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.{Configuration => HConfiguration} import org.apache.hadoop.io.Text import org.apache.spark.rdd.RDD import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.functions.{collect_list, element_at, lit, monotonically_increasing_id, slice} +import org.apache.spark.sql.functions.{collect_list, element_at, lit, monotonically_increasing_id, slice, substring} import org.apache.spark.sql.{Dataset, SparkSession} @@ -197,7 +197,14 @@ class FastqTextInput(file: String)(implicit spark: SparkSession) extends HadoopI withColumn("values", //group every 4 rows and give them the same recId collect_list($"value").over(Window.partitionBy("file").orderBy("rowId").rowsBetween(Window.currentRow, 3)) ). - where(element_at($"values", 3) === "+"). + where( + //In a valid FASTQ record, line 1/4 begins with @ and line 3/4 begins with +. + //The characters @ and + may sometimes also appear in quality scores. + //Checking both at lines 1 and 3 is a robust way to identify the right position of the 4-line window even + //in the presence of such confounding characters. + (substring(element_at($"values", 1), 1, 1) === "@").and( + substring(element_at($"values", 3), 1, 1) === "+") + ). 
select(slice($"values", 1, 2)).as[Array[String]] //Currently preserves only the header and the nucleotide string }.rdd From a801ba82afb4c884f030f48a2568e1b37a53a314 Mon Sep 17 00:00:00 2001 From: Johan Nystrom Date: Sun, 22 Jun 2025 16:27:56 +0900 Subject: [PATCH 09/10] Clean up comments --- .../scala/com/jnpersson/kmers/input/FileInputs.scala | 5 ++--- .../scala/com/jnpersson/kmers/input/InputReader.scala | 9 ++++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala b/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala index 9e35c3261..d4f6d3b9c 100644 --- a/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala +++ b/src/main/scala/com/jnpersson/kmers/input/FileInputs.scala @@ -96,7 +96,7 @@ class FileInputs(val files: Seq[String], k: Int, inputGrouping: InputGrouping = * Parse all files in this set as InputFragments * @param withAmbiguous whether to include ambiguous nucleotides. If not, the inputs will be split and only valid * nucleotides retained. - * @return + * @param sampleFraction the fraction to sample, if any. */ def getInputFragments(withAmbiguous: Boolean = false, sampleFraction: Option[Double] = None): Dataset[InputFragment] = { val readers = inputGrouping match { @@ -136,7 +136,6 @@ object HadoopInputReader { * A sequence input converter that reads data from one file using a specific * Hadoop format, making the result available as Dataset[InputFragment] * @param file the file to read - * @param k length of k-mers */ abstract class HadoopInputReader[R <: AnyRef](file: String)(implicit spark: SparkSession) extends InputReader { protected val conf = new HConfiguration(sc.hadoopConfiguration) @@ -225,7 +224,7 @@ class FastqTextInput(file: String)(implicit spark: SparkSession) extends HadoopI * Uses [[IndexedFastaFormat]] * * @param file the file to read - * @param k length of k-mers + * @param k length of k-mers. Needed for this input format as k-mers will cross boundaries between file splits */ class IndexedFastaInput(file: String, k: Int)(implicit spark: SparkSession) extends HadoopInputReader[PartialSequence](file) { diff --git a/src/main/scala/com/jnpersson/kmers/input/InputReader.scala b/src/main/scala/com/jnpersson/kmers/input/InputReader.scala index 76eefae9f..c1f903e99 100644 --- a/src/main/scala/com/jnpersson/kmers/input/InputReader.scala +++ b/src/main/scala/com/jnpersson/kmers/input/InputReader.scala @@ -32,7 +32,8 @@ case object Ungrouped extends InputGrouping case object PairedEnd extends InputGrouping object InputReader { - /** Add reverse complement sequences to inputs that have already been parsed. + /** Add reverse complement sequences to input fragments. + * Returns the original fragments plus their reverse complement. */ def addRCFragments(fs: Dataset[InputFragment])(implicit spark: SparkSession): Dataset[InputFragment] = { import spark.sqlContext.implicits._ @@ -77,13 +78,15 @@ abstract class InputReader(implicit spark: SparkSession) { def getSequenceTitles: Dataset[SeqTitle] /** - * Read sequence data as fragments from the input files, removing any newlines. + * Read sequence data as fragments from the input source * @return */ protected[input] def getFragments(): Dataset[InputFragment] /** - * Load sequence fragments from files, optionally adding reverse complements and/or sampling. 
+   * Load sequence fragments from the source
+   * @param withAmbiguous whether to include ambiguous nucleotides. If not, only valid nucleotides are retained
+   * @param sampleFraction the fraction to sample, if any
    */
   def getInputFragments(withAmbiguous: Boolean, sampleFraction: Option[Double]):
     Dataset[InputFragment] = {
 

From 88a50c6ffe637166699713a435507b4dd903fd4e Mon Sep 17 00:00:00 2001
From: Johan Nystrom
Date: Mon, 30 Jun 2025 15:05:53 +0900
Subject: [PATCH 10/10] InputReaderProps: generate compressed files
 (uncompressed, gzip, bzip2) for testing

---
 .../jnpersson/kmers/InputReaderProps.scala    | 42 +++++++++++++------
 1 file changed, 30 insertions(+), 12 deletions(-)

diff --git a/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala b/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala
index c02680a93..25cb3f35f 100644
--- a/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala
+++ b/src/test/scala/com/jnpersson/kmers/InputReaderProps.scala
@@ -24,7 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
 import org.scalatest.matchers.should.Matchers._
 
-import java.nio.file.Files
+import java.nio.file.{Files, Path => FPath}
 
 /** Test the input reader using synthetic files of the various input formats. */
@@ -35,12 +35,8 @@ class InputReaderProps extends AnyFunSuite with SparkSessionTestWrapper with Sca
 
   val k = 35
 
-  // Write a new temporary file with content
-  def generateFile(content: String, extension: String): String = {
-    val loc = Files.createTempFile(null, extension)
-    Files.write(loc, content.getBytes())
-    loc.toString
-  }
+  def generateFileName(extension: String): FPath =
+    Files.createTempFile(null, extension)
 
   //Read files using InputReader
   def readFiles(files: Seq[String]): FileInputs =
@@ -48,7 +44,7 @@ class InputReaderProps extends AnyFunSuite with SparkSessionTestWrapper with Sca
 
   //Delete a temporary file
   def removeFile(file: String): Unit = {
-    new java.io.File(file).delete()
+    HDFSUtil.deleteRecursive(file)
   }
 
   def removeSeparators(x: String): String =
@@ -58,6 +54,18 @@ class InputReaderProps extends AnyFunSuite with SparkSessionTestWrapper with Sca
   case class SeqRecordFile(records: List[(String, InputFragment)], lineSeparator: String) {
     override def toString: String =
       records.map(_._1).mkString("")
+
+    //Write a file using Spark's text writer, using the specified compression
+    def write(path: String, compression: String) = {
+      import spark.implicits._
+      val lines = records.map(_._1).toDS
+      lines.coalesce(1). //write as a single partition
+        write.
+        mode("overwrite").
+        option("lineSep", lineSeparator). //records already contain this separator internally; write it between records too
+        option("compression", compression).
+ text(path) + } } //Do not shrink an individual record @@ -66,6 +74,15 @@ class InputReaderProps extends AnyFunSuite with SparkSessionTestWrapper with Sca val fastqQuality = (33 to 126).map(_.toChar) + def compressionFormats: Gen[SeqTitle] = + Gen.oneOf(List("none", "gzip", "bzip2")) + + def compExtension(compression: String): String = compression match { + case "gzip" => ".gz" + case "bzip2" => ".bz2" + case _ => "" + } + //lines have different length, to simulate a complex fasta file //Triples of (file data, expected input fragment, line separator) def fastaFileShortSequences(lineSep: String, seqGen: Gen[NTSeq]): Gen[(String, InputFragment)] = @@ -74,7 +91,7 @@ class InputReaderProps extends AnyFunSuite with SparkSessionTestWrapper with Sca dnaSeqs <- Gen.listOfN(lines, seqGen) id <- Gen.stringOfN(10, Gen.alphaNumChar) sequence = dnaSeqs.mkString("") - record = s">$id$lineSep" + dnaSeqs.mkString(lineSep) + lineSep + record = s">$id$lineSep" + dnaSeqs.mkString(lineSep) fragment = InputFragment(id, 0, sequence, None) } yield (record, fragment) @@ -83,7 +100,7 @@ class InputReaderProps extends AnyFunSuite with SparkSessionTestWrapper with Sca dnaSeq <- seqGen id <- Gen.stringOfN(10, Gen.alphaNumChar) quality <- Gen.stringOfN(dnaSeq.length, Gen.oneOf(fastqQuality)) - record = s"@$id$lineSep$dnaSeq$lineSep+$lineSep$quality$lineSep" + record = s"@$id$lineSep$dnaSeq$lineSep+$lineSep$quality" fragment = InputFragment(id, 0, dnaSeq, None) } yield (record, fragment) @@ -105,8 +122,9 @@ class InputReaderProps extends AnyFunSuite with SparkSessionTestWrapper with Sca } yield SeqRecordFile(records, lineSep) def testFileFormat(fileGen: Gen[SeqRecordFile], extension: String, withAmbiguous: Boolean): Unit = { - forAll(fileGen) { case file => - val loc = generateFile(file.toString, extension) + forAll(fileGen, compressionFormats) { case (file, comp) => + val loc = generateFileName(extension + compExtension(comp)).toString + file.write(loc, comp) val inputs = readFiles(List(loc)) val fragments = file.records.map(pair => (pair._2.header, pair._2.nucleotides)).sortBy(_._1) val got = inputs.getInputFragments(withAmbiguous = withAmbiguous).collect().