535921c758b70c6bd09707e05a50ff31cdd9281d
[paralleljobs.git] / paralleljobs.py
1 #!/usr/bin/env python
2 """ A simple tool to run jobs from a database in parallel."""
3
4 __author__ = "Stefan Huber"
5 __copyright__ = "Copyright 2013"
6
7 __version__ = "1.0"
8 __license__ = "LGPL3"
9
10
11 import sys, getopt, os
12 import sqlite3
13 import subprocess
14
# Global verbosity flag: the -v command-line option sets it to True, in which
# case runCmd() prints the captured stdout and stderr of every job it runs.
verbose = False
16
def printStatusInfo(conn):
    """Print a one-line progress summary of the jobs table.

    The line reports the number of finished jobs (absolute and relative to
    all jobs), the share of the total estimated workload (sum of the
    'workloadestm' column) covered by finished jobs, and the number of jobs
    that have been started but are not done yet.

    conn -- open sqlite3 connection to a database containing the jobs table

    An empty jobs table is reported with 0.0% for both ratios instead of
    failing with a division by zero.
    """
    c = conn.cursor()
    try:
        c.execute("SELECT count(id) FROM jobs;")
        nototal, = c.fetchone()

        c.execute("SELECT count(id) FROM jobs WHERE done=1;")
        nodone, = c.fetchone()

        c.execute("SELECT count(id) FROM jobs WHERE started=1;")
        nostarted, = c.fetchone()

        c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
        wldone, = c.fetchone()

        c.execute("SELECT sum(workloadestm) FROM jobs;")
        wltotal, = c.fetchone()
    finally:
        c.close()

    # sum() yields NULL (None) when no row matches the query.
    wldone = wldone or 0.0
    wltotal = wltotal or 0.0

    # Guard both ratios: without jobs (or without workload) nothing is done.
    perdone = 100.0*float(nodone)/float(nototal) if nototal else 0.0
    perwl = 100.0*float(wldone)/float(wltotal) if wltotal else 0.0

    print("%d (%.1f%%) of %d jobs and %.1f%% of the workload done. %d jobs are running." %
          (nodone, perdone, nototal, perwl, nostarted-nodone))
44
def createPropertiesTable(conn, propdef):
    """Create the 'properties' table unless the database already has one.

    conn    -- open sqlite3 connection
    propdef -- SQL column definitions (e.g. 'number INTEGER, time REAL');
               the text is spliced verbatim into the CREATE TABLE statement
               after the 'jobid' primary-key column

    The check and the creation run inside one exclusive transaction. If a
    statement fails (for instance because propdef is not valid SQL) the
    transaction is rolled back, which releases the exclusive lock, and the
    exception is re-raised.
    """
    c = conn.cursor()
    try:
        c.execute("BEGIN EXCLUSIVE")

        c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
        if c.fetchone() == (0,):
            print("Creating properties table.")
            # propdef is an SQL fragment by design, so it cannot be passed as
            # a bound parameter.
            sqlstmt = ("CREATE TABLE properties ("
                       "jobid INTEGER PRIMARY KEY,"
                       "%s, "
                       "FOREIGN KEY (jobid) REFERENCES jobs (id));") % (propdef,)
            c.execute(sqlstmt)
        conn.commit()
    except Exception:
        # Do not leave the exclusive transaction open behind the caller's back.
        conn.rollback()
        raise
    finally:
        c.close()
59
def runCmd(cmd):
    """Run cmd through the shell and capture what it prints.

    cmd -- the command line, handed to the shell as one string

    Returns the tuple (exitcode, out, err) where out and err hold the
    complete stdout and stderr of the command as text. If the global
    'verbose' flag is set, both outputs are also printed.
    """
    # universal_newlines=True opens the pipes in text mode, so that out and
    # err are str (not bytes) on Python 3 and can be matched against str
    # prefixes by the caller.
    proc = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            shell=True, universal_newlines=True)
    out, err = proc.communicate()
    # communicate() has already waited for the process to terminate.
    exitcode = proc.returncode

    if verbose:
        print(out, err)

    return exitcode, out, err
