cb4a0691da479eed4ef87da1255951a22e17f484
[paralleljobs.git] / paralleljobs.py
1 #!/usr/bin/env python
2 """ A simple tool to run jobs from a database in parallel."""
3
4 __author__ = "Stefan Huber"
5 __copyright__ = "Copyright 2013"
6
7 __version__ = "1.0"
8 __license__ = "LGPL3"
9
10
import ast
import getopt
import os
import sqlite3
import subprocess
import sys
14
# Module-wide verbosity flag. The -v command-line option sets it to True;
# runCmd() then echoes the captured output of every job it runs.
verbose = False
16
def printStatusInfo(conn):
    """Print a one-line progress summary of the jobs table.

    Reports how many jobs are marked done, their percentage of all jobs, and
    the percentage of the total estimated workload (the sum of the
    ``workloadestm`` column) that those finished jobs account for.

    An empty jobs table, or one whose workload estimates sum to zero, is
    reported as 0% rather than raising ZeroDivisionError/TypeError.
    """
    c = conn.cursor()

    # total() yields 0.0 for an empty set, whereas sum() yields NULL (None).
    c.execute("SELECT count(id), total(workloadestm) FROM jobs;")
    nototal, wltotal = c.fetchone()

    c.execute("SELECT count(id), total(workloadestm) FROM jobs WHERE done=1;")
    nodone, wldone = c.fetchone()

    c.close()

    perdone = 100.0 * nodone / nototal if nototal else 0.0
    perwl = 100.0 * wldone / wltotal if wltotal else 0.0

    print("%d (%.1f%%) of %d jobs done. %.1f%% of the workload finished." %
          (nodone, perdone, nototal, perwl))
41
def createPropertiesTable(conn, propdef):
    """Create the ``properties`` table unless the database already has one.

    *propdef* is a comma-separated list of SQL column definitions, e.g.
    ``'number INTEGER, time REAL'``. It is spliced verbatim into the CREATE
    TABLE statement after the ``jobid`` column, so it must come from a trusted
    source (here: the -p command-line option). The existence check and the
    creation happen inside one exclusive transaction.
    """
    conn.execute("BEGIN EXCLUSIVE")

    cur = conn.cursor()
    cur.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
    (found,) = cur.fetchone()
    if found == 0:
        print("Creating properties table.")
        columns = [
            "jobid INTEGER PRIMARY KEY",
            propdef,
            "FOREIGN KEY (jobid) REFERENCES jobs (id)",
        ]
        cur.execute("CREATE TABLE properties (%s);" % ", ".join(columns))
    cur.close()

    conn.commit()
