Remove None in exit code stats
[paralleljobs.git] / paralleljobs.py
1 #!/usr/bin/env python
2 """ A simple tool to run jobs from a database in parallel."""
3
4 __author__ = "Stefan Huber"
5 __copyright__ = "Copyright 2013"
6
7 __version__ = "1.0"
8 __license__ = "LGPL3"
9
10
11 import sys, getopt, os
12 import sqlite3
13 import subprocess
14
# Set to True by the -v command line option; runCmd() then echoes the captured
# stdout/stderr of every job it runs.
verbose = False
16
def printStatusInfo(conn):
    """Print a progress summary and exit code statistics of the jobs table.

    conn -- open sqlite3 connection to a database containing the jobs table

    Prints one line with the number and percentage of finished jobs, the
    percentage of finished workload (according to the workloadestm column)
    and the number of jobs that are started but not done yet, followed by one
    line per distinct non-negative exit code.
    """
    c = conn.cursor()
    try:
        c.execute("SELECT count(id) FROM jobs;")
        nototal, = c.fetchone()

        c.execute("SELECT count(id) FROM jobs WHERE done=1;")
        nodone, = c.fetchone()

        c.execute("SELECT count(id) FROM jobs WHERE started=1;")
        nostarted, = c.fetchone()

        # sum() yields NULL (None) if no row matches; treat that as zero.
        c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
        wldone, = c.fetchone()
        if wldone is None:
            wldone = 0.0

        # Same for the total: an empty jobs table gives None, which must not
        # be compared against a number.
        c.execute("SELECT sum(workloadestm) FROM jobs;")
        wltotal, = c.fetchone()
        if wltotal is None:
            wltotal = 0.0

        perdone = 0.0
        perwl = 0.0
        if nototal > 0:
            perdone = 100.0 * float(nodone) / float(nototal)
        if wltotal > 0:
            perwl = 100.0 * float(wldone) / float(wltotal)

        print("%d (%.1f%%) of %d jobs and %.1f%% of the workload done. %d jobs are running." %
              (nodone, perdone, nototal, perwl, nostarted - nodone))

        print("Exit code stats:")
        c.execute("SELECT exitcode, count(exitcode) AS cnt FROM jobs WHERE exitcode >= 0 GROUP BY exitcode ORDER BY exitcode ASC;")
        for code, cnt in c.fetchall():
            # Percentages are relative to the number of finished jobs; guard
            # against rows that carry an exit code without being flagged done.
            share = 100.0 * float(cnt) / nodone if nodone > 0 else 0.0
            print(" %3s: %6s (%5.1f%%)" % (code, cnt, share))
    finally:
        c.close()
54
def createPropertiesTable(conn, propdef):
    """Create the 'properties' table unless it already exists.

    conn    -- open sqlite3 connection
    propdef -- SQL column definitions for the job properties, for instance
               'number INTEGER, time REAL, mem INTEGER'

    The table gets a jobid primary key referencing jobs(id) plus the columns
    given by propdef. propdef is spliced verbatim into the CREATE TABLE
    statement (DDL cannot take bound parameters), so it must come from a
    trusted source such as the command line.

    If anything fails the exclusive transaction is rolled back, so the
    database lock is released, and the exception is re-raised.
    """
    conn.execute("BEGIN EXCLUSIVE")

    c = conn.cursor()
    try:
        c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
        if c.fetchone() == (0,):
            print("Creating properties table.")
            template = ("CREATE TABLE properties ("
                        "jobid INTEGER PRIMARY KEY, "
                        "%s, "
                        "FOREIGN KEY (jobid) REFERENCES jobs (id));")
            c.execute(template % (propdef,))
        conn.commit()
    except Exception:
        # Do not leave the exclusive transaction (and its lock) dangling.
        conn.rollback()
        raise
    finally:
        c.close()