70
def _parseProperties(*streams):
    """Merge all DB-PROPERTIES dictionaries found in the given job outputs."""
    marker = "DB-PROPERTIES:"
    prop = {}
    for stream in streams:
        if not isinstance(stream, str):
            # Pipes opened in binary mode deliver bytes on Python 3.
            stream = stream.decode("utf-8", "replace")
        for line in stream.splitlines():
            if line.startswith(marker):
                # NOTE(review): eval() executes whatever expression the job
                # printed. If job output cannot be trusted,
                # ast.literal_eval() would cover the documented dict-literal
                # format without running code.
                prop.update(eval(line[len(marker):]))
    return prop


def processJob(conn, jobid):
    """Run the job with the given id and store its result in the database.

    conn  -- open sqlite3 connection
    jobid -- id of a row in the jobs table

    The job's command is executed via runCmd(); afterwards its exit code is
    stored and the job is flagged as done. Every output line (stdout first,
    then stderr) starting with 'DB-PROPERTIES:' is evaluated as a dictionary;
    later dictionaries override earlier ones key by key. If any properties
    were found, they are inserted as one row into the 'properties' table,
    with the dictionary keys used as column names. All changes are committed
    together at the end.
    """
    c = conn.cursor()
    c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
    cmd, = c.fetchone()

    print("Process job %d: %s" % (jobid, cmd))

    ec, out, err = runCmd(cmd)
    c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))

    prop = _parseProperties(out, err)
    if prop:
        # Fix the key order once so that columns and values line up.
        keys = list(prop)
        collist = ", ".join(["jobid"] + [str(k) for k in keys])
        vallist = ", ".join(["?"] * (len(keys) + 1))

        sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist, vallist)
        c.execute(sqlstmt, [jobid] + [prop[k] for k in keys])

    c.close()
    conn.commit()
109
def insertJobs(conn, cmds):
    """Add new jobs to the jobs table within one exclusive transaction.

    conn -- open sqlite3 connection
    cmds -- iterable of 1-tuples (cmd,), one tuple per job to add

    If an insert fails, the transaction is rolled back (so none of the given
    jobs is added and the exclusive lock is released) and the exception is
    re-raised.
    """
    try:
        conn.execute("BEGIN EXCLUSIVE")
        conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
114
def createSchema(conn):
    """Make sure the database contains the 'jobs' table.

    conn -- open sqlite3 connection

    Inside an exclusive transaction, sqlite_master is searched for an entry
    named 'jobs'. If there is none, the table is created and a notice is
    printed. The transaction is committed in either case.
    """
    create_jobs_sql = (
        "CREATE TABLE jobs ( "
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "cmd STRING NOT NULL, "
        "started BOOL DEFAULT (0) NOT NULL, "
        "done BOOL DEFAULT (0) NOT NULL, "
        "exitcode INTEGER, "
        "workloadestm REAL DEFAULT (1) NOT NULL)")

    cur = conn.cursor()
    cur.execute("BEGIN EXCLUSIVE")

    # Only create the table when sqlite_master knows no 'jobs' entry yet.
    cur.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
    (found,) = cur.fetchone()
    if found == 0:
        print("Creating jobs table.")
        conn.execute(create_jobs_sql)

    cur.close()
    conn.commit()
133
def getNextJobId(conn):
    """Reserve a job that has not been started yet and return its id.

    conn -- open sqlite3 connection

    Selecting the job and flagging it as started happen inside one exclusive
    transaction, so that several workers on the same database do not pick
    the same job.

    Returns the id of the reserved job, or None if every job has already
    been started. The transaction is ended on every path, so no exclusive
    lock is left behind when None is returned or a statement fails.
    """
    c = conn.cursor()
    try:
        c.execute("BEGIN EXCLUSIVE")
        c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")

        r = c.fetchone()
        if r is None:
            jobid = None
        else:
            jobid, = r
            c.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))

        # Commit even when nothing was reserved: this ends the exclusive
        # transaction and thereby releases the lock.
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        c.close()

    return jobid
