How to use snakemake checkpoints to extract files from an archive

I use Snakemake for almost all of my workflows. Snakemake is a workflow manager built for bioinformatics, and it does a ton of really cool things; see this tutorial for more information.

I usually include data download rules in my snakefiles, and most of the time this data is packaged as a gzipped tar archive (.tar.gz). Even when the data belongs to me, I like to package it this way and put it online, e.g., in an OSF repository, so that anyone can repeat my workflow. Since I keep all of my Snakefiles on GitHub, I also like knowing that I can regenerate my intermediate files and results if my cluster crashes.
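
Creating such an archive is a one-liner; a minimal sketch, assuming a hypothetical data directory my_data/:

tar -czf my-data.tar.gz my_data/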

However, untarring an archive is a one-to-many operation: a single input file produces many output files, and snakemake can't know their names until the archive is actually extracted. Snakemake checkpoints handle this situation well.
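
Stripped of the specifics, the pattern looks something like the sketch below. All names here are hypothetical; the real, runnable workflow follows.

# A minimal sketch of the checkpoint pattern, with hypothetical names.
checkpoint extract:
    input: "archive.tar.gz"
    output: directory("extracted")
    shell: "mkdir -p {output} && tar xf {input} -C {output}"

def aggregate_extracted(wildcards):
    # get() tells snakemake to wait until the checkpoint has run;
    # only then can the output directory be globbed for its contents.
    outdir = checkpoints.extract.get(**wildcards).output[0]
    return expand("outputs/{f}.processed",
                  f = glob_wildcards(os.path.join(outdir, "{f}.txt")).f)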

Below I show a small snakefile from a real workflow where I implemented a snakemake checkpoint. The files are located on OSF, so this workflow can be run by anyone. I’ve included instructions at the bottom for setting up an environment in which to run this workflow.

This workflow does the following:

  • rule download_plass: downloads a tar archive that contains amino acid assemblies for a bunch of metagenome-assembled genomes.
  • checkpoint decompress_plass: decompresses and extracts files from the tar archive.
  • rule cdhit_plass: for each file that was extracted from the tar archive, clusters amino acid sequences at 95% identity.
  • def aggregate_decompress_plass: this function solves for the file names output by the checkpoint. checkpoint_output stores the output directory from the checkpoint rule. file_names expands the output file pattern around the mag wildcard, solving for the wildcard by chopping the known suffix off of each file produced by the checkpoint.
  • rule finished: generates an empty file that becomes the target in rule all. The function aggregate_decompress_plass is used as the input, as it returns the file names that need to be generated to produce the input of this rule. If I needed to do some legitimate summary of my clustered files, I could use the same syntax as in this rule, do the summary, and output a file containing that summary.

rule all: 
    input:
        "finished.txt"

rule download_plass:
    output: "inputs/plass/hu-s1-plass-hardtrim-clean-jan08.2019.tar.gz"
    shell:'''
    curl -L -o {output} https://osf.io/9hg85/download
    '''

checkpoint decompress_plass:
    output: directory("inputs/plass/hu-s1_k31_r1_search_oh0")
    input: "inputs/plass/hu-s1-plass-hardtrim-clean-jan08.2019.tar.gz"
    params: folder = "inputs/plass"
    shell:'''
    mkdir -p {params.folder}
    tar xvf {input} -C {params.folder}
    '''
    
rule cdhit_plass:
    output: "outputs/cd-hit95/{mag}.cdhit95.faa"
    input: "inputs/plass/hu-s1_k31_r1_search_oh0/{mag}.fa.cdbg_ids.reads.hardtrim.fa.gz.plass.cdhit.fa.clean.cut.dup"
    benchmark: "benchmarks/{mag}.cdhit95.benchmark.txt"
    conda: 'env.yml'
    shell:'''
    cd-hit -i {input} -o {output} -c .95
    '''

def aggregate_decompress_plass(wildcards):
    checkpoint_output = checkpoints.decompress_plass.get(**wildcards).output[0]    
    file_names = expand("outputs/cd-hit95/{mag}.cdhit95.faa", 
                        mag = glob_wildcards(os.path.join(checkpoint_output, "{mag}.fa.cdbg_ids.reads.hardtrim.fa.gz.plass.cdhit.fa.clean.cut.dup")).mag)
    return file_names
    
rule finished:
    input: aggregate_decompress_plass
    output: "finished.txt"
    shell:'''
    touch {output}
    '''
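
Before walking through the details, here is a concrete illustration of what aggregate_decompress_plass does; the file names are hypothetical.

# Suppose the checkpoint directory contains (hypothetical names):
#   mag1.fa.cdbg_ids.reads.hardtrim.fa.gz.plass.cdhit.fa.clean.cut.dup
#   mag2.fa.cdbg_ids.reads.hardtrim.fa.gz.plass.cdhit.fa.clean.cut.dup
# glob_wildcards() matches the {mag} pattern against those names:
#   glob_wildcards(...).mag == ['mag1', 'mag2']
# expand() then fills each value back into the target pattern:
#   ['outputs/cd-hit95/mag1.cdhit95.faa',
#    'outputs/cd-hit95/mag2.cdhit95.faa']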

Checkpoints cause snakemake to re-evaluate the directed acyclic graph (DAG) it uses to determine the order in which rules execute. Before the checkpoint runs, snakemake doesn't know which files the checkpoint will produce; once it completes, the DAG is recomputed with that information.

The mag wildcard is born from the output of the checkpoint. It isn't included in the checkpoint rule because it doesn't exist yet; instead, directory() is used to specify that the output of the rule is a directory. The mag wildcard exists only after the checkpoint has run. Snakemake solves for it using the function aggregate_decompress_plass, which looks into the directory output by the checkpoint, cuts off the suffix specified in the function, and uses what's left as the value of the mag wildcard. This wildcard can also be used in subsequent rules; however, aggregate_decompress_plass would then need to solve for the output files of those rules instead:

rule all: 
    input:
        "finished.txt"

rule download_plass:
    output: "inputs/plass/hu-s1-plass-hardtrim-clean-jan08.2019.tar.gz"
    shell:'''
    curl -L -o {output} https://osf.io/9hg85/download
    '''

checkpoint decompress_plass:
    output: directory("inputs/plass/hu-s1_k31_r1_search_oh0")
    input: "inputs/plass/hu-s1-plass-hardtrim-clean-jan08.2019.tar.gz"
    params: folder = "inputs/plass"
    shell:'''
    mkdir -p {params.folder}
    tar xvf {input} -C {params.folder}
    '''
    
rule cdhit_plass:
    output: "outputs/cd-hit95/{mag}.cdhit95.faa"
    input: "inputs/plass/hu-s1_k31_r1_search_oh0/{mag}.fa.cdbg_ids.reads.hardtrim.fa.gz.plass.cdhit.fa.clean.cut.dup"
    benchmark: "benchmarks/{mag}.cdhit95.benchmark.txt"
    conda: 'env.yml'
    shell:'''
    cd-hit -i {input} -o {output} -c .95
    '''
    
rule paladin_index_plass:
    output: "outputs/cd-hit95/{nbhd}.cdhit95.faa.bwt"
    input: "outputs/cd-hit95/{nbhd}.cdhit95.faa"
    benchmark: "benchmarks/{nbhd}.paladin_index.txt"
    conda: ENV
    shell:'''
    paladin index -r3 {input}
    '''
    
def aggregate_decompress_plass(wildcards):
    checkpoint_output = checkpoints.decompress_plass.get(**wildcards).output[0]    
    file_names = expand("outputs/cd-hit95/{mag}.cdhit95.faa.bwt", 
                        mag = glob_wildcards(os.path.join(checkpoint_output, "{mag}.fa.cdbg_ids.reads.hardtrim.fa.gz.plass.cdhit.fa.clean.cut.dup")).mag)
    return file_names
    
rule finished:
    input: aggregate_decompress_plass
    output: "finished.txt"
    shell:'''
    touch {output}
    '''
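
Note that to actually run this variant, paladin needs to be in the conda environment alongside cd-hit. Assuming the bioconda paladin package, env.yml (described at the bottom of this post) would grow to something like:

channels:
   - conda-forge
   - bioconda
   - defaults
dependencies:
   - cd-hit=4.8.1
   - paladin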

Lastly, you can remove rule finished if you don't want an extra empty file generated at the end of the workflow and you don't have a summary step. To do this, use aggregate_decompress_plass as the input to rule all. However, the function needs to be defined before rule all is evaluated, which moves it to the top of the snakefile. I prefer to keep aggregate_decompress_plass inline with the checkpoint and the rules it operates on, so I stick with the empty file, but I've included the alternative below.

def aggregate_decompress_plass(wildcards):
    checkpoint_output = checkpoints.decompress_plass.get(**wildcards).output[0]    
    file_names = expand("outputs/cd-hit95/{mag}.cdhit95.faa", 
                        mag = glob_wildcards(os.path.join(checkpoint_output, "{mag}.fa.cdbg_ids.reads.hardtrim.fa.gz.plass.cdhit.fa.clean.cut.dup")).mag)
    return file_names
    
rule all: 
    input:
        "finished.txt"

rule download_plass:
    output: "inputs/plass/hu-s1-plass-hardtrim-clean-jan08.2019.tar.gz"
    shell:'''
    curl -L -o {output} https://osf.io/9hg85/download
    '''

checkpoint decompress_plass:
    output: directory("inputs/plass/hu-s1_k31_r1_search_oh0")
    input: "inputs/plass/hu-s1-plass-hardtrim-clean-jan08.2019.tar.gz"
    params: folder = "inputs/plass"
    shell:'''
    mkdir -p {params.folder}
    tar xvf {input} -C {params.folder}
    '''
    
rule cdhit_plass:
    output: "outputs/cd-hit95/{mag}.cdhit95.faa"
    input: "inputs/plass/hu-s1_k31_r1_search_oh0/{mag}.fa.cdbg_ids.reads.hardtrim.fa.gz.plass.cdhit.fa.clean.cut.dup"
    benchmark: "benchmarks/{mag}.cdhit95.benchmark.txt"
    conda: 'env.yml'
    shell:'''
    cd-hit -i {input} -o {output} -c .95
    '''

Creating an environment to run this snakemake workflow

To run this workflow, you need to have conda installed. See this tutorial for how to install, configure, and run conda.

Once you have conda installed, create an environment and install snakemake.

conda create -n checkpoint -c conda-forge -c bioconda snakemake-minimal=5.9.1
conda activate checkpoint

This workflow also relies on a conda environment to run cd-hit. Snakemake generates this environment from a user-supplied file. Make a file called env.yml, and in it, put the following text:

channels:
   - conda-forge
   - bioconda
   - defaults
dependencies:
   - cd-hit=4.8.1

Save one of the snakefiles above in a file called Snakefile and run:

snakemake --use-conda 

If you’d like to save your snakefile under a different name like my_checkpoint_snake, you can run:

snakemake -s my_checkpoint_snake --use-conda
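
One last note on dry runs: because the files downstream of a checkpoint aren't known until the checkpoint executes, a dry run performed before the checkpoint has ever run can only resolve jobs up to the checkpoint; the downstream cd-hit jobs appear in the DAG once the checkpoint output exists.

snakemake -n --use-conda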