Несколько выходов в один вход списка - объединение файлов BAM в Nextflow

Я пытаюсь объединить x количество файлов BAM, созданных путем одновременного выполнения нескольких выравниваний (для пакетов y количества файлов fastq) в один единственный файл BAM в Nextflow.

Пока что при выполнении выравнивания и сортировки / индексации полученного bam-файла у меня есть следующее:

//Run minimap2 on concatenated fastqs
process miniMap2Bam {
        publishDir "$params.bamDir"
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        val dirString from dirStr
        val runString from stringRun
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        val runString into stringRun1
        file("${batchFastq}.bam") into bamFiles
        val dirString into dirStrSam

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

Где ${batchFastq}.bam - это файл BAM, содержащий пакет из y количества файлов fastq.

Этот конвейер завершается отлично, однако при попытке выполнить samtools merge с этими файлами bam в другом процессе (samToolsMerge) процесс запускается каждый раз, когда выполняется выравнивание (в данном случае 4), а не один раз для всех собранных файлов bam:

//Run samtools merge
process samToolsMerge {
        echo true
        publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
        cache 'deep'
        errorStrategy 'retry'
        maxRetries 3
        maxForks 10
        memory { 14.GB * task.attempt }

        input:
        val runString from stringRun1
        file bamFile from bamFiles.collect()
        val dirString from dirStrSam

        output:
        file("**")

        script:
        """
        samtools merge ${runString}.bam ${bamFile} 
        """
}

С выходом:

executor >  lsf (9)
[49/182ec0] process > catFastqs (1)     [100%] 1 of 1 ✔
[-        ] process > nanoPlotSummary   -
[0e/609a7a] process > miniMap2Bam (1)   [100%] 4 of 4 ✔
[42/72469d] process > samToolsMerge (2) [100%] 4 of 4 ✔




Completed at: 04-Mar-2021 14:54:21
Duration    : 5m 41s
CPU hours   : 0.2
Succeeded   : 9

Как я могу взять только получившиеся файлы bam из miniMap2Bam и прогнать их samToolsMerge за один раз вместо того, чтобы процесс выполнялся несколько раз?

Заранее спасибо!

РЕДАКТИРОВАТЬ: благодаря Палли в комментариях ниже проблема заключалась в передаче значений runString и dirString из предыдущего процесса в miniMap2Bam, а затем в samToolsMerge, что заставляло процесс повторяться каждый раз при передаче значения.

Решение было таким же простым, как удаление vals из miniMap2Bam (как показано ниже):

//Run minimap2 on concatenated fastqs
process miniMap2Bam {
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        file("${batchFastq}.bam") into bamFiles

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

person erichards52    schedule 04.03.2021    source источник
comment
Что такое runstring и dirstring и чего вы пытаетесь с их помощью достичь? Такое ощущение, что поведение, которое вы получаете, исходит от вывода значения в каналы значений. Я думаю, вы пытаетесь форсировать определенный поток, но не используете правильные инструменты, доступные в следующем потоке.   -  person Pallie    schedule 04.03.2021
comment
Спасибо за ваш ответ. Я считаю, что вы правы в том, что эти значения, вероятно, являются причиной такого поведения. Строка выполнения - это родительский каталог каталога, в котором размещены файлы fastq, а строка запуска - родительский каталог, в котором размещаются файлы fastq. Эти строки создаются при первоначальном чтении пакетов файлов fastq для успешной публикации файлов в правильных каталогах - я постараюсь найти обходной путь.   -  person erichards52    schedule 04.03.2021
comment
Большое спасибо, @Pallie! Теперь эта проблема решена путем простого перенаправления значений вокруг процесса, чтобы он больше не использовался в процессе слияния - я отредактирую вопрос, чтобы предоставить решение, хотя это довольно прямолинейно. Я хотел бы наградить вас, но я не уверен, что могу предоставить комментарии с ответом на голосование / выбор?   -  person erichards52    schedule 04.03.2021


Ответы (1)


Самым простым исправлением, вероятно, было бы прекращение передачи статической dirstring и runstring по каналам:

// Instead of a hardcoded path use a parameter you passed via CLI like you did with bamDir
dirString = file("/path/to/fastqs/")
runString = file("/path/to/fastqs/").getParent()
fastqBatch = Channel.from("/path/to/fastqs/")

//Run minimap2 on concatenated fastqs
process miniMap2Bam {
        publishDir "$params.bamDir"
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        file("${batchFastq}.bam") into bamFiles

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

//Run samtools merge
process samToolsMerge {
        echo true
        publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
        cache 'deep'
        errorStrategy 'retry'
        maxRetries 3
        maxForks 10
        memory { 14.GB * task.attempt }

        input:
        file bamFile from bamFiles.collect()

        output:
        file("**")

        script:
        """
        samtools merge ${runString}.bam ${bamFile} 
        """

person Pallie    schedule 04.03.2021
comment
Привет, Аган Палли, я выберу это как ответ на свой вопрос, так как проблема теперь решена, и он точно отображает то, что мне нужно реализовать в моем конвейере. Спасибо! - person erichards52; 04.03.2021