56
def runCmd(cmd):
    """Run *cmd* through the shell and capture its output.

    The command string is handed to the shell unchanged (``shell=True``), so
    job lines may use shell syntax such as ``ulimit ...; ./prog``.

    Returns a tuple ``(exitcode, out, err)``: the process exit status and the
    captured stdout and stderr exactly as ``Popen.communicate()`` returns them.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, shell=True)
    out, err = proc.communicate()
    # communicate() has already waited for the process to terminate.
    exitcode = proc.returncode

    if verbose:
        # Write the captured text itself. print()ing the objects would show
        # reprs instead (a tuple on Python 2, b'...' literals on Python 3).
        for data in (out, err):
            if isinstance(data, bytes) and not isinstance(data, str):
                data = data.decode("utf-8", "replace")
            sys.stdout.write(data)
        sys.stdout.flush()

    return exitcode, out, err
67
def _parseProperties(*outputs):
    """Return the merged DB-PROPERTIES dictionaries found in *outputs*.

    Each argument is captured job output (bytes on Python 3, str on
    Python 2). Every line starting with ``DB-PROPERTIES:`` must carry a
    Python dict literal. Keys from later lines override earlier ones.
    Malformed lines are reported and skipped.
    """
    prefix = "DB-PROPERTIES:"
    prop = {}
    for data in outputs:
        if isinstance(data, bytes) and not isinstance(data, str):
            # Python 3 pipes yield bytes; decode leniently so that odd
            # output cannot abort the worker.
            data = data.decode("utf-8", "replace")
        for line in data.splitlines():
            if not line.startswith(prefix):
                continue
            try:
                # literal_eval only accepts literals (dicts, strings, numbers,
                # True/False/None). Unlike eval(), it cannot run code that
                # appears in a job's output.
                value = ast.literal_eval(line[len(prefix):].strip())
            except (ValueError, SyntaxError, TypeError):
                value = None
            if not isinstance(value, dict):
                print("Ignoring malformed properties line: %s" % line)
                continue
            prop.update(value)
    return prop


def processJob(conn, jobid):
    """Run the job with id *jobid* and store its results.

    Executes the job's command and records its exit code. The job is then
    marked done in the jobs table. If the command printed DB-PROPERTIES lines
    (on stdout or stderr), their merged values are inserted as one row of the
    ``properties`` table keyed by *jobid*. Both writes are committed together.
    """
    c = conn.cursor()
    c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
    cmd, = c.fetchone()

    print("Process job %d: %s" % (jobid, cmd))

    ec, out, err = runCmd(cmd)
    c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))

    prop = _parseProperties(out, err)
    if prop:
        keys = list(prop)
        # Column names come from job output: quote them as SQL identifiers
        # so a key cannot inject SQL into the statement.
        cols = ["jobid"] + ['"%s"' % str(k).replace('"', '""') for k in keys]
        sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (
            ", ".join(cols), ", ".join(["?"] * len(cols)))
        c.execute(sqlstmt, [jobid] + [prop[k] for k in keys])

    c.close()
    conn.commit()
106
def insertJobs(conn, cmds):
    """Append one job per entry of *cmds* to the jobs table.

    *cmds* is a sequence of one-element tuples ``(cmd,)``. Only the ``cmd``
    column is supplied, so every other column of a new row takes its default.
    All inserts run in a single exclusive transaction, committed before
    returning.
    """
    cur = conn.cursor()
    cur.execute("BEGIN EXCLUSIVE")
    cur.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
    cur.close()
    conn.commit()
111
def createSchema(conn):
    """Create the ``jobs`` table if the database does not contain one yet.

    Columns:
      id            auto-incremented job id
      cmd           shell command line of the job
      started       set to 1 once a worker has claimed the job
      done          set to 1 once the job has been processed
      exitcode      exit status of the command (NULL until done)
      workloadestm  relative workload estimate used by the status report

    The check and the creation run inside one exclusive transaction.
    """
    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")

    c.execute("SELECT count(name) FROM sqlite_master "
              "WHERE type='table' AND name='jobs';")
    if c.fetchone() == (0,):
        print("Creating jobs table.")
        # cmd is declared TEXT: SQLite gives a column declared "STRING"
        # NUMERIC affinity, which would store a numeric-looking command such
        # as "42" as an INTEGER instead of text.
        c.execute("CREATE TABLE jobs ("
                  "id INTEGER PRIMARY KEY AUTOINCREMENT, "
                  "cmd TEXT NOT NULL, "
                  "started BOOL DEFAULT (0) NOT NULL, "
                  "done BOOL DEFAULT (0) NOT NULL, "
                  "exitcode INTEGER, "
                  "workloadestm REAL DEFAULT (1) NOT NULL)")
    c.close()
    conn.commit()
130
def getNextJobId(conn):
    """Claim the next job that has not been started yet.

    Within one exclusive transaction, selects the lowest-id job whose
    ``started`` flag is not 1 and sets that flag, so two workers on the same
    database cannot claim the same job.

    Returns the claimed job id, or None when every job has been started.
    The transaction is always finished: committed on success, rolled back
    on error. This also applies when no job is left, so the exclusive lock
    is never held after returning.
    """
    c = conn.cursor()
    try:
        c.execute("BEGIN EXCLUSIVE")
        c.execute("SELECT id FROM jobs WHERE NOT started=1 "
                  "ORDER BY id LIMIT 1;")
        row = c.fetchone()
        jobid = row[0] if row is not None else None
        if jobid is not None:
            c.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        c.close()

    return jobid
148
149
150
def usage():
    """Print the usage text of this program to stdout."""

    print("""
