Data-Driven Workflow Planning in Cluster Management Systems

Srinath Shankar (srinath@cs.wisc.edu) and David J. DeWitt (dewitt@cs.wisc.edu)
Department of Computer Sciences, University of Wisconsin, Madison, WI 53706-1685

ABSTRACT
Traditional scientific computing has been associated with harnessing computation cycles within and across clusters of machines. In recent years, scientific applications have become increasingly data-intensive. This is especially true in the fields of astronomy and high-energy physics. Furthermore, the lowered cost of disks and commodity machines has led to a dramatic increase in the amount of free disk space spread across machines in a cluster. This space is not being exploited by traditional distributed computing tools. In this paper we evaluate ways to improve the data management capabilities of Condor, a popular distributed computing system. We have augmented the Condor system with the capability to store data used and produced by workflows on the disks of machines in the cluster. We have also replaced the Condor matchmaker with a new workflow planning framework that is cognizant of dependencies between jobs in a workflow and exploits these new data storage capabilities to produce workflow schedules. We show that our data caching and workflow planning framework can significantly reduce response times for data-intensive workflows by reducing data transfer over the network in a cluster. We also consider ways in which this planning framework can be made adaptive in a dynamic, multi-user, failure-prone environment.

Categories and Subject Descriptors: C.2.4 [Computer-Communication Networks]: Distributed Systems – Distributed Applications; H.3.4 [Information Storage and Retrieval]: Systems and Software – Distributed Systems

General Terms: Algorithms, Management, Performance, Design, Experimentation.

Keywords: Workflow management, cluster management, data management, Condor, scheduling, planning, scientific computing.

1. INTRODUCTION
Scientific applications have traditionally been considered compute-intensive. Several distributed computing systems have been used to harness the computing power of clusters of machines. But in recent years there has been an information explosion in several fields of science such as high-energy physics (ATLAS [4]), astronomy (SDSS [6]) and bio-informatics (BIRN [1], BLAST [5]). Thus the issue of effective data management in clusters has come to the fore. One popular example of a distributed computing system is Condor, which is now used to manage more than 100,000 machines across 1400 clusters world-wide. Condor is primarily used to harvest CPU cycles on idle machines in a network. It is the foundation of many scientific workflow management systems such as GriPhyN [3] and GridDB [20]. Condor performs critical and useful tasks such as job migration between machines, input and output file transfers, and job status notification.
In the current Condor scheme, the disks of machines in a cluster are used only for temporary storage. The data used and produced by a job running on a machine is removed after the job completes. This made sense in an environment where disk space was scarce. Since the development of Condor in the mid '80s, the amount of disk space available to users has increased by orders of magnitude. Soon, even the smallest disk drives available will have a capacity of about 500 GB. Thus, the disk drives of desktop machines in a Condor cluster will have a significant amount of free space. Furthermore, as the cost of machines plummets, clusters are growing larger with an increasing proportion of commodity machines dedicated to running Condor jobs. These dedicated machines also have free disk space. Soon, even a modest cluster of 1000 machines will have half a petabyte of disk space distributed over its machines, with a considerable proportion of free space.
Given the increasingly data-intensive nature of scientific workflows and the availability of disk space across machines in a cluster, the present Condor approach to running scientific workflows is very inefficient. Jobs in scientific workflows usually exhibit significant input-output relationships – the output of some jobs is used as input by others. Thus if a job's output data is kept on a machine after it completes, it can probably be used by dependent jobs that are scheduled on the same machine. Furthermore, several jobs in a workflow may share common input and executable files. Keeping these files on the disk of a compute machine will benefit other jobs that are scheduled on that machine. Additionally, scientific experiments are frequently re-run and different invocations of a workflow may share several files. Thus, there are benefits to keeping user data on the caches of execute machines even after a workflow has finished execution. Another natural benefit that this provides is memoization – the ability to retrieve data products without re-running the computation.
In this paper, we present a different data management architecture for Condor in which the input, output and executable files of jobs are cached on the local disks of machines in a cluster. We believe that this disk space has the potential to become the de-facto location for long-term storage of scientific data over computational clusters. Furthermore, exploiting the disks of machines effectively requires a different workflow planning scheme. The current Condor scheduling algorithm does not take into account the data dependencies between jobs in a workflow. We present a new planning algorithm that is cognizant of the location of Condor user data in a cluster as well as the data dependencies between jobs in a workflow. Our planning algorithm produces a schedule by comparing the time saved by running jobs in parallel with the time taken for data transfer when dependent jobs are run on different machines. Our results indicate that the planning algorithm significantly improves response times for data-intensive workflows. We also consider ways to make planning adaptable in a dynamic environment. Throughout this paper, we will refer to our new data caching and workflow planning framework as DAG-Condor. At the heart of our system architecture is a database that records the versions of input, output and executable files used in the workflow, the location of these files across the cluster, as well as the job schedules produced by the planning algorithm.
2. NATURE OF SCIENTIFIC WORKFLOWS
In the workflow literature, a DAG (Directed Acyclic Graph) refers to a set of programs with dependencies between them – that is, the input of some programs may depend on the output of others. This places partial constraints on the order of execution of programs. Consider the "diamond" DAG shown in Figure 1. Since programs B and C depend on the output of program A, they can be run only after A completes, but B and C themselves can be run in any order, or even in parallel. A typical scientific workflow frequently consists of a number of such DAGs with identical structures. These DAGs may share some common (global) input data. In their analysis of scientific workflows, Thain et al. [26] characterized the different types of I/O performed by jobs in their lifetime. Pipeline I/O refers to the data flow that occurs between parent and child programs within a particular DAG. The term batch I/O refers to input files that are common across all DAGs in a workflow. In Figure 1, files `File1' and `File2' are pipeline files. `FileInput' is a batch input to the DAGs in the example workflow shown in Figure 2. Batch input can occur at any level in a DAG.

Figure 1: A diamond DAG
Figure 2: Workflow from diamond DAGs

3. AN OVERVIEW OF CONDOR
In Condor, users create submit files for jobs they wish to run. The submit file specifies details about the job such as the names of the executable, input and output files, and environment variables that need to be set at the time of job execution. In addition, a user may also specify requirements and rank attributes. The requirements attribute places constraints on the machines that can run the job. Rank is used to specify an order of preference for machines that meet the job's requirements. A job is submitted by specifying a submit file to the condor_submit tool. The submit file is converted into a "classad", which is a list of attribute-value pairs. Machines also advertise their resources using classads and can place constraints on the jobs they wish to run. In the matchmaking process, each user is allotted a number of the machines in the system based on a fair-share scheme [2]. Jobs that are selected for scheduling are first sorted in order of user priority. For each job, the list of available machines is scanned, and the machine with the highest rank that satisfies the job's requirements is chosen for execution. (This machine is called the execute machine.) The matchmaking process is done periodically (typically, every five minutes).
The condor_submit tool only accepts jobs consisting of a single program. Workflows consisting of DAGs are submitted using a different tool called condor_submit_dag. This tool allows users to specify parent-child dependencies between jobs. When a workflow is submitted, the Condor DAGMan daemon is spawned, and the top-level jobs in the workflow (jobs marked `A' in Figure 2) are submitted using condor_submit. DAGMan continuously monitors the logs produced by Condor on the submit machine (the machine to which the user submits jobs) and uses condor_submit to submit a child job once its parents have completed execution. Thus, the Condor matchmaker itself has no notion of workflows and cannot consider a job for scheduling until it is ready for execution. Before a job begins, its input and executable files are transferred from the submit machine to the execute machine. After a job has completed, the output files associated with that job are transferred back to the submit machine and deleted from the execute machine. When a child job is scheduled, files are again transferred from the submit machine to an execute machine (which could possibly be the same one that executed the parent job). This job scheduling and file transfer system is inefficient since the data dependencies within and across DAGs in a workflow are ignored.
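As a rough illustration of the matchmaking step just described, the sketch below picks, for one job, the highest-ranked machine that satisfies the job's requirements. The dictionaries, attribute names and use of Python are illustrative stand-ins only; real classads form a richer language with their own evaluation semantics, and real negotiation also applies user priorities and fair-share limits.

    # Illustrative sketch: machine ads and a job ad as plain dicts, not real classads.
    def match(job, machines):
        """Return the highest-ranked machine that satisfies the job's requirements, or None."""
        eligible = [m for m in machines if job["requirements"](m)]
        if not eligible:
            return None  # the job waits for a later negotiation cycle
        return max(eligible, key=job["rank"])

    machines = [
        {"name": "m1", "arch": "INTEL", "memory_mb": 2048, "disk_free_gb": 120},
        {"name": "m2", "arch": "INTEL", "memory_mb": 8192, "disk_free_gb": 40},
    ]
    job = {
        "requirements": lambda m: m["arch"] == "INTEL" and m["memory_mb"] >= 1024,
        "rank": lambda m: m["disk_free_gb"],   # prefer machines with more free disk
    }
    print(match(job, machines)["name"])        # prints: m1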
4. THE NEED FOR DISK CACHING
Consider a simple workflow consisting of two jobs A and B, in which job A produces a file F that is used by job B. In Condor, after job A executes on a machine (say, machine M), file F is moved back to the submit machine and then deleted from M. Once job B has been scheduled on a machine (perhaps M itself), F is transferred to that machine. If Condor had the ability to cache file F on M, then by scheduling B on M the transfer of file F across the network could be avoided. Similarly, if two jobs A and B share a common batch input file, then caching that file on machine M will avoid file transfers if A and B are both scheduled on M. Thus, caching can reduce the amount of pipeline and batch I/O that is transferred across the network. This can significantly reduce the response time for workflows that process large amounts of data.
Furthermore, scientists frequently repeat experiments and simulations with different parameters. Executable and input files are frequently reused across multiple submissions of a workflow. For example, in the BLAST [5] workflow, a newly discovered protein sequence is checked against a file (nr_db) consisting of a list of well-known proteins. (See Section 9 for more details.) The file nr_db is a batch input to the BLAST workflow. This file often remains unchanged over multiple submissions of BLAST. Replicating it on execute machines will be beneficial not just for the currently executing workflow but for future submissions as well. Also, when this file is updated with new information, maintaining an old version may be helpful in providing comparisons between results of the BLAST algorithm obtained with the new and old versions of the file for a given protein sequence.
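To make the potential savings concrete, the short sketch below estimates the time spent staging a job's files to an execute machine with and without a warm cache. The sizes and bandwidth are hypothetical placeholders (chosen to be roughly in line with the BLAST figures discussed in Section 9), not measurements from the system.

    # Hypothetical numbers; the point is only that cached bytes never cross the network.
    NET_BW_MB_S = 12.5   # roughly a 100 Mbps network

    def staging_seconds(input_sizes_mb, cached_files):
        """Time to transfer the input files that are not already in the machine's cache."""
        missing = [size for name, size in input_sizes_mb.items() if name not in cached_files]
        return sum(missing) / NET_BW_MB_S

    inputs = {"nr_db": 3900.0, "F": 2.0, "blastall": 8.0}   # batch input, pipeline file, executable
    print(staging_seconds(inputs, cached_files=set()))                  # cold cache: about 313 s
    print(staging_seconds(inputs, cached_files={"nr_db", "blastall"}))  # warm cache: about 0.16 s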
5. DATA-AWARE PLANNING
Scheduling all the jobs in a DAG on the same machine eliminates the need to transfer pipeline data across the network. However, it significantly reduces the degree of parallelism obtained. Consider the DAG shown in Figure 3. If A, B and C are scheduled on the same machine then there is no need to transfer F1 or F2 between machines. However, the resources of other idle machines may be wasted. Scheduling A and B to run in parallel might be better if the time taken to transfer F1 and F2 between machines is small compared to the execution times of A and B. Thus, in addition to machine availability information, we use file sizes and job run-times collected from previous runs of a workflow to produce a schedule. In Section 5.1, we outline the basic principles of our planning algorithm that exploits data stored across execute machines in the cluster. Next, in Section 5.2, we outline some of the modifications to make the planning algorithm more adaptive to machine failure and other contingencies.

Figure 3: Example `V' DAG

5.1 The Planning algorithm
In our workflow planning scheme (DAG-Condor), every workflow is assigned a number of machines in accordance with the Condor fair-share scheme [2]. This ensures that different users of the system get roughly equal amounts of CPU over a long period of time. Next, in the memoization step, any jobs whose data products have already been produced previously and are cached on one of the machines are removed from planning consideration. (Details are in Section 5.3.) Planning then proceeds in two phases: (i) assignment of DAGs to machines, and (ii) parallelization of DAGs.
In the assignment phase, each DAG in the workflow is tentatively assigned to a specific machine. The time taken to execute a DAG on a machine is the sum of the execution times of its individual jobs and the time taken to transfer the required input and executable files to that machine. (For purposes of planning, we treat executable files like input files – since executables are usually shared across multiple programs in a workflow, they usually constitute batch input.) Specifically,
    dag_runtime = Σ_i p_i + input_size / netBW,
where the p_i are the execution times of the jobs comprising the DAG (in seconds), input_size is the size (in MB) of input files that are not cached on the machine, and netBW is the network bandwidth (in MB/s). (Any required files that are already stored on the machine do not need to be transferred, and are not included in dag_runtime.) Each DAG is assigned to the machine that will result in the earliest completion time for the DAG. Once a DAG is assigned to a machine, the cumulative runtime on that machine is updated. In subsequent assignments of DAGs to that machine, we assume it caches the batch input of the workflow. Effectively, in this phase of scheduling, an entire DAG is treated as one job, and is preferentially assigned to the machine that caches most of its input files. At the end of this stage, certain machines will have a higher load than average, and the extra DAGs on these machines are chosen for parallelization in the next phase.
Algorithm AssignDAGs shows the first phase of scheduling. GetBestMachine determines the best machine to assign to a DAG according to the formula for dag_runtime. CalculateIONeeded returns the volume of data transfer needed to execute a given DAG on a machine. ScheduleDAG assigns jobs to machines while respecting the partial order imposed by the DAG. Suppose we have a workflow consisting of 6 DAGs and that four machines are allotted to it. The schedule after the first phase is shown in Figure 4(a). There are two machines with a heavier load of 2 DAGs each (M1 and M3), and two lightly loaded machines with one DAG each (M2 and M4).

Algorithm 5.1: GetBestMachine(dag, machines, withOutput)
    bestTime ← ∞
    bestMachine ← null
    for each M ∈ machines
        io ← CalculateIONeeded(dag, M, withOutput)
        time ← M.timeSoFar + dag.getRunTime() + io / netBW
        if time < bestTime
            bestTime ← time
            bestMachine ← M
    return bestMachine

Algorithm 5.2: AssignDAGs(dags, machines)
    for each D ∈ dags
        bestMachine ← GetBestMachine(D, machines, false)
        ScheduleDAG(D, bestMachine)

Algorithm 5.3: Parallelize(extraDags, lightMachines)
    for each D ∈ extraDags
        dagPortion ← null
        prevJob ← D.Next()
        thisJob ← null
        while not D.AtEnd()
            thisJob ← D.Next()
            if prevJob.level = thisJob.level
                dagPortion.Append(thisJob)
                nextJob ← thisJob.childJob
                while nextJob.numParents = 1
                    dagPortion.Append(nextJob)
                    nextJob ← nextJob.childJob
                dagLevel ← nextJob.level
                bestMachine ← GetBestMachine(dagPortion, lightMachines, true)
                runtime ← dagPortion.getRunTime()
                if CompareIOruntime(runtime, bestMachine) > 0
                    ScheduleDAG(dagPortion, bestMachine)
            prevJob ← thisJob
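For readers who prefer running code to pseudo-code, the following is a simplified re-expression of Algorithms 5.1 and 5.2 in Python. It assumes each DAG is summarized by its total job run time and a map from required file names to sizes, and it folds CalculateIONeeded and the cache bookkeeping into a few lines; the withOutput case and the job-level ScheduleDAG details are omitted.

    NET_BW = 12.5   # MB/s, illustrative

    class Machine:
        def __init__(self, name):
            self.name = name
            self.time_so_far = 0.0   # cumulative expected runtime assigned so far
            self.cache = set()       # file names assumed cached on this machine

    def io_needed(inputs_mb, machine):
        # Volume (MB) of required files not already cached on the machine (CalculateIONeeded).
        return sum(size for f, size in inputs_mb.items() if f not in machine.cache)

    def get_best_machine(dag_runtime, inputs_mb, machines):
        # Algorithm 5.1: pick the machine giving the earliest expected completion time.
        best, best_time = None, float("inf")
        for m in machines:
            t = m.time_so_far + dag_runtime + io_needed(inputs_mb, m) / NET_BW
            if t < best_time:
                best, best_time = m, t
        return best

    def assign_dags(dags, machines):
        # Algorithm 5.2: dags is a list of (runtime_seconds, {file_name: size_mb}) pairs.
        schedule = {}
        for i, (runtime, inputs) in enumerate(dags):
            m = get_best_machine(runtime, inputs, machines)
            m.time_so_far += runtime + io_needed(inputs, m) / NET_BW
            m.cache |= set(inputs)   # after assignment, assume the batch input is cached there
            schedule[i] = m.name
        return schedule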
In the next phase, the extra DAGs on the machines with heavier loads are traversed to see if there are any opportunities for load reduction by parallelization. Parallelization involves identifying jobs in a DAG that can be run in parallel (e.g. jobs A and B in Figure 3) and moving such jobs to different machines if feasible. In our algorithm, a cost-benefit scheme is used to determine if a job is suitable for parallelization. The benefit of parallelization is the reduction in workflow execution time obtained by moving a job from a heavily loaded machine to a lightly loaded one. However, when a job is moved to a different machine than the one its parent jobs executed on, the input files produced by those parents may also need to be moved. Similarly, the output files of that job may need to be moved to the machine on which its children are scheduled.
The cost of parallelization is thus the transfer of the job's input and output files between machines. The cache contents of the lightly loaded machines are examined to determine the best machine to run parallelizable jobs. We use the following formula to determine when a job can be moved from one machine to another:
    savetime = cputime − (input_size + output_size) / netBW.
Input_size and output_size are measured in megabytes, cputime in seconds, and netBW in MB/s. Input_size is the size of the job's input files that are in the original machine's cache – files not in its cache have to be transferred anyway and are not included. Output_size is the size of the job's output files. It is ignored if the job has no children, since in this case the files don't need to be transferred back to the original machine. Thus, (input_size + output_size) is the I/O cost of parallelization. If savetime is positive, then the job is reassigned. Assuming parallelization is cost-effective, the final schedule for our workflow example is shown in Figure 4(b).

Figure 4: Workflow schedules for a `V' DAG: (a) before parallelization, (b) final schedule

The pseudo-code for this phase is shown in function Parallelize, which takes the extra DAGs and the list of lightly loaded machines as arguments. As an optimization, we treat a chain of jobs J1-J2-J3 as a single job with the input of J1 and the outputs of J3. Function CompareIOruntime calculates savetime in this algorithm.
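The cost-benefit test itself is compact. A sketch of the savetime computation, under the same simplifying assumptions as the earlier sketch (sizes in MB, times in seconds, a single network bandwidth figure), might look like this:

    def savetime(cputime_s, cached_input_mb, output_mb, has_children, net_bw_mb_s=12.5):
        """Positive when moving the job off the loaded machine beats the extra file transfers."""
        io_mb = cached_input_mb + (output_mb if has_children else 0.0)
        return cputime_s - io_mb / net_bw_mb_s

    # Example: a 10-minute job whose 1 GB of input is cached on the loaded machine and
    # whose 1 GB of output is needed by a child elsewhere is still worth moving:
    print(savetime(600.0, cached_input_mb=1024, output_mb=1024, has_children=True) > 0)  # True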
5.2 Replanning Jobs
The planning scheme described in Section 5.1 is fairly static. A real planning framework in a multi-user, failure-prone environment must deal with events that require changing previously generated plans. For example, when new workflows are submitted, it may be necessary to reassign some machines to the new user in the interests of fairness. Similarly, when a workflow completes it frees up resources to be used by other workflows. Furthermore, a planning framework must deal with machine unavailability. If a machine crashes, or a user returns to his desktop machine, that machine can no longer be used to run jobs. In addition, workflow plans are produced using estimates of the execution times of jobs based on past runs. Thus, variations in workflow execution times may require a re-evaluation of plans that are produced.
There are several possible approaches to replanning workflows. One is to simply produce a new, complete plan for a workflow whenever there is an unexpected condition. However, this may not be scalable in clusters with thousands of machines, dozens of users and a large number of workflows running simultaneously. As a concrete example, consider the case where a user has submitted a new workflow. In a fair system, assigning new resources to this user may involve removing resources from other users and their workflows. This will require producing a completely new plan for each workflow that is affected. Another approach is to make planning decisions as late as possible – this is the approach used by Condor. Jobs are scheduled one-by-one only when they are ready – no workflow plans are produced. This system is quite flexible in the face of contingencies, but as we show in Section 9, it can lead to inefficient overall schedules.
To make DAG-Condor more adaptive, we modified the scheme presented in the previous section to produce plans incrementally. Thus, the planning algorithm is split over multiple cycles. In a single planning cycle, only a portion of each pending workflow in the system is planned. The schedule produced for each workflow in a cycle extends to a time interval called the "scheduling horizon". No planning for workflows is done beyond the scheduling horizon. This reduces wasteful planning. However, the manner in which portions of workflows are selected in each planning cycle has an impact on performance. A DAG-oriented scheme is required to deal with the inherent data dependencies between jobs in a workflow. Thus, while choosing portions of a workflow to plan in each cycle, the granularity of selection must be a DAG. The idea behind our replanning scheme is that the eventual schedule for a workflow produced using incremental planning should be similar to the static one produced in Section 5.1, but with adjustments being made along the way for machine failures, new workflows and other events. Referring to the static planning scheme (Section 5.1), there are two phases – assignment of whole DAGs and parallelization. The key is to defer the DAG parallelization step – parallelization should only be done when there are no more whole DAGs to assign to machines. The following modifications were made to the static planning algorithm:
(i) In the assignment phase (AssignDAGs), once the expected run-time on a machine exceeds the scheduling horizon, no more DAGs are assigned to that machine.
(ii) If the expected run-time on all the machines assigned to a workflow exceeds the scheduling horizon, then planning for that workflow in the current cycle is complete.
(iii) The parallelization (Parallelize) step is performed only if there are machines whose expected execution time has not exceeded the horizon after the assignment phase.
(iv) If a workflow receives more machines, DAGs will be assigned to those machines until the horizon is reached on those machines. Parallelization is performed only if there are no more DAGs to assign.
(v) If a workflow loses a machine, the DAGs assigned to that machine will simply be removed and scheduled in a later cycle.
In each planning cycle, the matchmaker checks to see if the expected execution time on any machines has fallen below the horizon. If this happens, then the incremental planning scheme is invoked for those machines. The proper values for planning interval parameters such as the scheduling horizon depend on several factors such as the workload on the cluster, the number of machines and users, and the frequency of job submission to the cluster. The values for these parameters can be set in the Condor configuration policies.
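A minimal sketch of the horizon test in modification (i) is shown below. The horizon value, the dictionary-based machine records and the DAG representation are all placeholders; in DAG-Condor these would come from the Condor configuration and the planner's database tables.

    def assign_until_horizon(pending_dags, machines, horizon_s=3600.0, net_bw=12.5):
        """machines: dicts with 'time_so_far' and 'cache'; DAGs are (runtime_s, {file: size_mb})."""
        leftover = []
        for runtime, inputs in pending_dags:
            open_machines = [m for m in machines if m["time_so_far"] < horizon_s]
            if not open_machines:
                leftover.append((runtime, inputs))   # planned in a later cycle
                continue
            def completion(m):
                io = sum(s for f, s in inputs.items() if f not in m["cache"])
                return m["time_so_far"] + runtime + io / net_bw
            best = min(open_machines, key=completion)
            best["time_so_far"] = completion(best)
            best["cache"] |= set(inputs)
        return leftover   # parallelization runs only if some machine is still under the horizon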
5.3 Memoization
DAG-Condor also provides memoization capabilities by maintaining a history of job information in a database. When a workflow is submitted, checksums are computed on the input and executable files used in the workflow. The "version" of a particular job is determined by the versions (i.e., checksums) of its input and executable files. When a new workflow is submitted, our system can correctly recognize a job version that has been previously executed and locate its output files. These output files are transferred directly to the user, and the job is omitted from planning consideration. If a job's output is dependent on other factors, the user can explicitly turn off memoization for a job by setting a parameter in its submit file.

6. CACHE AND REPLICA MAINTENANCE
When a job has completed on a machine, its input, output and executable files are stored in the machine's disk cache. While most dedicated machines in a cluster may have sizable disks and cache a large number of files, owners of desktops may want to restrict the amount of their disk space used to store files. Thus we allow administrators to set an upper bound on the disk cache size for each machine in a cluster.

6.1 Cache Maintenance
In DAG-Condor, the size of the disk cache on a machine is checked when it has finished executing a job. If the cache size exceeds the specified upper bound, files are deleted from the cache in accordance with the following scheme. A good disk cache maintenance policy must keep files that are likely to be used in the future in the cache. Since our planning algorithm produces job schedules for each machine, the jobs that will be run on a machine in the immediate future are known in advance. Any files used by these jobs are exempt from deletion. The worth of a file stored in the machine's cache will depend on how recently it has been accessed, the number of cached copies, whether it is a batch input (which is used by many jobs) or an intermediate file, and on workflow submission patterns. The worth of input, intermediate and executable files is calculated as follows:
    worth = (prob_use × filesize) / num_copies.
Num_copies is the number of copies of the file in the cluster, and prob_use is the (relative) probability that the file will be used by a workflow in the future. Larger files are favored since the network I/O required to transfer them to a machine is greater. This formula also ensures that excessive replication of a file does not occur. Prob_use is calculated according to the LRFU [19] policy, which encompasses the well-known Least Recently Used and Least Frequently Used policies. Briefly,
    prob_use = Σ_i (1/2)^(λ·t_i).
Here, the index i ranges over the jobs that have used the file over time, and t_i is the elapsed time in minutes since the submission of the i-th job. If λ is 0, this reduces to LFU (prob_use is then the number of jobs that have used the file), and if λ is 1, this results in an LRU policy (the time of the most recent access is dominant in the summation). The "correct" value for λ depends on the submit patterns for workflows and is user-adjustable. For instance, if a user does not change most of the files used between submissions of a workflow, a value close to 0 would keep frequently used files in the cache for use by subsequent submissions of the workflow. The formula also gives preference to batch input – there are more terms in the summation since batch files are used by a larger number of jobs. Files are chosen for deletion in increasing order of their worth. Files are deleted until the cache size falls below the specified upper limit.
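Taken together, the replacement policy of Section 6.1 amounts to the small routine below. The value of λ, the per-file access histories and the sizes are placeholders supplied by the caller; in DAG-Condor they would come from the job history in the database.

    def prob_use(access_ages_min, lam):
        # LRFU-style weight: one (1/2)^(lam * age) term per past job that used the file.
        return sum(0.5 ** (lam * age) for age in access_ages_min)

    def worth(size_mb, num_copies, access_ages_min, lam=0.5):
        return prob_use(access_ages_min, lam) * size_mb / num_copies

    def eviction_order(files, pinned):
        """files: {name: (size_mb, num_copies, access_ages_min)}; pinned files are never deleted."""
        candidates = [(worth(*info), name) for name, info in files.items() if name not in pinned]
        return [name for _, name in sorted(candidates)]   # delete in increasing order of worth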
6.2 Replica maintenance
A long-term disk caching scheme will have to deal with failures such as disk and machine crashes, which could lead to files being unavailable. In addition, a cached copy of a file may be temporarily unavailable if a machine is busy with another job or is being used by its owner. To ensure a minimum degree of availability in the cluster, a k-replication scheme has been implemented. Once a file on a machine has been chosen for deletion according to the scheme in Section 6.1, the number of replicas for that file may fall below the required threshold (k). In that case, an alternate location for that file must be found. The machine with the maximum amount of free disk space that does not already have a replica of the file is chosen as the new location. Once the file has been transferred to the new location, the local copy at the machine with the full cache is deleted. If there is no space available on any machine in the cluster, the file is deleted without being replicated.

7. THE CONDOR DAEMON FRAMEWORK
Machines in a cluster that are willing to execute user jobs must run a startd daemon. Machines in the cluster from which jobs can be submitted must run a schedd daemon. Every cluster must have one negotiator daemon that assigns jobs to machines. (The machine that runs this is called the matchmaker.) Individual jobs are submitted using condor_submit and workflows are submitted using condor_submit_dag. Once a job has been assigned to a machine, the startd on that machine (the execute machine) spawns a starter process to handle the job. The schedd on the submit machine spawns a shadow process. The starter and shadow are responsible for file transfers between the submit and execute machines. The starter obtains the input files from the shadow before the job starts and transfers the output files back to the submit machine via the shadow after the job completes. During job execution, periodic status updates may be sent from starter to shadow. Figure 5 shows the interaction (in dotted lines) between the principal daemons in Condor. In the figure, we have omitted the collector daemon, which sits on the matchmaker and acts as an in-memory database for the negotiator.

Figure 5: Modified architecture. Rectangles are machines and ellipses are daemons. Interactions between daemons are shown in dotted lines.

8. IMPLEMENTATION OF CACHING AND WORKFLOW PLANNING
At the core of the DAG-Condor architecture is a database that is used to store information on files such as versions, sizes and cache locations, as well as job and workflow information and job schedules. The database is used to store both current and historical information. Our current implementation uses a PostgreSQL database. The database schema used is shown in Table 1. As shown in Figure 5, the job submission tools – condor_submit and condor_submit_dag – and the Condor daemons were modified to connect directly to the database and perform insert and update operations in the course of workflow submission and execution.
Table 1: Database schema
    Job list (workflowname, jobnum, submittime)
    Job contents (workflowname, jobnum, executablename, version)
    File version (workflowname, filename, version, filetype, checksum, filesize)
    File location (workflowname, filename, version, machine, path)
    Program version (workflowname, executablename, version, inputlist)
    Job queue (workflowname, jobnum, parentlist, status)
    Job schedule (workflowname, jobnum, machine, afterworkflow, afterjob, status)
    Program output (workflowname, executablename, version, filename)
    Program stats (workflowname, executablename, version, cputime)
    Workflow status (workflowname, workflownum, status)

8.1 The schema
A workflow in DAG-Condor is identified by a workflow name. Different instances (submissions) of a workflow that share the same name can also share files, which is useful if files are frequently reused. Table workflow status holds the status (`submitting', `submitted', `executing', etc.) of workflows in the system. Each individual job in a workflow is assigned a job number that is unique across all jobs in all instances of that workflow. Files can be of type `input', `intermediate', `output' and `executable'. In DAG-Condor, a file is identified by its name and version within a workflow. The versions of input and executable files are determined by a checksum. Versioning of input and executable files leads naturally to versioning of jobs. The version number of a job and its output files is defined by the versions of its executable and input files. The relation between a job and the versions of the files it uses is stored in table program version. File information is stored in table file version, along with the file types and sizes. Thus, the database maintains the lineage of all files and jobs that are encountered. File location is used to store the locations of different files on machines in the cluster. The role of the other tables will be explained along with the modifications to the various tools and daemons in Condor.

8.2 Job submission
A user must create submit files for each job in a workflow. In addition, a DAG-description file that lists all the jobs in the workflow and the parent-child dependencies between these jobs must be created. This file is passed as an argument to condor_submit_dag. In the Condor architecture, when a workflow is submitted a DAGMan daemon is spawned on the submit machine. DAGMan first parses the DAG-description file and then uses condor_submit to submit all jobs that have no parents (these jobs are ready to run). Next, it compiles a list of parent-child job dependencies in memory along with the status of each job. Periodically, it sniffs the user log files to check for job completion events. When a job has completed, DAGMan updates its in-memory list and submits any jobs that are now ready to run.
The following changes were made to the condor_submit and condor_submit_dag tools. In the new DAG-Condor architecture, when a workflow is submitted using condor_submit_dag, the DAG-description file is parsed and all jobs (not just the ready jobs) are passed to condor_submit, together with dependency information. Condor_submit and condor_submit_dag record each individual job in the table job list along with a submission timestamp. A workflow name must be specified in the job's submit file and it is shared by all the jobs in the workflow. Job numbers are generated automatically by condor_submit and are unique for every job ever submitted with a particular workflow name. While parsing a job's submit file, condor_submit computes a checksum on the input and executable files that are specified. Any new files or new versions of files that are encountered are recorded in file version. Condor_submit also determines the version of the job itself using the versions of its executable and input files and records any new job versions in program version. Essentially, if it is found that a job version already exists in the database, it means that the same executable has been run with the same versions of the input files in a previous run of the workflow. Having determined the version of a job, condor_submit records this in the job contents table. Finally, the job is enqueued in job queue with a list of its parents and a status of `ready' or `waiting'.
The principal difference in workflow submission in DAG-Condor is that there is no role for a long-running DAGMan daemon. The matchmaker gets job information directly from the database. In fact, having DAGMan submit jobs as they become ready runs counter to the DAG-Condor philosophy of a DAG-oriented rather than a job-oriented planning algorithm. In Condor, the matchmaker would only be aware of jobs that are ready to run after their parents have completed. In DAG-Condor, the matchmaker requires complete information about all the jobs in the workflow and their files.
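The versioning step in condor_submit can be pictured as follows. The job's version is derived from the checksums of its executable and input files, so an unchanged resubmission maps onto a version that is already recorded and can be memoized. The choice of hash and the in-memory dictionary standing in for the program version and program output tables are assumptions of this sketch, not details taken from the implementation.

    import hashlib

    def file_checksum(path):
        with open(path, "rb") as f:
            return hashlib.sha1(f.read()).hexdigest()

    def job_version(executable, input_paths):
        h = hashlib.sha1()
        for p in [executable] + sorted(input_paths):
            h.update(file_checksum(p).encode())
        return h.hexdigest()

    known_outputs = {}   # job version -> locations of its output files (stand-in for the database)

    def submit_job(executable, input_paths):
        version = job_version(executable, input_paths)
        if version in known_outputs:
            return ("memoized", known_outputs[version])   # fetch outputs instead of re-running
        return ("enqueued", version)                      # plan it; record outputs under this version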
8.3 Storage and file transfer
Before a job runs on an execute machine, the startd on that machine spawns a starter daemon. The starter is responsible for setting up the job environment and workspace. First, it checks the job version and file location tables to see if any files required by the job are already in the long-term cache of the execute machine. If so, these files can be used directly. If the files are not available locally, they have to be obtained from other machines. In DAG-Condor any machine can serve as a source of files, not just the submit machine. This is a major difference in the file transfer mechanism between Condor and DAG-Condor. File retrieval from multiple sources in DAG-Condor prompted an augmentation to Condor's native file transfer mechanism. In Condor, all files required by a job are transferred together to the execute machine before it starts, and all output files are transferred back to the submit machine after it completes. We added the capability for a single file "request". This allows a client (such as a starter) to specify the full pathname of the file that it needs on another machine, and only that file is transferred upon request. As we shall see, the single file transfer capability is also used by the schedd to request the output files of memoized jobs from different machines. In our implementation, the startd is responsible for handling file requests from clients, since every execute machine must run a startd. The number of concurrent requests the startd can handle can be specified as a configuration parameter.
Thus, when the starter needs a file, it first compiles a list of machines that store that file using table file location. The starter sends a file request to the first machine on the list, and if no response is received (that startd may be busy with other file transfers), it moves on to the next machine. In this way, it retrieves all the required input files before a job starts. Once a job has completed, the starter saves its input, output and executable files in the machine's long-term disk cache and updates the file location table with this information. The output files are recorded in the table program output. For new files, entries are created in the file version table with checksum and size information. The starter also records the program execution time in program stats and updates the status of the completed job to `done' in the job queue and job schedule tables. After job execution the startd examines the disk cache size to check if it is within the size specified by the owner of the machine. If necessary, files are replicated and deleted according to the cache maintenance policies specified in Section 6. Possible locations for replicas are obtained by consulting the file location table. The cache on each machine is organized by workflow name. Different versions of the same file in a workflow are distinguished by a `v' string appended to the file name.
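The starter's retrieval loop described above reduces to something like the following, where fetch_from is a stand-in for the single-file request sent to a remote startd (it is not a real Condor call) and the candidate list comes from the file location table.

    def retrieve(path, candidate_machines, fetch_from):
        """Try each machine that caches the file until one startd answers the single-file request."""
        for machine in candidate_machines:
            try:
                if fetch_from(machine, path):   # True once the file is staged into the local cache
                    return machine
            except OSError:
                pass                            # startd busy or unreachable; try the next replica
        raise IOError("no available replica for %s" % path)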
8.4 Planning
In this section, we describe the interaction between the DAG-Condor negotiator, which implements the planning algorithm described in Section 5, and the database. In each planning cycle, the matchmaker checks the job queue table to see if there are any newly submitted workflows. At this point, the versions of the input and executable files for each job have been recorded in program version by condor_submit. The matchmaker uses this information to identify any memoizable jobs in the workflow and informs the schedd of any output files that are available. (Recall that a memoizable job is one that has been run before with the same versions of input and executable files.) The schedd is responsible for retrieving output files for a memoized job using the single-file request mechanism. Memoized jobs are marked `done' in job queue and removed from further consideration. While producing a schedule for the remaining jobs in a workflow, the planner uses the file location table to determine the locations of cached files. The execute machine assigned to each job in the final schedule is stored in job schedule. After planning pending workflows, the matchmaker examines job schedule to see if any jobs are ready for execution. If so, it needs to inform the relevant schedd and startd. (In Condor, the schedd presents jobs to the matchmaker in a priority order. However, in the new DAG-Condor scheme, the matchmaker decides which of the schedd's ready jobs is to be run.) To activate job execution on a machine, the traditional Condor claiming protocol between startd and schedd is used.

9. RESULTS
In this section we compare the performance of the following three systems:
(I) DAG-C: This is the DAG-Condor file caching and DAG-oriented workflow planning framework that has been described. For our experiments, we used the static version of the planning algorithm.
(II) ORIG: This is the Condor system – there is no distributed disk storage (and no data-driven scheduling).
(III) Job-C: Job-C is an alternate framework with distributed file caching, but no DAG-oriented planning. Like ORIG, a job is considered for scheduling only when all its parent jobs have completed. However, each job is matched to the machine available at the time that stores the largest volume of relevant input and executable data in bytes.

Figure 6: BLAST DAG and file sizes
Table 2. BLAST runtime in minutes
    Num DAGs    25    50    75    100
    ORIG       194   414   549    670
    Job-C      107   147   190    210
    DAG-C      106   145   185    202

Figure 7: Sensitivity to pipeline I/O
Table 3. Varying pipeline I/O: response times in minutes
    File size  100MB   1GB   1.5GB   2GB
    ORIG         130   186     224   263
    Job-C         85   104     114   127
    DAG-C         84    84      85    85

The goal of this comparison is to separately test the efficacy of distributed file caching and DAG-oriented planning on performance. To implement Job-C, we took the Condor system and made the same changes to the starter, startd and condor_submit tools as described in Section 8. Condor_submit_dag was left untouched. The Condor matchmaker was changed to match each job to the machine caching most of its required data.
Thus, like ORIG, Job-C is a job-oriented submission and scheduling system. We set up a cluster of 25 execute machines and two submit machines connected by a 100Mbps network with the matchmaker and database on a separate machine. The disks on the machines were large enough to store the batch and intermediate data used in the experiments. All the workflow files were on the same network, and no shared file system was used. The DAG-C and Job-C experiments were run with initially clean disk caches. In ORIG and Job-C, DAGMan was set to sniff the logs every five seconds. This was done to minimize the lag between job completion and submission of dependent jobs. First, we ran the BLAST [5] workflow. BLAST is a sequence alignment program that lets users search a database of well known genes or proteins for similarities with a new sequence of acids ­ the idea is that genes with similar sequences will have similar properties. A BLAST DAG consists of two programs, shown in Figure 6. Blastall takes a sequence of acids (seq), performs the alignment operation with each protein in the database, and stores the results in a `.blast' file. This is then processed by javawrap which produces CSV and binary files for later use. Typically, several BLAST DAGs with different sequences are run within a single workflow. The average volume of the pipeline I/O (the `.blast' files) is about 2MB per DAG. The average running times for blastall and javawrap are 22 minutes and 2 seconds respectively. Table 2 shows the workflow response times for DAG-C, Job-C and ORIG as we varied the number of DAGs in the BLAST workflow. In ORIG, every time the blastall program is run on a machine, the batch files must be transferred to that location ­ as the size of the workflow increases, so does the amount of network I/O. In DAGC and Job-C, the files are transferred to each execute machine just once. Thus, caching avoids the transfer of about 3.9 GB of data for every DAG scheduled on a machine after the first one. This results in better response times for Job-C and DAG-C as the number of DAGs is increased. Furthermore, in ORIG, files can only be retrieved from the job's submit machine, which can be a bottleneck when a workflow begins execution. In Job-C and DAG-C, once a batch file has been transferred to an execute machine, other jobs can retrieve the file from there. We evaluated the performance of the three systems on workflows consisting of different kinds of DAGs. The experiments involved varying the amount of I/O, the branching factor, pipeline depth and job running time. We constructed workflows of 50 DAGs of the type shown in Figure 7 with varying amounts of pipeline I/O. The execution time of each job was set to 10 minutes. The response times obtained are shown in Table 3. In ORIG, the input and executable files need to be transferred to the execute machine from the submit machine for each job ­ hence the longer response times. In Job-C and DAGC, the batch input is transferred only once to each of the machines. However, in Job-C, jobs B1 and B2 in the DAG are often scheduled on different machines. Thus, either F1 or F2 is transferred to a different machine. Similarly G1 or G2 is transferred to the machine where C1 is executed. This results in longer execution times as the sizes of these files increases. In DAG-C , the execution times are virtually constant since there is no transfer of pipeline I/O across the network at all. 
In the next set of experiments, we compared the performance of DAG-C, Job-C and ORIG on DAGs with different branching factors. The structure of the DAGs used is shown in Figure 8. In each test, a workflow of 50 DAGs of 10-minute jobs was run on the cluster. The results are shown in Table 4. For large files, DAG-C performs significantly better than Job-C as the branching factor increases. This is because in Job-C the various sibling jobs (B1 to Bn) are often scheduled on different machines, so their output files need to be transferred to the machine where job C1 is executed.

Figure 8: Sensitivity to DAG breadth
Table 4. Varying branching factor: response times in minutes
    Pipeline volume: 100MB per file
    Branching factor (n)     3     4     5     6
    ORIG                   128   163   205   250
    Job-C                  121   147   170   197
    DAG-C                  117   140   162   186
    Pipeline volume: 1GB per file
    Branching factor (n)     3     4     5     6
    ORIG                   281   355   440   539
    Job-C                  159   190   225   265
    DAG-C                  117   139   163   186

We also compared the performance of the three algorithms on workflows with varying pipeline depths and no branching. (See Figure 9 for the exact DAG structure used.) The response times for workflows of 50 DAGs composed of 10-minute jobs are shown in Table 5. As pipeline depth increases, the performance of DAG-C improves compared to the other two algorithms. Though Job-C attempts to place a job on the same machine as its predecessor, this may not be possible if that machine is already running a job. In this case, intermediate files will be transferred between machines. This happens more frequently as pipeline depth increases, resulting in longer workflow response times.

Figure 9: Sensitivity to DAG depth
Table 5. Varying pipeline depth: response times in minutes
    Pipeline volume: 100MB per file
    Depth     3     4     5     6     7     8
    ORIG     72   103   133   160   193   218
    Job-C    71    99   123   147   170   195
    DAG-C    71    96   121   144   167   189
    Pipeline volume: 1GB per file
    Depth     3     4     5     6     7     8
    ORIG    101   176   244   311   375   437
    Job-C    72   103   133   165   192   227
    DAG-C    71    96   120   146   165   192

Finally, we tested the sensitivity of workflow response times to individual job running times. We used the DAG structure shown in Figure 10 with a branching factor of 6 and a pipeline volume of 1 GB per file. Each workflow we ran contained 50 DAGs. The results are shown in Table 6. Not surprisingly, DAG-C loses its edge relative to Job-C as the workflow becomes more dominated by computation.

Figure 10: Sensitivity to job running time
Table 6. Workflow response time in minutes (job run-times in minutes)
    Pipeline volume: 1GB per file
    Job run-time    10    15    20    25    30
    ORIG           539   601   678   745   817
    Job-C          265   337   421   494   568
    DAG-C          186   256   335   418   501

To summarize, storing files on the disks of execute machines significantly improves the performance of data-intensive workflows. A planning algorithm that explicitly recognizes job dependencies can more effectively leverage distributed disk space to reduce workflow response times.

10. RELATED WORK
One of the earliest scientific workflow management systems to propose a data-centric view of workflows was Zoo [16]. Jobs and data in a workflow were modeled as entities and dependencies were modeled as relationships. Its focus was on data querying and visualization in scientific workflows. However, Zoo was designed for experiments running on a desktop and was not a distributed cluster computing system. GridDB [20] is a WFMS that uses a functional data language to model workflows and relational tables to model the inputs and outputs of programs. Insertion of tuples in input tables triggers the execution of jobs using Condor. The collaborative GriPhyN [3] project is aimed at making Grid computing simple and transparent while providing important functionality such as workflow modeling and data replication. The components of GriPhyN include Chimera [14] (a modeling tool), a Replica Location Service [11] and Pegasus [12], which produces DAGMan files that are submitted to Condor. Since Condor serves as an execute substrate for GridDB and GriPhyN, there is no mechanism to plan workflows within a cluster. Scheduling task graphs has been studied in the field of optimization (see [18] for surveys). Some task-graph scheduling algorithms that handle communication costs between tasks in a DAG include Quarfoth [23] and Hwang [15]. Task-graph scheduling has also been considered in Grid workflow management (Wieczorek [27] and Blythe [9] are examples). However, the issue of caching data on execute machines has not been considered in these projects.
Furthermore, these algorithms typically consider the single-DAG scheduling problem. Scheduling scientific workflows consisting of multiple DAGs with sharing of batch input has not been addressed. Romosan et al. [25] outline an architecture in which each compute node in a Condor cluster has a disk cache and a Storage Resource Manager. The matchmaker informs compute nodes of the files required by ready jobs, and the compute nodes inform the matchmaker of the relevant files in their caches. However, the work does not address the issue of planning workflows. Similarly, Bright et al. [10] propose a caching mechanism and scheduling based on the location of input files. Their algorithms do not deal with job dependencies, and ignore pipeline I/O.
The importance of data placement and replica management to improve data availability and minimize access latency over the wide-area has been recognized. The Condor team has developed tools and applications such as Stork [17] and DiskRouter for efficient data transfer over wide-area networks. Ranganathan et al. [24] propose a peer-to-peer replication model to improve data availability over the WAN, which is also the aim of GriPhyN's RLS. Bent et al. [8] propose caching batch input on remote compute clusters to eliminate the need for repeated remote I/O requests over the WAN, thus increasing data availability. Our focus is on improving I/O performance within a cluster, which is increasingly important as the volume of data processed by scientific programs grows.
Local data caching has long been a feature of distributed file systems (such as Farsite [7]). Distributed data storage provides better fault tolerance and scalability. The principal difference between data access in file systems and in a cluster computing system is that file systems have to move data to the location of the processes. Issues of job planning and scheduling do not arise. In cluster computing systems, we try to move processes to the site of the data.

11. FUTURE WORK
While we have demonstrated the benefits of file caching and data-aware planning, there is considerable scope for more research and evaluation within the framework presented. Since the DAG-Condor scheduler frequently queries the database, the overhead of planning depends on the database size and load. It also depends on other factors such as the number of machines in the cluster and the workload. One immediate avenue for investigation is evaluating the dependence of planning overhead on these factors. This is important in testing the efficacy and efficiency of the adaptive planning scheme described in Section 5.2.
Furthermore, several improvements can be made to the cache maintenance system. For example, the cache replacement parameters for a user's files can be adjusted depending on workflow submission patterns. Another possibility is to take into account the time needed to generate a file in estimates of its worth – files that can be regenerated with a minimum amount of computation can be preferentially discarded from a machine's cache.

12. CONCLUSION
The idea of moving computation to data rather than data to computation was pioneered in classical distributed database systems such as R* [21] and parallel database systems such as Gamma [13]. As scientific applications become more data-intensive, WFMSs face many of the same issues as database systems in the areas of data placement, workflow planning and parallelization. Nieto-Santisteban et al. [22] have demonstrated the performance benefits of modern database systems – indexing, parallelized query execution and efficient joins – in the SDSS project. The primary use of the database in our work is to store workflow information, schedules and statistics. A comprehensive use of data management technology that extends to workflow modeling and cluster management could provide both performance and administrative benefits.

13. REFERENCES
[1] Biomedical Informatics Research Network. http://www.nbirn.net.
[2] Condor fair-share scheduling. http://www.cs.wisc.edu/condor/manual/v6.7/3_5User_Priorities.html.
[3] Grid Physics Network. http://www.griphyn.org.
[4] Grid Physics Network in ATLAS. http://www.usatlas.bnl.gov/computing/grid/griphyn/.
[5] NCBI BLAST. http://www.ncbi.nlm.nih.gov/BLAST/.
[6] Sloan Digital Sky Survey. http://www.sdss.org.
[7] A. Adya et al. Farsite: Federated, Available, and Reliable Storage for an Incompletely Trusted Environment. SIGOPS Oper. Syst. Rev., 36(SI):1–14, 2002.
[8] J. Bent, D. Thain, A. Arpaci-Dusseau, and R. Arpaci-Dusseau. Explicit Control in the Batch-Aware Distributed File System. In NSDI, pages 365–378, 2004.
[9] J. Blythe et al. Task Scheduling Strategies for Workflow-based Applications in Grids. In CCGrid, 2005.
[10] L. Bright and D. Maier. Efficient Scheduling and Execution of Scientific Workflow Tasks. In SSDBM, pages 65–78, 2005.
[11] A. L. Chervenak et al. Giggle: A Framework for Constructing Scalable Replica Location Services. In SC, pages 1–17, 2002.
[12] E. Deelman, J. Blythe, et al. Pegasus: Mapping Scientific Workflows onto the Grid. In European Across Grids Conference, pages 11–20, 2004.
[13] D. DeWitt et al. The Gamma Database Machine Project. IEEE Trans. Knowl. Data Eng., 2(1):44–62, 1990.
[14] I. T. Foster, J.-S. Vöckler, M. Wilde, and Y. Zhao. Chimera: A Virtual Data System for Representing, Querying, and Automating Data Derivation. In SSDBM, pages 37–46, 2002.
[15] J.-J. Hwang et al. Scheduling Precedence Graphs in Systems with Interprocessor Communication Times. SIAM J. Comput., 18(2):244–257, 1989.
[16] Y. E. Ioannidis et al. ZOO: A Desktop Experiment Management Environment. In SIGMOD Conference, pages 580–583, 1997.
[17] T. Kosar and M. Livny. Stork: Making Data Placement a First Class Citizen in the Grid. In ICDCS, pages 342–349, 2004.
[18] Y.-K. Kwok and I. Ahmad. Benchmarking and Comparison of the Task Graph Scheduling Algorithms. Journal of Parallel and Distributed Computing, 59(3):381–422, 1999.
[19] D. Lee et al. LRFU: A Spectrum of Policies that Subsumes the Least Recently Used and Least Frequently Used Policies. IEEE Trans. Computers, 50(12):1352–1361, 2001.
[20] D. T. Liu and M. J. Franklin. The Design of GridDB: A Data-Centric Overlay for the Scientific Grid. In VLDB, pages 600–611, 2004.
[21] G. M. Lohman et al. Query Processing in R*. In Query Processing in Database Systems, pages 31–47. Springer, 1985.
[22] M. A. Nieto-Santisteban et al. When Database Systems Meet the Grid. In CIDR, pages 154–161, 2005.
[23] J. Quarfoth, A. Korth, and D. Lopez. Task Allocation Algorithms with Communication Costs Considered. Midwest Instruction and Computing Symposium, 2005.
[24] K. Ranganathan et al. Improving Data Availability through Dynamic Model-Driven Replication in Large Peer-to-Peer Communities. In CCGRID, pages 376–381, 2002.
[25] A. Romosan, D. Rotem, A. Shoshani, and D. Wright. Co-Scheduling of Computation and Data on Computer Clusters. In SSDBM, pages 103–112, 2005.
[26] D. Thain, J. Bent, A. C. Arpaci-Dusseau, R. H. Arpaci-Dusseau, and M. Livny. Pipeline and Batch Sharing in Grid Workloads. In Proceedings of High-Performance Distributed Computing (HPDC-12), pages 152–161, Seattle, Washington, June 2003.
[27] M. Wieczorek, R. Prodan, and T. Fahringer. Scheduling of Scientific Workflows in the ASKALON Grid Environment. SIGMOD Record, 34(3):56–62, 2005.