2 """ A simple tool to run jobs from a database in parallel."""
4 __author__
= "Stefan Huber"
5 __copyright__
= "Copyright 2013"
11 import sys
, getopt
, os
17 def printStatusInfo(conn
):
20 c
.execute("SELECT count(id) FROM jobs;")
21 nototal
, = c
.fetchone()
23 c
.execute("SELECT count(id) FROM jobs WHERE done=1;")
24 nodone
, = c
.fetchone()
26 c
.execute("SELECT count(id) FROM jobs WHERE started=1;")
27 nostarted
, = c
.fetchone()
29 c
.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
30 wldone
, = c
.fetchone()
34 c
.execute("SELECT sum(workloadestm) FROM jobs;")
35 wltotal
, = c
.fetchone()
41 perdone
= 100.0*float(nodone
)/float(nototal
)
43 perwl
= 100.0*float(wldone
)/float(wltotal
)
45 print("%d (%.1f%%) of %d jobs and %.1f%% of the workload done. %d jobs are running." % \
46 (nodone
, perdone
, nototal
, perwl
, nostarted
-nodone
))
48 print("Exit code stats:")
49 c
.execute("SELECT exitcode, count(exitcode) AS cnt FROM jobs GROUP BY exitcode ORDER BY exitcode ASC;")
50 for code
, cnt
in c
.fetchall():
51 print(" %3s: %6s (%5.1f%%)" % (code
, cnt
, 100.0*float(cnt
)/nodone
))
55 def createPropertiesTable(conn
, propdef
):
56 conn
.execute("BEGIN EXCLUSIVE")
59 c
.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
60 if c
.fetchone() == (0,):
61 print("Creating properties table.")
62 sqlstmt
= "CREATE TABLE properties (\
63 jobid INTEGER PRIMARY KEY,\
65 FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef
,)
71 proc
= subprocess
.Popen(cmd
, \
72 stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
, shell
=True)
73 out
, err
= proc
.communicate()
74 exitcode
= proc
.wait()
79 return exitcode
, out
, err
81 def processJob(conn
, jobid
):
84 c
.execute("SELECT cmd FROM jobs WHERE id=?", (jobid
,))
87 print("Process job %d: %s" % (jobid
, cmd
))
89 ec
, out
, err
= runCmd(cmd
)
90 c
.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec
, jobid
))
93 for l
in out
.splitlines():
94 if l
.startswith("DB-PROPERTIES:"):
96 for l
in err
.splitlines():
97 if l
.startswith("DB-PROPERTIES:"):
103 for k
, v
in p
.iteritems():
107 collist
= ", ".join([str(k
) for k
in prop
.keys()])
108 collist
= "jobid, " + collist
110 vallist
= ", ".join(["?" for k
in prop
.keys()])
111 vallist
= "?, " + vallist
114 sqlstmt
= "INSERT INTO properties (%s) VALUES (%s);" % (collist
,vallist
)
115 c
.execute(sqlstmt
, [jobid
] + list(prop
.values()))
120 def insertJobs(conn
, cmds
):
121 conn
.execute("BEGIN EXCLUSIVE")
122 conn
.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds
)
125 def createSchema(conn
):
128 c
.execute("BEGIN EXCLUSIVE")
130 # Create table, if necessary
131 c
.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
132 if c
.fetchone() == (0,):
133 print("Creating jobs table.")
134 conn
.execute("CREATE TABLE jobs ( \
135 id INTEGER PRIMARY KEY AUTOINCREMENT, \
136 cmd STRING NOT NULL, \
137 started BOOL DEFAULT (0) NOT NULL, \
138 done BOOL DEFAULT (0) NOT NULL, \
140 workloadestm REAL DEFAULT (1) NOT NULL)")
144 def getNextJobId(conn
):
147 c
.execute("BEGIN EXCLUSIVE")
148 c
.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
155 conn
.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid
,))
165 """Print usage text of this program"""
168 Take the jobs defined in the jobs table of the given database and process one
169 after the other. Multiple instances may be launched against the same database.
172 {0} [OPTIONS] [COMMANDS] -d DB
176 -c FILE add each line as a job resp. job's command to DB
178 -s print status information
179 -w do work and process jobs
182 -d DB the database to process
183 -n NUM in -w mode, only process num-many jobs
184 -p COL-DEF create properties table with SQL column spec
187 Commands may be combined in one call of {0}.
189 A list of jobs may be importet line-by-line from a file using the -c option.
190 Every job may output to stdout or stderr one or more strings of the form
191 DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
192 It is assumed that a table 'properties' exists with the columns jobid, key,
193 key2, and key3. The corresponding values are inserted into this table. Using
194 the option -p such a properties table can be created by giving a list of
195 column definitions in SQL style.
197 The jobs table also contains a 'workloadestm' column that is used when
198 estimating the finished workload so far. The entries default to 1 and may be
202 # create cmds.sh with jobs
203 echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
204 echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
205 # create an initial database, but do not work
206 {0} -d jobs.db -c cmds.sh \\
207 -p 'number INTEGER, time REAL, mem INTEGER'
213 """.format(sys
.argv
[0]))
216 if __name__
== "__main__":
227 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hd:c:p:wsvn:")
229 for opt
, arg
in opts
:
248 print("Unknown option '", opt
, "'.")
250 except getopt
.GetoptError
as e
:
251 print("Error parsing arguments:", e
)
253 sys
.exit(os
.EX_USAGE
)
256 print("No database given.")
257 sys
.exit(os
.EX_USAGE
)
259 conn
= sqlite3
.connect(dbfn
, timeout
=60)
263 printStatusInfo(conn
)
266 createPropertiesTable(conn
, propdef
)
269 print("Adding jobs...")
270 cmds
= open(cmdfn
).readlines()
271 cmds
= [(c
.strip(),) for c
in cmds
]
272 insertJobs(conn
, cmds
)
276 while not numjobs
or n
< numjobs
:
278 jobid
= getNextJobId(conn
)
280 print("All jobs have been started.")
282 processJob(conn
, jobid
)