Take the jobs defined in jobs table of the given database and process one job
after the other. Multiple instances may be launched against the same database.

Usage:
  {0} [OPTIONS] [COMMANDS] -d database
  {0} -h

COMMANDS:
  -c cmdfn        add jobs from the file with list of commands
  -h              print this text
  -s              print status information
  -w              work on the database

OPTIONS:
  -d database     the database to process
  -n num          in -w mode, only perform num-many jobs
  -p cols-def     create properties table with SQL column spec
  -v              verbose output

Commands may be combined in one call of {0}.

A list of jobs may be imported line-by-line from a file using the -c option.
Every job may output to stdout or stderr a string of the form
  DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
It is assumed that a table 'properties' exists with the columns jobid, key,
key2, and key3. The corresponding values are inserted into this table. Using
the option -p such a properties table can be created by giving a list of
column definitions in SQL style.

The jobs table also contains a 'workloadestm' column that is used when
estimating the finished workload so far. The entries default to 1 and may be
set externally.

Examples:
  # create cmds.sh with jobs
  echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
  echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
  # create an initial database, but do not work
  {0} -d jobs.db -c cmds.sh \\
      -p 'number INTEGER, time REAL, mem INTEGER'
  # launch two workers
  {0} -d jobs.db -w &
  {0} -d jobs.db -w &
  # print status info
  {0} -d jobs.db -s
""".format(sys.argv[0]))
201
202
if __name__ == "__main__":

    dbfn = None
    cmdfn = None
    propdef = None
    work = False
    status = False
    # None means "no limit"; -n N performs at most N jobs (N=0: none).
    numjobs = None

    try:
        opts, _ = getopt.getopt(sys.argv[1:], "hd:c:p:wsvn:")
    except getopt.GetoptError as e:
        print("Error parsing arguments: %s" % e)
        usage()
        sys.exit(os.EX_USAGE)

    for opt, arg in opts:
        if opt == "-h":
            usage()
            sys.exit(os.EX_OK)
        elif opt == "-d":
            dbfn = arg
        elif opt == "-c":
            cmdfn = arg
        elif opt == "-p":
            propdef = arg
        elif opt == "-w":
            work = True
        elif opt == "-s":
            status = True
        elif opt == "-v":
            # Module scope: this rebinds the global flag read by runCmd().
            verbose = True
        elif opt == "-n":
            try:
                numjobs = int(arg)
            except ValueError:
                print("Invalid number of jobs: '%s'." % arg)
                sys.exit(os.EX_USAGE)
        else:
            print("Unknown option '%s'." % opt)

    if dbfn is None:
        print("No database given.")
        sys.exit(os.EX_USAGE)

    conn = sqlite3.connect(dbfn)
    try:
        createSchema(conn)

        if status:
            printStatusInfo(conn)

        if propdef is not None:
            createPropertiesTable(conn, propdef)

        if cmdfn is not None:
            print("Adding jobs...")
            with open(cmdfn) as cmdfile:
                # Blank lines would otherwise become empty no-op jobs.
                cmds = [(line.strip(),) for line in cmdfile if line.strip()]
            insertJobs(conn, cmds)

        if work:
            n = 0
            while numjobs is None or n < numjobs:
                jobid = getNextJobId(conn)
                if jobid is None:
                    print("All jobs have been started.")
                    break
                processJob(conn, jobid)
                n += 1
    finally:
        conn.close()
274