@@ -309,45 +309,10 @@ class ADAMPartitionTask(Task):
309309
310310 adam_command = Parameter ()
311311 allowed_file_formats = Parameter ()
312+ source_edition = Parameter ()
313+ edition = Parameter ()
312314 partition_strategy_file = Parameter ()
313- source_edition = 'basic'
314- edition = 'locuspart'
315-
316- def requires (self ):
317- return ADAMBasicTask (adam_command = self .adam_command ,
318- allowed_file_formats = self .allowed_file_formats )
319-
320- def run (self ):
321- adam_cmd = ('{hadoop_home}/bin/hadoop jar {adam_partitioning_jar}'
322- ' CrunchPartitionTool -D mapreduce.job.reduces={parallelism}'
323- ' {partition_strategy_file} {source} {target}' ).format (
324- hadoop_home = os .environ ['HADOOP_HOME' ],
325- adam_partitioning_jar = os .environ ['ADAM_PARTITIONING_JAR' ],
326- parallelism = 1 ,
327- partition_strategy_file = self .partition_strategy_file ,
328- source = target_s3n_url (ToastConfig ().config ['name' ],
329- edition = self .source_edition ),
330- target = target_s3n_url (ToastConfig ().config ['name' ],
331- edition = self .edition ))
332- p = Popen (adam_cmd , shell = True )
333- p .wait ()
334-
335- if p .returncode == 0 :
336- create_SUCCESS_file (target_s3_url (ToastConfig ().config ['name' ],
337- edition = self .edition ))
338-
339- def output (self ):
340- return S3FlagTarget (target_s3_url (ToastConfig ().config ['name' ],
341- edition = self .edition ))
342-
343-
344- class ADAMFlattenPartitionTask (Task ):
345-
346- adam_command = Parameter ()
347- allowed_file_formats = Parameter ()
348- partition_strategy_file = Parameter ()
349- source_edition = 'flat'
350- edition = 'flat_locuspart'
315+ parallelism = Parameter ()
351316
352317 def requires (self ):
353318 return ADAMBasicTask (adam_command = self .adam_command ,
@@ -359,7 +324,7 @@ def run(self):
359324 ' {partition_strategy_file} {source} {target}' ).format (
360325 hadoop_home = os .environ ['HADOOP_HOME' ],
361326 adam_partitioning_jar = os .environ ['ADAM_PARTITIONING_JAR' ],
362- parallelism = 1 ,
327+ parallelism = self . parallelism ,
363328 partition_strategy_file = self .partition_strategy_file ,
364329 source = target_s3n_url (ToastConfig ().config ['name' ],
365330 edition = self .source_edition ),
@@ -376,20 +341,27 @@ def output(self):
376341 return S3FlagTarget (target_s3_url (ToastConfig ().config ['name' ],
377342 edition = self .edition ))
378343
379-
380344class VCF2ADAMTask (Task ):
381345
382346 def requires (self ):
347+ conf = ToastConfig ().config
348+ parallelism = conf ['numPartitionsHint' ] if 'numPartitionsHint' in conf else 1
383349 basic = ADAMBasicTask (adam_command = 'vcf2adam' ,
384350 allowed_file_formats = ['vcf' ])
385351 flat = ADAMFlattenTask (adam_command = 'vcf2adam' ,
386352 allowed_file_formats = ['vcf' ])
387353 locuspart = ADAMPartitionTask (adam_command = 'vcf2adam' ,
388354 allowed_file_formats = ['vcf' ],
389- partition_strategy_file = 'genotypes-partition-strategy' )
390- flat_locuspart = ADAMFlattenPartitionTask (adam_command = 'vcf2adam' ,
391- allowed_file_formats = ['vcf' ],
392- partition_strategy_file = 'flat-genotypes-partition-strategy' )
355+ source_edition = 'basic' ,
356+ edition = 'locuspart' ,
357+ partition_strategy_file = 'genotypes-partition-strategy' ,
358+ parallelism = parallelism )
359+ flat_locuspart = ADAMPartitionTask (adam_command = 'vcf2adam' ,
360+ allowed_file_formats = ['vcf' ],
361+ source_edition = 'flat' ,
362+ edition = 'flat_locuspart' ,
363+ partition_strategy_file = 'flat-genotypes-partition-strategy' ,
364+ parallelism = parallelism )
393365 dependencies = [basic ]
394366 for edition in ToastConfig ().config ['editions' ]:
395367 if edition == 'basic' :
@@ -412,16 +384,24 @@ def output(self):
412384class BAM2ADAMTask (Task ):
413385
414386 def requires (self ):
387+ conf = ToastConfig ().config
388+ parallelism = conf ['numPartitionsHint' ] if 'numPartitionsHint' in conf else 1
415389 basic = ADAMBasicTask (adam_command = 'transform' ,
416390 allowed_file_formats = ['sam' , 'bam' ])
417391 flat = ADAMFlattenTask (adam_command = 'transform' ,
418392 allowed_file_formats = ['sam' , 'bam' ])
419393 locuspart = ADAMPartitionTask (adam_command = 'transform' ,
420394 allowed_file_formats = ['sam' , 'bam' ],
421- partition_strategy_file = 'alignments-partition-strategy' )
422- flat_locuspart = ADAMFlattenPartitionTask (adam_command = 'transform' ,
423- allowed_file_formats = ['sam' , 'bam' ],
424- partition_strategy_file = 'flat-alignments-partition-strategy' )
395+ source_edition = 'basic' ,
396+ edition = 'locuspart' ,
397+ partition_strategy_file = 'alignments-partition-strategy' ,
398+ parallelism = parallelism )
399+ flat_locuspart = ADAMPartitionTask (adam_command = 'transform' ,
400+ allowed_file_formats = ['sam' , 'bam' ],
401+ source_edition = 'flat' ,
402+ edition = 'flat_locuspart' ,
403+ partition_strategy_file = 'flat-alignments-partition-strategy' ,
404+ parallelism = parallelism )
425405 dependencies = [basic ]
426406 for edition in ToastConfig ().config ['editions' ]:
427407 if edition == 'basic' :
0 commit comments