Adding exit code stats
[paralleljobs.git] / paralleljobs.py
1 #!/usr/bin/env python3
2 """ A simple tool to run jobs from a database in parallel."""
3
4 __author__ = "Stefan Huber"
5 __copyright__ = "Copyright 2013"
6
7 __version__ = "1.0"
8 __license__ = "LGPL3"
9
10
11 import sys, getopt, os
12 import sqlite3
13 import subprocess
14
# Global verbosity flag: switched on by the -v command line option; when true,
# runCmd() prints the captured stdout and stderr of every job it runs.
verbose = False
16
def printStatusInfo(conn):
    """Print a progress summary and exit code statistics of the jobs table.

    Reports the number and percentage of finished jobs, the percentage of
    the finished workload (sum of the 'workloadestm' column), the number of
    running jobs (started but not done) and one line per distinct exit code.

    Args:
        conn: open sqlite3 connection to a database containing a 'jobs'
            table.
    """
    c = conn.cursor()

    c.execute("SELECT count(id) FROM jobs;")
    nototal, = c.fetchone()

    c.execute("SELECT count(id) FROM jobs WHERE done=1;")
    nodone, = c.fetchone()

    c.execute("SELECT count(id) FROM jobs WHERE started=1;")
    nostarted, = c.fetchone()

    # sum() yields NULL (None) when no row matches; treat that as zero
    # workload so the comparisons and divisions below stay well-defined.
    c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
    wldone, = c.fetchone()
    if wldone is None:
        wldone = 0.0

    c.execute("SELECT sum(workloadestm) FROM jobs;")
    wltotal, = c.fetchone()
    if wltotal is None:
        wltotal = 0.0

    perdone = 0
    perwl = 0
    if nototal > 0:
        perdone = 100.0*float(nodone)/float(nototal)
    if wltotal > 0:
        perwl = 100.0*float(wldone)/float(wltotal)

    print("%d (%.1f%%) of %d jobs and %.1f%% of the workload done. %d jobs are running." % \
            (nodone, perdone, nototal, perwl, nostarted-nodone))

    print("Exit code stats:")
    c.execute("SELECT exitcode, count(exitcode) AS cnt FROM jobs GROUP BY exitcode ORDER BY exitcode ASC;")
    for code, cnt in c.fetchall():
        # Jobs without an exit code form their own group, so this loop also
        # runs while nothing is done yet; avoid dividing by zero then.
        percode = 100.0*float(cnt)/nodone if nodone > 0 else 0.0
        print(" %3s: %6s (%5.1f%%)" % (code, cnt, percode))

    c.close()
54
def createPropertiesTable(conn, propdef):
    """Create the 'properties' table unless it already exists.

    The table gets a 'jobid' primary key referencing jobs(id), followed by
    the columns given in propdef. The existence check and the creation run
    inside one exclusive transaction.

    Args:
        conn: open sqlite3 connection.
        propdef: column definitions in SQL syntax, e.g.
            'number INTEGER, time REAL'. The text is pasted verbatim into
            the CREATE TABLE statement, so it must come from a trusted
            source.

    Raises:
        Any exception raised while checking for or creating the table is
        re-raised after the transaction has been rolled back.
    """
    conn.execute("BEGIN EXCLUSIVE")

    c = conn.cursor()
    try:
        c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
        if c.fetchone() == (0,):
            print("Creating properties table.")
            sqlstmt = ("CREATE TABLE properties ("
                       "jobid INTEGER PRIMARY KEY, "
                       "%s, "
                       "FOREIGN KEY (jobid) REFERENCES jobs (id));") % (propdef,)
            c.execute(sqlstmt)
    except Exception:
        # A malformed propdef must not leave the exclusive transaction (and
        # with it the database lock) open on this connection.
        conn.rollback()
        raise
    finally:
        c.close()
    conn.commit()
