2 """ A simple tool to run jobs from a database in parallel."""
# Module metadata.
__author__ = "Stefan Huber"
__copyright__ = "Copyright 2013"
11 import sys
, getopt
, os
def printStatusInfo(conn):
    """Print progress statistics of the jobs table.

    Reports how many jobs are done, how many are currently running, and
    which percentage of the (estimated) workload has been completed.

    conn -- open sqlite3 connection to a database with a 'jobs' table
    """
    c = conn.cursor()

    c.execute("SELECT count(id) FROM jobs;")
    nototal, = c.fetchone()

    c.execute("SELECT count(id) FROM jobs WHERE done=1;")
    nodone, = c.fetchone()

    c.execute("SELECT count(id) FROM jobs WHERE started=1;")
    nostarted, = c.fetchone()

    c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
    wldone, = c.fetchone()

    c.execute("SELECT sum(workloadestm) FROM jobs;")
    wltotal, = c.fetchone()

    # SQL sum() yields NULL (None) when no rows match; treat as zero work.
    if wldone is None:
        wldone = 0.0
    if wltotal is None:
        wltotal = 0.0

    # Guard against an empty jobs table to avoid division by zero.
    perdone = 100.0*float(nodone)/float(nototal) if nototal else 0.0
    perwl = 100.0*float(wldone)/float(wltotal) if wltotal else 0.0

    print("%d (%.1f%%) of %d jobs and %.1f%% of the workload done. %d jobs are running." % \
            (nodone, perdone, nototal, perwl, nostarted-nodone))
def createPropertiesTable(conn, propdef):
    """Create the 'properties' table if it does not exist yet.

    conn -- open sqlite3 connection
    propdef -- SQL column definitions (e.g. "number INTEGER, time REAL"),
               spliced verbatim into the CREATE TABLE statement

    NOTE(review): propdef is inserted into the SQL string unescaped; it
    must come from a trusted source (this tool's own command line).
    """
    conn.execute("BEGIN EXCLUSIVE")
    c = conn.cursor()

    c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
    if c.fetchone() == (0,):
        print("Creating properties table.")
        sqlstmt = "CREATE TABLE properties (\
                jobid INTEGER PRIMARY KEY,\
                %s,\
                FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,)
        c.execute(sqlstmt)

    conn.commit()
def runCmd(cmd):
    """Run the shell command 'cmd' and return (exitcode, stdout, stderr).

    stdout and stderr are returned as byte strings, exactly as produced
    by the child process.

    NOTE(review): the command is run through the shell (shell=True) on
    purpose, so job commands may use shell features such as
    'ulimit ...; ./prog'.  Commands must therefore be trusted input.
    """
    proc = subprocess.Popen(cmd, \
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    out, err = proc.communicate()
    # communicate() already waited for the child to terminate, so the
    # exit code is available without a redundant extra wait().
    exitcode = proc.returncode

    return exitcode, out, err
def processJob(conn, jobid):
    """Run the job with the given id and record its results.

    Executes the job's command via runCmd(), stores the exit code and
    marks the job as done.  Any output line on stdout or stderr starting
    with "DB-PROPERTIES:" is parsed as a dictionary literal and its
    key/value pairs are inserted into the 'properties' table for this job.
    """
    import ast

    c = conn.cursor()
    c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
    cmd, = c.fetchone()

    print("Process job %d: %s" % (jobid, cmd))

    ec, out, err = runCmd(cmd)
    c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))

    # Collect every DB-PROPERTIES dictionary the job printed.
    # NOTE(review): the lines of the original that parse the dictionary
    # fell outside this chunk; literal_eval matches the documented format
    # (e.g. { "key": "value", "key2": 1.23, "key3": True }) and, unlike
    # eval(), cannot execute arbitrary code -- TODO confirm vs. original.
    prop = {}
    for l in out.splitlines():
        if l.startswith("DB-PROPERTIES:"):
            prop.update(ast.literal_eval(l[len("DB-PROPERTIES:"):]))
    for l in err.splitlines():
        if l.startswith("DB-PROPERTIES:"):
            prop.update(ast.literal_eval(l[len("DB-PROPERTIES:"):]))

    if prop:
        # Build "INSERT INTO properties (jobid, k1, ...) VALUES (?, ?, ...)".
        collist = ", ".join([str(k) for k in prop.keys()])
        collist = "jobid, " + collist

        vallist = ", ".join(["?" for k in prop.keys()])
        vallist = "?, " + vallist

        sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist, vallist)
        c.execute(sqlstmt, [jobid] + list(prop.values()))

    conn.commit()
