Project author: FlyPythons

Project description:
Module to create DAG Task and Manage Task on SGE
Language: Python
Repository: git://github.com/FlyPythons/DAGflow.git
Created: 2017-12-27T06:39:09Z
Project community: https://github.com/FlyPythons/DAGflow

License: MIT License



DAGflow

A Python module to create DAG tasks and manage them on SGE

Installation

1. Requirements

Python (2.5 or later)
Sun Grid Engine(SGE)

DAGflow has been tested on CentOS 7.2.1511 and Ubuntu 12.04.4 LTS

2. Install

Download the code and unzip it into the desired installation directory, or clone the repository:

    git clone https://github.com/FlyPythons/DAGflow.git
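
The repository does not describe an install step beyond cloning, so one straightforward option is to make the clone visible to Python before importing it. The snippet below is only a sketch: /path/to/DAGflow is a hypothetical location that you should replace with your own clone directory, and where exactly to point the path depends on the repository layout.

    # a minimal sketch: make the cloned DAGflow code importable
    import sys
    sys.path.append("/path/to/DAGflow")  # hypothetical clone location; adjust to your setup

    # the import used throughout the tutorial below
    from dagflow import DAG, Task, ParallelTask, do_dag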

Tutorial

The following tutorial shows how to create a DAGflow and run it

1. Draw your workflow

Suppose I have a set of fasta files and I want to blast them against a database named 'db.fasta'.
To complete this work, the following workflow is needed:
(Workflow diagram: make_db builds the blast database, each fasta file is blasted against it in parallel, and blast_join merges the results)

2. Write your workflow script

First, write your workflow script:

    import os
    from dagflow import DAG, Task, ParallelTask, do_dag

    inputs = ["1.fasta", "2.fasta", "3.fasta", "4.fasta"]
    db = "db.fasta"
    db = os.path.abspath(db)

    # create a DAG object
    my_dag = DAG("blast")

    # create the first task 'make_db'
    make_db = Task(
        id="make_db",  # your task id, which should be unique
        work_dir=".",  # your task work directory
        type="local",  # how the task runs; if "sge", the task is submitted with qsub
        option={},  # the options passed to "sge" or "local"
        script="makeblastdb -in %s -dbtype nucl" % db  # the command of the task
    )
    # whenever you create a task, add it to the DAG object
    my_dag.add_task(make_db)

    # then add the blast tasks
    blast_tasks = ParallelTask(
        id="blast",
        work_dir="{id}",
        type="sge",
        option="-pe smp 4 -q all.q",
        script="blastn -query {query} -db %s -outfmt 6 -out {query}.m6" % db,
        query=inputs
    )
    my_dag.add_task(*blast_tasks)
    make_db.set_downstream(*blast_tasks)

    # add the blast_join task to join the blast results
    blast_join = Task(
        id="blast_join",
        work_dir=".",
        type="local",  # option is left at its default
        script="cat */*.m6 > blast.all.m6"
    )
    # always remember to add the task to the DAG object when it is created
    my_dag.add_task(blast_join)

    # this task needs all tasks in blast_tasks to be done
    blast_join.set_upstream(*blast_tasks)

    # all of your tasks have been added to the workflow, so you can run it
    do_dag(my_dag)

Now your workflow script is complete; you can name it 'workflow.py'.

3. Run your workflow

You can run your workflow script as an ordinary Python script using the following command.

    python workflow.py

Re-run your workflow if it breaks in the middle

If your workflow is interrupted for some reason and some tasks are left undone,
you can use the same command python workflow.py to re-run the unfinished jobs.

Set dependencies between a workflow and a task

Sometimes you may want to add a workflow to another workflow. This can be done as follows:

    from DAGflow import *

    # two workflows, wf1 and wf2
    wf1 = DAG("workflow1")
    wf2 = DAG("workflow2")

    task1 = Task(
        id="task",
        work_dir=".",
        script="echo 'hello, i am a task'"
    )

    # set wf2 to depend on wf1
    wf1.add_dag(wf2)
    # set task1 to depend on wf2
    task1.set_upstream(*wf2.tasks.values())
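
To actually execute the combined workflow, task1 still needs to belong to a DAG and the top-level workflow has to be handed to do_dag, as in the blast example. The lines below are a minimal sketch under those assumptions (add_task and do_dag are the same calls used earlier; that do_dag(wf1) also runs the attached wf2 is an assumption here).

    # a minimal sketch (assumption): attach task1 to wf1 and run the merged workflow,
    # mirroring the add_task/do_dag calls from the blast example above
    wf1.add_task(task1)
    do_dag(wf1)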