151
152
153
def usage():
    """Write the help text of this program to stdout.

    Each '{0}' placeholder of the text is replaced by sys.argv[0], i.e. the
    name under which the program was invoked.
    """
    helptext = """
Take the jobs defined in the jobs table of the given database and process one
after the other. Multiple instances may be launched against the same database.

Usage:
{0} [OPTIONS] [COMMANDS] -d DB
{0} -h

COMMANDS:
-c FILE add each line as a job resp. job's command to DB
-h print this text
-s print status information
-w do work and process jobs

OPTIONS:
-d DB the database to process
-n NUM in -w mode, only process num-many jobs
-p COL-DEF create properties table with SQL column spec
-v verbose output

Commands may be combined in one call of {0}.

A list of jobs may be importet line-by-line from a file using the -c option.
Every job may output to stdout or stderr one or more strings of the form
DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
It is assumed that a table 'properties' exists with the columns jobid, key,
key2, and key3. The corresponding values are inserted into this table. Using
the option -p such a properties table can be created by giving a list of
column definitions in SQL style.

The jobs table also contains a 'workloadestm' column that is used when
estimating the finished workload so far. The entries default to 1 and may be
adjusted.

Examples:
# create cmds.sh with jobs
echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
# create an initial database, but do not work
{0} -d jobs.db -c cmds.sh \\
-p 'number INTEGER, time REAL, mem INTEGER'
# launch two workers
{0} -d jobs.db -w &
{0} -d jobs.db -w &
# print status info
{0} -d jobs.db -s
"""
    progname = sys.argv[0]
    print(helptext.format(progname))
204
205
if __name__ == "__main__":
    # Script entry point: parse the command line, open the database and run
    # the requested commands in the fixed order status, properties table,
    # job import, work.

    dbfn = None
    cmdfn = None
    propdef = None
    work = False
    status = False
    numjobs = None

    try:
        opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsvn:")

        for opt, arg in opts:
            if opt == "-h":
                usage()
                sys.exit(os.EX_OK)
            elif opt == "-d":
                dbfn = arg
            elif opt == "-c":
                cmdfn = arg
            elif opt == "-p":
                propdef = arg
            elif opt == "-w":
                work = True
            elif opt == "-s":
                status = True
            elif opt == "-v":
                # Module-level flag read by runCmd().
                verbose = True
            elif opt == "-n":
                try:
                    numjobs = int(arg)
                except ValueError:
                    # Report a usage error instead of dying with a traceback.
                    print("Option -n expects an integer, got '%s'." % arg)
                    sys.exit(os.EX_USAGE)
            else:
                print("Unknown option '", opt, "'.")

    except getopt.GetoptError as e:
        print("Error parsing arguments:", e)
        usage()
        sys.exit(os.EX_USAGE)

    if dbfn is None:
        print("No database given.")
        sys.exit(os.EX_USAGE)

    conn = sqlite3.connect(dbfn)
    try:
        createSchema(conn)

        if status:
            printStatusInfo(conn)

        if propdef is not None:
            createPropertiesTable(conn, propdef)

        if cmdfn is not None:
            print("Adding jobs...")
            with open(cmdfn) as cmdfile:
                # One job per line; blank lines would only yield jobs with an
                # empty command, so they are skipped.
                cmds = [(line.strip(),) for line in cmdfile if line.strip()]
            insertJobs(conn, cmds)

        if work:
            n = 0
            # 'is None' rather than truthiness: '-n 0' must process no job
            # at all instead of being treated as "no limit".
            while numjobs is None or n < numjobs:

                jobid = getNextJobId(conn)
                if jobid is None:
                    print("All jobs have been started.")
                    break
                processJob(conn, jobid)
                n += 1
    finally:
        # Close the database even if one of the commands failed.
        conn.close()
277