def insertJobs(conn, cmds):
    """Insert the given job commands into the jobs table.

    conn -- open sqlite3 connection
    cmds -- sequence of 1-tuples, each holding one command string

    The inserts happen in one exclusive transaction and are committed at
    the end, so concurrent workers never see a partially added batch.
    """
    conn.execute("BEGIN EXCLUSIVE")
    conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
    conn.commit()
def createSchema(conn):
    """Create the 'jobs' table if the database does not contain one yet.

    conn -- open sqlite3 connection
    """
    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")

    # Create table, if necessary
    c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
    if c.fetchone() == (0,):
        print("Creating jobs table.")
        # NOTE(review): processJob() stores an exit code per job, so the
        # schema needs an 'exitcode' column; its definition line fell
        # outside this chunk -- TODO confirm the exact column spec.
        conn.execute("CREATE TABLE jobs ( \
                id INTEGER PRIMARY KEY AUTOINCREMENT, \
                cmd STRING NOT NULL, \
                started BOOL DEFAULT (0) NOT NULL, \
                done BOOL DEFAULT (0) NOT NULL, \
                exitcode INT, \
                workloadestm REAL DEFAULT (1) NOT NULL)")

    conn.commit()
def getNextJobId(conn):
    """Atomically claim the next unstarted job.

    Returns the id of a job that has not been started yet and marks it as
    started, or None if every job has already been started.  Check and
    update happen inside one EXCLUSIVE transaction so that concurrent
    workers on the same database never claim the same job twice.
    """
    jobid = None

    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")
    c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")

    row = c.fetchone()
    if row is not None:
        jobid, = row
        conn.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))

    conn.commit()
    return jobid
def usage():
    """Print usage text of this program"""
    # NOTE(review): this chunk of the file did not show every line of the
    # original help text; blank lines and a few connecting words were
    # reconstructed conservatively -- TODO diff against the original file.
    print("""
{0} [OPTIONS] [COMMANDS] -d DB

  -c FILE      add each line as a job resp. job's command to DB
  -s           print status information
  -w           do work and process jobs

  -d DB        the database to process
  -n NUM       in -w mode, only process num-many jobs
  -p COL-DEF   create properties table with SQL column spec

Commands may be combined in one call of {0}.

A list of jobs may be importet line-by-line from a file using the -c option.
Every job may output to stdout or stderr one or more strings of the form
    DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
It is assumed that a table 'properties' exists with the columns jobid, key,
key2, and key3. The corresponding values are inserted into this table. Using
the option -p such a properties table can be created by giving a list of
column definitions in SQL style.

The jobs table also contains a 'workloadestm' column that is used when
estimating the finished workload so far. The entries default to 1 and may be
adjusted.

    # create cmds.sh with jobs
    echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
    echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
    # create an initial database, but do not work
    {0} -d jobs.db -c cmds.sh \\
        -p 'number INTEGER, time REAL, mem INTEGER'
""".format(sys.argv[0]))
206 if __name__
== "__main__":
217 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hd:c:p:wsvn:")
219 for opt
, arg
in opts
:
238 print("Unknown option '", opt
, "'.")
240 except getopt
.GetoptError
as e
:
241 print("Error parsing arguments:", e
)
243 sys
.exit(os
.EX_USAGE
)
246 print("No database given.")
247 sys
.exit(os
.EX_USAGE
)
249 conn
= sqlite3
.connect(dbfn
)
253 printStatusInfo(conn
)
256 createPropertiesTable(conn
, propdef
)
259 print("Adding jobs...")
260 cmds
= open(cmdfn
).readlines()
261 cmds
= [(c
.strip(),) for c
in cmds
]
262 insertJobs(conn
, cmds
)
266 while not numjobs
or n
< numjobs
:
268 jobid
= getNextJobId(conn
)
270 print("All jobs have been started.")
272 processJob(conn
, jobid
)