Freeze 10 Pipeline

Colin Gross

2022-09-30

BRAVO Data Prep

Repo of tools, workflows, and hacks.

github.com/statgen/bravo_data_prep

workspace/data_prep/
├── hacks/
├── README.md
├── tools/
└── workflows/

Tools

Purpose-built applications for processing steps.

  • python tools
    • python3
    • requires pysam
  • cpp tools
    • cget for dependencies
    • builds htslib
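
A minimal build sketch for the tools, assuming a conventional cget + CMake layout (directory names and the requirements file are illustrative, not taken from the repo):

# Python tools: python3 with pysam available
pip install pysam

# C++ tools: fetch and build dependencies (htslib et al.) into ./cget,
# then configure against the cget toolchain and compile.
cd tools/cpp_tools                 # hypothetical path
cget install -f requirements.txt   # assumes a cget requirements file
mkdir -p build && cd build
cmake -DCMAKE_TOOLCHAIN_FILE=../cget/cget/cget.cmake ..
make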

Workflows

Nextflow to orchestrate processing of VCF and CRAM data.

  • coverage
  • process_vcf
  • sequences
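
Each workflow is launched with the standard Nextflow CLI. An illustrative invocation (entry script, config path, and the --chromosomes parameter are assumptions, not the repo's exact names):

nextflow run workflows/coverage/coverage.nf \
  -c workflows/coverage/nextflow.config \
  -with-report coverage_report.html \
  --chromosomes chr11,chr12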

Hacks

  • Clearly delineated in a directory.
  • One-off scripts and experiments.
  • May be useful in future development.
  • Provide some history otherwise excluded from git.
  • Described by hacks/readme.md.

data_prep/hacks/
├── 198_samples.tsv
├── 198_samples.txt
├── make_198_sample_links.sh
├── make_198_samples_tsv.sh
└── readme.md

Inputs

  • VCFs with DP & GQ fields.
  • VCFs with genotypes only.
  • VCFs with sites only.
  • CRAM files
  • samples_list.txt
  • samples_list.txt (not the same as above)
  • samples_list.tsv (also not the same)

Outputs

  • Files to be loaded into MongoDB
    • QC_Metrics
    • VCFs
  • Files to be accessed at runtime
    • CRAMs
    • Coverage depth summaries
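
One hypothetical way to load the gzipped QC metrics, assuming newline-delimited JSON; the database and collection names are made up, and the real load step may differ:

# Illustration only: stream metrics into MongoDB with mongoimport
gunzip -c metrics.json.gz | mongoimport --db bravo --collection qc_metrics
# If the file holds a single JSON array, add --jsonArray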

Basis Data

Produced by process_vcf workflow

/bravo/data/basis/
├── qc_metrics
│   └── metrics.json.gz
├── reference
│   ├── canonical_transcripts.tsv.gz
│   ├── gencode.v38.annotation.gtf.gz
│   ├── hgcn_genenames.tsv.gz
│   └── omim_ensembl_refs.tsv.gz
└── vcfs
    ├── chr11.bravo.vcf.gz
    └── chr12.bravo.vcf.gz
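
A quick sanity check of the basis VCFs, assuming bcftools is installed (commands illustrative):

# Last header lines, then a few records, of a processed chromosome
bcftools view -h /bravo/data/basis/vcfs/chr11.bravo.vcf.gz | tail -n 5
bcftools view -H /bravo/data/basis/vcfs/chr11.bravo.vcf.gz | head -n 3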

Runtime Data

Produced by coverage and sequences workflows

/bravo/data/runtime
├── cache
├── coverage
│   ├── bin_0.25
│   │   ├── chr11.bin_0.25.tsv.gz
│   │   ├── chr11.bin_0.25.tsv.gz.tbi
│   │   ├── chr12.bin_0.25.tsv.gz
│   │   └── chr12.bin_0.25.tsv.gz.tbi
│   ├── bin_0.50 ...
│   ├── bin_0.75 ...
│   ├── bin_1.00 ...
│   └── full ...
├── crams
│   ├── sequences ...
│   ├── variant_map.tsv.gz
│   └── variant_map.tsv.gz.tbi
└── reference
    ├── hs38DH.fa
    └── hs38DH.fa.fai
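
The coverage summaries and variant map are bgzipped, tabix-indexed TSVs, so they can be queried by region at runtime. For example:

# Region query against one coverage bin (paths from the tree above)
tabix /bravo/data/runtime/coverage/bin_0.25/chr11.bin_0.25.tsv.gz chr11:52230001-52230100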

Scaling

Add nodes as needed until the queued jobs are running or the maximum node count has been provisioned.

Nextflow Config

The Nextflow config limits how many jobs are enqueued at a time.

executor {
  $slurm {
    // Max number of jobs queued with SLURM at any one time
    queueSize = 1000
    jobName = { "bravo_coverage" }
  }
}

SLURM Config

The Slurm-on-GCP config limits how many nodes are spun up.

# Autoscaling partitions: no static nodes, scale up to max_node_count.
partitions = [
  { name                 = "highcpu"
    machine_type         = "n1-highcpu-8"
    static_node_count    = 0
    max_node_count       = 25
  },
  { name                 = "highmem"
    machine_type         = "n2-highmem-4"
    static_node_count    = 0
    max_node_count       = 52
  }
]

Performance

Refactored the coverage workflow’s depth-count aggregation to use successive rounds of concurrent reads.

Aggregate Depths

Aggregating depth counts requires reading all the depth files and concatenating rows. Individual depth files look like:

chr11 52230001  20
chr11 52230002  14
chr11 52230003  22

and are merged into a single concatenated depth file like:

chr11 52230001  20;18;29;18
chr11 52230002  14;21;24;24 
chr11 52230003  22;21;22;12

Parallel Reading with Pipes

A convenient method for reading four or five files at a time.

# Create pipes for tabix reading
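# Note: shell variables appear as \${...} because this snippet lives inside a
# Nextflow script block; ${chromosome} is a Nextflow variable, and \${POS} /
# \${END} are assumed to be set earlier in the script (not shown here).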
PIPES=()
PIPE_COUNT=0

for INFILE in \${INFILES[@]}
do
  PIPE_NAME=pipe_\${PIPE_COUNT}
  mkfifo \${PIPE_NAME}
  PIPES+=( \${PIPE_NAME} )

  # Start reading depth file into pipe
  (tabix \${INFILE} ${chromosome}:\${POS}-\${END} > \${PIPE_NAME}) &

  let "PIPE_COUNT = PIPE_COUNT + 1"
done

Aggregate multiple files

  • mlr (Miller) is a tool for tabular data.
  • Here it’s concatenating the depth counts from chr\tpos\tdepth files.
  • mlr handles multiple files at once, increasing throughput.
  • This approach can aggregate previously aggregated files.

# Aggregate depths from depth file chunks
mlr -N --tsv 'nest' --ivar ";" -f 3 \${PIPES[@]} |\
  sort --numeric-sort --key=2 |\
  bgzip >> ${result_file}
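
Note: --ivar ";" is Miller shorthand for --implode --values --across-records --nested-fs ";". Records identical except for the imploded field are merged, with its values joined by ";". With -N (headerless input and output) fields are addressed by position, so -f 3 selects the depth column.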

Repeat Aggregation Until Done.

Nextflow’s DSL 1.0 doesn’t support looping: a process can’t feed its output channel back into its own input.

This does not work:

process aggregate_depths_loop {
  input:
  tuple file(depth_files) from agg_channel

  output:
  tuple file("${chrom}_${task.index}.tsv.gz") into agg_channel

  when: aggregation_complete != true

Unrolled loop

  • Handles a limited quantity of input files
  • Makes the workflow script longer & repetitive

process aggregate_depths_rnd_1 {
  input:  file(depth_files) from pileups
  output: file("${chrom}_rnd_1_${task.index}.tsv.gz") into agg_rnd_1

process aggregate_depths_rnd_2 {
  input:  file(depth_files) from agg_rnd_1
  output: file("${chrom}_rnd_2_${task.index}.tsv.gz") into agg_rnd_2

process aggregate_depths_rnd_3 {
  input:  file(depth_files) from agg_rnd_2
  output: file("${chrom}_rnd_3_${task.index}.tsv.gz") into agg_rnd_3

Runtimes

chr11 & chr12 for 198 samples

  • process_vcf: ~ 2000 CPU hours
  • coverage: ~ 680 CPU hours