69
def runCmd(cmd):
    """Run cmd through the shell and capture what it prints.

    cmd -- the command line to execute (a string, interpreted by the shell)

    Returns the tuple (exitcode, out, err), where out and err are the
    captured stdout and stderr as text. If the module-level flag verbose is
    set, both are echoed to stdout.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, shell=True)
    out, err = proc.communicate()
    # communicate() already waited for the process to terminate.
    exitcode = proc.returncode

    # On Python 3 the pipes deliver bytes; callers search the output with
    # str methods, so hand out text. Undecodable bytes are replaced rather
    # than failing the job. On Python 2 the output already is a str.
    if not isinstance(out, str):
        out = out.decode("utf-8", "replace")
    if not isinstance(err, str):
        err = err.decode("utf-8", "replace")

    if verbose:
        print("%s %s" % (out, err))

    return exitcode, out, err
80
def processJob(conn, jobid):
    """Run the job with the given id and record its result in the database.

    conn  -- open sqlite3 connection holding the jobs table and, if jobs
             report properties, a matching properties table
    jobid -- id of the row in the jobs table to process

    The job's command is executed via runCmd(). Its exit code is stored and
    the job is flagged as done. Every line of the job's stdout or stderr that
    starts with "DB-PROPERTIES:" is evaluated as a dictionary; the merged
    dictionaries (later lines override earlier ones, stdout before stderr)
    are inserted as one row into the properties table.

    On failure the pending database changes are rolled back and the
    exception is re-raised.
    """
    marker = "DB-PROPERTIES:"

    c = conn.cursor()
    try:
        c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
        cmd, = c.fetchone()

        # Announce the job before it runs; the verdict is appended to the
        # same line afterwards, hence no newline and an explicit flush.
        sys.stdout.write("Job %d: '%s'... " % (jobid, cmd))
        sys.stdout.flush()
        ec, out, err = runCmd(cmd)
        if ec == 0:
            print(" [OK]")
        else:
            print(" [FAILED: %d]" % ec)

        c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))

        prop = {}
        for stream in (out, err):
            for line in stream.splitlines():
                if line.startswith(marker):
                    # SECURITY: eval() executes whatever the job printed.
                    # Jobs are arbitrary shell commands chosen by the user
                    # anyway, but ast.literal_eval() would be the safer
                    # choice if only literal dictionaries are needed.
                    prop.update(eval(line[len(marker):]))

        if len(prop) > 0:
            keys = list(prop)
            # Quote the column names so that odd keys cannot break out of
            # the statement; the values are passed as bound parameters.
            quoted = ['"%s"' % str(k).replace('"', '""') for k in keys]
            collist = ", ".join(["jobid"] + quoted)
            vallist = ", ".join(["?"] * (len(keys) + 1))

            sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist, vallist)
            c.execute(sqlstmt, [jobid] + [prop[k] for k in keys])

        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        c.close()
123
def insertJobs(conn, cmds):
    """Insert new jobs into the jobs table.

    conn -- open sqlite3 connection containing the jobs table
    cmds -- iterable of 1-tuples (cmd,), one per job to add

    All jobs are added within one exclusive transaction. If any insert fails
    nothing is added, the transaction is rolled back (releasing the database
    lock) and the exception is re-raised.
    """
    # The connection as context manager commits on success and rolls back
    # when the block raises.
    with conn:
        conn.execute("BEGIN EXCLUSIVE")
        conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
128
def createSchema(conn):
    """Create the jobs table unless it already exists.

    conn -- open sqlite3 connection

    The table has the columns id, cmd, started, done, exitcode and
    workloadestm. An existing jobs table is left untouched. If anything fails
    the exclusive transaction is rolled back and the exception is re-raised.
    """
    c = conn.cursor()
    try:
        c.execute("BEGIN EXCLUSIVE")

        # Create table, if necessary
        c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
        if c.fetchone() == (0,):
            print("Creating jobs table.")
            # cmd is declared TEXT: a column declared STRING gets NUMERIC
            # affinity in SQLite, which would turn a command like "0123"
            # into the integer 123.
            c.execute("CREATE TABLE jobs ( "
                      "id INTEGER PRIMARY KEY AUTOINCREMENT, "
                      "cmd TEXT NOT NULL, "
                      "started BOOL DEFAULT (0) NOT NULL, "
                      "done BOOL DEFAULT (0) NOT NULL, "
                      "exitcode INTEGER, "
                      "workloadestm REAL DEFAULT (1) NOT NULL)")
        conn.commit()
    except Exception:
        # Do not leave the exclusive transaction (and its lock) dangling.
        conn.rollback()
        raise
    finally:
        c.close()
147
def getNextJobId(conn):
    """Claim a job that has not been started yet and return its id.

    conn -- open sqlite3 connection containing the jobs table

    Within one exclusive transaction a row with started != 1 is picked and
    flagged as started, so that concurrently running workers never claim the
    same job. Returns the id of the claimed job, or None if every job has
    been started already.

    The transaction is always finished (committed, or rolled back on error)
    before returning, so the database lock is never kept.
    """
    c = conn.cursor()
    try:
        c.execute("BEGIN EXCLUSIVE")
        c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")

        jobid = None
        r = c.fetchone()
        if r is not None:
            jobid, = r
            c.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))

        # Commit also when nothing was claimed; otherwise the exclusive
        # transaction would stay open and block all other workers.
        conn.commit()
        return jobid
    except Exception:
        conn.rollback()
        raise
    finally:
        c.close()
165
166
167
def usage():
    """Print the usage text of this program to stdout.

    The program name shown in the text is taken from sys.argv[0].
    """

    print("""