69
def runCmd(cmd):
    """Run a command line through the shell and capture its output.

    Args:
        cmd: the command line; it is handed to the shell (shell=True).

    Returns:
        A tuple (exitcode, out, err): the exit code of the process and its
        captured stdout and stderr as text (str).
    """
    # universal_newlines=True opens the pipes in text mode, so communicate()
    # returns str rather than bytes. Callers match the str prefix
    # "DB-PROPERTIES:" against the output lines, which fails on bytes.
    proc = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            shell=True, universal_newlines=True)
    out, err = proc.communicate()
    # communicate() already waited for the process to terminate.
    exitcode = proc.returncode

    if verbose:
        print(out, err)

    return exitcode, out, err
80
def processJob(conn, jobid):
    """Run the job with the given id and store its result in the database.

    The job's command is read from the 'jobs' table and executed via
    runCmd(). Afterwards the exit code is stored and the job is marked as
    done. Every line of the job's stdout and stderr that starts with
    'DB-PROPERTIES:' is evaluated as a Python dict; the merged dicts (later
    lines override earlier ones, stderr is read after stdout) are inserted
    as one row into the 'properties' table, with the dict keys used as
    column names. All changes are committed at the end.

    Args:
        conn: open sqlite3 connection.
        jobid: id of the job in the 'jobs' table.
    """
    c = conn.cursor()
    c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
    cmd, = c.fetchone()

    print("Process job %d: %s" % (jobid, cmd))

    ec, out, err = runCmd(cmd)
    c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))

    marker = "DB-PROPERTIES:"
    prop = {}
    for stream in (out, err):
        # The captured output may arrive as bytes; the prefix test below
        # needs str.
        if isinstance(stream, bytes):
            stream = stream.decode(errors="replace")
        for line in stream.splitlines():
            if line.startswith(marker):
                # NOTE(security): eval() executes whatever expression the
                # job printed. Only run trusted jobs, or consider
                # ast.literal_eval if plain literals are sufficient.
                prop.update(eval(line[len(marker):]))

    if prop:
        keys = list(prop)
        # Column names cannot be bound as SQL parameters, so they are put
        # into the statement text; only the values are bound.
        collist = ", ".join(["jobid"] + [str(k) for k in keys])
        vallist = ", ".join(["?"] * (len(keys) + 1))
        sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist, vallist)
        c.execute(sqlstmt, [jobid] + [prop[k] for k in keys])

    c.close()
    conn.commit()
119
def insertJobs(conn, cmds):
    """Append new rows to the 'jobs' table in one exclusive transaction.

    Args:
        conn: open sqlite3 connection.
        cmds: iterable of parameter sequences, each supplying the single
            value bound to the 'cmd' column (e.g. 1-tuples).
    """
    cur = conn.cursor()
    cur.execute("BEGIN EXCLUSIVE")
    cur.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
    cur.close()
    conn.commit()
124
def createSchema(conn):
    """Make sure the 'jobs' table exists in the database.

    The existence check and the table creation happen inside a single
    exclusive transaction, which is committed at the end.

    Args:
        conn: open sqlite3 connection.
    """
    cur = conn.cursor()
    cur.execute("BEGIN EXCLUSIVE")

    # sqlite_master lists the schema objects; a count of zero means that
    # nothing named 'jobs' exists yet.
    cur.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
    (found,) = cur.fetchone()
    if found == 0:
        print("Creating jobs table.")
        cur.execute("CREATE TABLE jobs ( "
                    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
                    "cmd STRING NOT NULL, "
                    "started BOOL DEFAULT (0) NOT NULL, "
                    "done BOOL DEFAULT (0) NOT NULL, "
                    "exitcode INTEGER, "
                    "workloadestm REAL DEFAULT (1) NOT NULL)")
    cur.close()
    conn.commit()
143
def getNextJobId(conn):
    """Claim one job that has not been started yet.

    Inside a single exclusive transaction, one row of the 'jobs' table with
    started != 1 is selected and marked as started.

    Args:
        conn: open sqlite3 connection.

    Returns:
        The id of the claimed job, or None if no unstarted job is left.

    Raises:
        Any exception raised by the statements is re-raised after the
        transaction has been rolled back.
    """
    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")
    try:
        c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")

        jobid = None
        r = c.fetchone()
        if r is not None:
            jobid, = r
            c.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))
    except Exception:
        conn.rollback()
        raise
    finally:
        c.close()

    # Commit on every path, including "no job left": otherwise the exclusive
    # transaction stays open and the next BEGIN on this connection fails.
    conn.commit()

    return jobid
