2 """ A simple tool to run jobs from a database in parallel."""
4 __author__
= "Stefan Huber"
5 __copyright__
= "Copyright 2013"
11 import sys
, getopt
, os
17 def printStatusInfo(conn
):
20 c
.execute("SELECT count(id) FROM jobs;")
21 nototal
, = c
.fetchone()
23 c
.execute("SELECT count(id) FROM jobs WHERE done=1;")
24 nodone
, = c
.fetchone()
26 c
.execute("SELECT count(id) FROM jobs WHERE started=1;")
27 nostarted
, = c
.fetchone()
29 c
.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
30 wldone
, = c
.fetchone()
34 c
.execute("SELECT sum(workloadestm) FROM jobs;")
35 wltotal
, = c
.fetchone()
41 perdone
= 100.0*float(nodone
)/float(nototal
)
43 perwl
= 100.0*float(wldone
)/float(wltotal
)
45 print("%d (%.1f%%) of %d jobs and %.1f%% of the workload done. %d jobs are running." % \
46 (nodone
, perdone
, nototal
, perwl
, nostarted
-nodone
))
48 print("Exit code stats:")
49 c
.execute("SELECT exitcode, count(exitcode) AS cnt FROM jobs WHERE exitcode >= 0 GROUP BY exitcode ORDER BY exitcode ASC;")
50 for code
, cnt
in c
.fetchall():
51 print(" %3s: %6s (%5.1f%%)" % (code
, cnt
, 100.0*float(cnt
)/nodone
))
55 def createPropertiesTable(conn
, propdef
):
56 conn
.execute("BEGIN EXCLUSIVE")
59 c
.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
60 if c
.fetchone() == (0,):
61 print("Creating properties table.")
62 sqlstmt
= "CREATE TABLE properties (\
63 jobid INTEGER PRIMARY KEY,\
65 FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef
,)
71 proc
= subprocess
.Popen(cmd
, \
72 stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
, shell
=True)
73 out
, err
= proc
.communicate()
74 exitcode
= proc
.wait()
79 return exitcode
, out
, err
81 def processJob(conn
, jobid
):
84 c
.execute("SELECT cmd FROM jobs WHERE id=?", (jobid
,))
87 print "Job %d: '%s'..." % (jobid
, cmd
),
88 ec
, out
, err
= runCmd(cmd
)
92 print " [FAILED: %d]" % ec
94 c
.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec
, jobid
))
97 for l
in out
.splitlines():
98 if l
.startswith("DB-PROPERTIES:"):
100 for l
in err
.splitlines():
101 if l
.startswith("DB-PROPERTIES:"):
107 for k
, v
in p
.iteritems():
111 collist
= ", ".join([str(k
) for k
in prop
.keys()])
112 collist
= "jobid, " + collist
114 vallist
= ", ".join(["?" for k
in prop
.keys()])
115 vallist
= "?, " + vallist
118 sqlstmt
= "INSERT INTO properties (%s) VALUES (%s);" % (collist
,vallist
)
119 c
.execute(sqlstmt
, [jobid
] + list(prop
.values()))
124 def insertJobs(conn
, cmds
):
125 conn
.execute("BEGIN EXCLUSIVE")
126 conn
.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds
)
129 def createSchema(conn
):
132 c
.execute("BEGIN EXCLUSIVE")
134 # Create table, if necessary
135 c
.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
136 if c
.fetchone() == (0,):
137 print "Creating jobs table."
138 conn
.execute("CREATE TABLE jobs ( \
139 id INTEGER PRIMARY KEY AUTOINCREMENT, \
140 cmd STRING NOT NULL, \
141 started BOOL DEFAULT (0) NOT NULL, \
142 done BOOL DEFAULT (0) NOT NULL, \
144 workloadestm REAL DEFAULT (1) NOT NULL)")
148 def getNextJobId(conn
):
151 c
.execute("BEGIN EXCLUSIVE")
152 c
.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
159 conn
.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid
,))
169 """Print usage text of this program"""
172 Take the jobs defined in the jobs table of the given database and process one
173 after the other. Multiple instances may be launched against the same database.
176 {0} [OPTIONS] [COMMANDS] -d DB
180 -c FILE add each line as a job resp. job's command to DB
182 -s print status information
183 -w do work and process jobs
186 -d DB the database to process
187 -n NUM in -w mode, only process num-many jobs
188 -p COL-DEF create properties table with SQL column spec
191 Commands may be combined in one call of {0}.
193 A list of jobs may be importet line-by-line from a file using the -c option.
194 Every job may output to stdout or stderr one or more strings of the form
195 DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
196 It is assumed that a table 'properties' exists with the columns jobid, key,
197 key2, and key3. The corresponding values are inserted into this table. Using
198 the option -p such a properties table can be created by giving a list of
199 column definitions in SQL style.
201 The jobs table also contains a 'workloadestm' column that is used when
202 estimating the finished workload so far. The entries default to 1 and may be
206 # create cmds.sh with jobs
207 echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
208 echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
209 # create an initial database, but do not work
210 {0} -d jobs.db -c cmds.sh \\
211 -p 'number INTEGER, time REAL, mem INTEGER'
217 """.format(sys
.argv
[0]))
220 if __name__
== "__main__":
231 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hd:c:p:wsvn:")
233 for opt
, arg
in opts
:
252 print("Unknown option '", opt
, "'.")
254 except getopt
.GetoptError
as e
:
255 print("Error parsing arguments:", e
)
257 sys
.exit(os
.EX_USAGE
)
260 print("No database given.")
261 sys
.exit(os
.EX_USAGE
)
263 conn
= sqlite3
.connect(dbfn
, timeout
=60)
267 printStatusInfo(conn
)
270 createPropertiesTable(conn
, propdef
)
273 print("Adding jobs...")
274 cmds
= open(cmdfn
).readlines()
275 cmds
= [(c
.strip(),) for c
in cmds
]
276 insertJobs(conn
, cmds
)
280 while not numjobs
or n
< numjobs
:
282 jobid
= getNextJobId(conn
)
284 print("All jobs have been started.")
286 processJob(conn
, jobid
)