Take the jobs defined in the jobs table of the given database and process one
after the other. Multiple instances may be launched against the same database.

Usage:
  {0} [OPTIONS] [COMMANDS] -d DB
  {0} -h

COMMANDS:
  -c FILE         add each line as a job resp. job's command to DB
  -h              print this text
  -s              print status information
  -w              do work and process jobs

OPTIONS:
  -d DB           the database to process
  -n NUM          in -w mode, only process num-many jobs
  -p COL-DEF      create properties table with SQL column spec
  -v              verbose output

Commands may be combined in one call of {0}.

A list of jobs may be imported line-by-line from a file using the -c option.
Every job may output to stdout or stderr one or more strings of the form
    DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
It is assumed that a table 'properties' exists with the columns jobid, key,
key2, and key3. The corresponding values are inserted into this table. Using
the option -p such a properties table can be created by giving a list of
column definitions in SQL style.

The jobs table also contains a 'workloadestm' column that is used when
estimating the finished workload so far. The entries default to 1 and may be
adjusted.

Examples:
  # create cmds.sh with jobs
  echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
  echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
  # create an initial database, but do not work
  {0} -d jobs.db -c cmds.sh \\
      -p 'number INTEGER, time REAL, mem INTEGER'
  # launch two workers
  {0} -d jobs.db -w &
  {0} -d jobs.db -w &
  # print status info
  {0} -d jobs.db -s
""".format(sys.argv[0]))
218
219
if __name__ == "__main__":
    # Command line entry point: parse the options, open the database and run
    # the requested commands (status, properties table, job import, work).

    dbfn = None       # -d: database file
    cmdfn = None      # -c: file with one job command per line
    propdef = None    # -p: column definitions of the properties table
    work = False      # -w: process jobs
    status = False    # -s: print status information
    numjobs = None    # -n: maximum number of jobs to process; None = no limit

    try:
        opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsvn:")
    except getopt.GetoptError as e:
        print("Error parsing arguments: %s" % e)
        usage()
        sys.exit(os.EX_USAGE)

    for opt, arg in opts:
        if opt == "-h":
            usage()
            sys.exit(os.EX_OK)
        elif opt == "-d":
            dbfn = arg
        elif opt == "-c":
            cmdfn = arg
        elif opt == "-p":
            propdef = arg
        elif opt == "-w":
            work = True
        elif opt == "-s":
            status = True
        elif opt == "-v":
            verbose = True
        elif opt == "-n":
            try:
                numjobs = int(arg)
            except ValueError:
                print("Invalid number of jobs '%s'." % arg)
                sys.exit(os.EX_USAGE)
        else:
            print("Unknown option '%s'." % opt)

    if dbfn is None:
        print("No database given.")
        sys.exit(os.EX_USAGE)

    conn = sqlite3.connect(dbfn, timeout=60)
    try:
        createSchema(conn)

        if status:
            printStatusInfo(conn)

        if propdef is not None:
            createPropertiesTable(conn, propdef)

        if cmdfn is not None:
            print("Adding jobs...")
            with open(cmdfn) as cmdfile:
                # One job per line; blank lines do not describe a job.
                cmds = [(line.strip(),) for line in cmdfile if line.strip()]
            insertJobs(conn, cmds)

        if work:
            n = 0
            # Compare against None explicitly so that '-n 0' processes no
            # job at all instead of being taken for "no limit".
            while numjobs is None or n < numjobs:

                jobid = getNextJobId(conn)
                if jobid is None:
                    print("All jobs have been started.")
                    break
                processJob(conn, jobid)
                n += 1
    finally:
        conn.close()
291