161
162
163
def usage():
    """Print the usage text of this program to stdout.

    The program name shown in the text is taken from sys.argv[0].
    """

    print("""
Take the jobs defined in the jobs table of the given database and process one
after the other. Multiple instances may be launched against the same database.

Usage:
{0} [OPTIONS] [COMMANDS] -d DB
{0} -h

COMMANDS:
-c FILE add each line as a job resp. job's command to DB
-h print this text
-s print status information
-w do work and process jobs

OPTIONS:
-d DB the database to process
-n NUM in -w mode, only process num-many jobs
-p COL-DEF create properties table with SQL column spec
-v verbose output

Commands may be combined in one call of {0}.

A list of jobs may be imported line-by-line from a file using the -c option.
Every job may output to stdout or stderr one or more strings of the form
DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
It is assumed that a table 'properties' exists with the columns jobid, key,
key2, and key3. The corresponding values are inserted into this table. Using
the option -p such a properties table can be created by giving a list of
column definitions in SQL style.

The jobs table also contains a 'workloadestm' column that is used when
estimating the finished workload so far. The entries default to 1 and may be
adjusted.

Examples:
# create cmds.sh with jobs
echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
# create an initial database, but do not work
{0} -d jobs.db -c cmds.sh \\
-p 'number INTEGER, time REAL, mem INTEGER'
# launch two workers
{0} -d jobs.db -w &
{0} -d jobs.db -w &
# print status info
{0} -d jobs.db -s
""".format(sys.argv[0]))
214
215
if __name__ == "__main__":

    # Script entry point: parse the command line, open the database and run
    # the requested commands (-s status, -p properties table, -c add jobs,
    # -w work) in that fixed order.
    dbfn = None
    cmdfn = None
    propdef = None
    work = False
    status = False
    numjobs = None

    try:
        opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsvn:")

        for opt, arg in opts:
            if opt == "-h":
                usage()
                sys.exit(os.EX_OK)
            elif opt == "-d":
                dbfn = arg
            elif opt == "-c":
                cmdfn = arg
            elif opt == "-p":
                propdef = arg
            elif opt == "-w":
                work = True
            elif opt == "-s":
                status = True
            elif opt == "-v":
                verbose = True
            elif opt == "-n":
                # Report a non-numeric argument as a usage error instead of
                # dying with a traceback.
                try:
                    numjobs = int(arg)
                except ValueError:
                    print("Option -n expects an integer, got '%s'." % arg)
                    sys.exit(os.EX_USAGE)
            else:
                print("Unknown option '", opt, "'.")

    except getopt.GetoptError as e:
        print("Error parsing arguments:", e)
        usage()
        sys.exit(os.EX_USAGE)

    if dbfn is None:
        print("No database given.")
        sys.exit(os.EX_USAGE)

    conn = sqlite3.connect(dbfn, timeout=60)
    try:
        createSchema(conn)

        if status:
            printStatusInfo(conn)

        if propdef is not None:
            createPropertiesTable(conn, propdef)

        if cmdfn is not None:
            print("Adding jobs...")
            # Every line of the file becomes one job; the context manager
            # closes the file again.
            with open(cmdfn) as cmdfile:
                cmds = [(line.strip(),) for line in cmdfile]
            insertJobs(conn, cmds)

        if work:
            n = 0
            # Test against None explicitly: with "not numjobs" the call
            # "-n 0" would process all jobs instead of none.
            while numjobs is None or n < numjobs:

                jobid = getNextJobId(conn)
                if jobid is None:
                    print("All jobs have been started.")
                    break
                processJob(conn, jobid)
                n += 1
    finally:
        # Close the connection even if one of the commands failed.
        conn.close()
287