f2e6eea15828b9c080945232db2b4d8e10c8dd92
[paralleljobs.git] / paralleljobs.py
1 #!/usr/bin/env python
2 """ A simple tool to run jobs from a database in parallel."""
3
4 __author__ = "Stefan Huber"
5 __copyright__ = "Copyright 2013"
6
7 __version__ = "1.0"
8 __license__ = "LGPL3"
9
10
11 import sys, getopt, os
12 import sqlite3
13 import subprocess
14
# When True, runCmd prints the captured stdout/stderr of each job's command.
# Switched on by the -v command line option.
verbose = False
16
def printStatusInfo(conn):
    """Print how many jobs and how much of the estimated workload are done.

    conn -- open sqlite3 connection to a database that has a jobs table.

    All figures are read with a single query so that they describe one
    consistent state of the table, even while workers keep committing.
    The percentages are reported as 0.0 if the jobs table is empty or the
    total workload is zero, instead of failing with a division by zero.
    """
    c = conn.cursor()
    # sum() yields NULL when no row contributes; coalesce() maps that to 0.
    c.execute("SELECT count(id), "
              "coalesce(sum(done=1), 0), "
              "coalesce(sum(CASE WHEN done=1 THEN workloadestm END), 0.0), "
              "coalesce(sum(workloadestm), 0.0) "
              "FROM jobs;")
    nototal, nodone, wldone, wltotal = c.fetchone()
    c.close()

    perdone = 100.0*float(nodone)/float(nototal) if nototal else 0.0
    perwl = 100.0*float(wldone)/float(wltotal) if wltotal else 0.0

    print("%d (%.1f%%) of %d jobs done. %.1f%% of the workload finished." %
          (nodone, perdone, nototal, perwl))
41
def createPropertiesTable(conn, propdef):
    """Create the 'properties' table unless the database already has one.

    conn    -- open sqlite3 connection.
    propdef -- SQL column definitions, e.g. "number INTEGER, time REAL".
               They are spliced verbatim into the CREATE TABLE statement.

    The check and the creation run inside one exclusive transaction. If any
    statement fails (for instance because propdef is not valid SQL), the
    transaction is rolled back before the exception is re-raised, so the
    exclusive lock is not left behind on the connection.
    """
    conn.execute("BEGIN EXCLUSIVE")
    try:
        c = conn.cursor()
        c.execute("SELECT count(name) FROM sqlite_master "
                  "WHERE name='properties';")
        if c.fetchone() == (0,):
            print("Creating properties table.")
            # NOTE(review): propdef comes straight from the command line and
            # cannot be bound as a parameter because it is DDL text.
            sqlstmt = ("CREATE TABLE properties ("
                       "jobid INTEGER PRIMARY KEY, "
                       "%s, "
                       "FOREIGN KEY (jobid) REFERENCES jobs (id));"
                       % (propdef,))
            c.execute(sqlstmt)
        c.close()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
56
def runCmd(cmd):
    """Run cmd through the shell and capture what it prints.

    cmd -- the command line, passed to the shell as a single string.

    Returns the tuple (exitcode, out, err) where out and err hold the
    complete stdout and stderr of the command as text strings.

    The pipes are opened in text mode (universal_newlines=True) so that the
    output is a str on Python 3 as well; callers match it against str
    prefixes, which does not work on bytes.
    """
    proc = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            shell=True, universal_newlines=True)
    # communicate() waits for the process to end, so returncode is set.
    out, err = proc.communicate()
    exitcode = proc.returncode

    if verbose:
        print(out, err)

    return exitcode, out, err
67
def _collectProperties(*streams):
    """Merge the dicts announced on 'DB-PROPERTIES:' lines of the streams.

    Streams are scanned in the given order; a key announced again later
    overrides the earlier value.
    """
    prefix = "DB-PROPERTIES:"
    prop = {}
    for stream in streams:
        # Python 3 pipes opened in binary mode deliver bytes, which cannot
        # be compared against the str prefix.
        if isinstance(stream, bytes) and not isinstance(stream, str):
            stream = stream.decode("utf-8", "replace")
        for line in stream.splitlines():
            if line.startswith(prefix):
                # NOTE(review): eval() executes whatever the job printed.
                # If jobs only emit dict literals, ast.literal_eval would be
                # the safe choice -- confirm before switching.
                prop.update(eval(line[len(prefix):]))
    return prop


def processJob(conn, jobid):
    """Run the command of one job and store its result in the database.

    conn  -- open sqlite3 connection.
    jobid -- id of the row in the jobs table to process.

    The job's command is run via runCmd; its exit code is stored and the job
    is marked done. Properties the command announced on lines starting with
    'DB-PROPERTIES:' (stdout first, then stderr) are inserted as one row of
    the properties table, keyed by jobid. Everything is committed at the end.
    """
    print("Process job %d" % (jobid))

    c = conn.cursor()
    c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
    cmd, = c.fetchone()

    ec, out, err = runCmd(cmd)
    c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))

    prop = _collectProperties(out, err)
    if prop:
        keys = list(prop)
        # NOTE(review): the column names come from the job's output and are
        # spliced into the statement; only the values are bound.
        collist = ", ".join(["jobid"] + [str(k) for k in keys])
        vallist = ", ".join(["?"] * (len(keys) + 1))
        sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist, vallist)
        c.execute(sqlstmt, [jobid] + [prop[k] for k in keys])

    c.close()
    conn.commit()
105
def insertJobs(conn, cmds):
    """Add one row to the jobs table per entry of cmds.

    conn -- open sqlite3 connection whose database has a jobs table.
    cmds -- parameter tuples for the insert, one (cmd,) tuple per job.

    The rows are inserted inside a single exclusive transaction.
    """
    cur = conn.cursor()
    cur.execute("BEGIN EXCLUSIVE")
    cur.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
    conn.commit()
110
def createSchema(conn):
    """Create the jobs table if the database does not have one yet.

    conn -- open sqlite3 connection.

    The existence check and the creation run in one exclusive transaction.
    """
    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")

    # Create table, if necessary
    c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
    if c.fetchone() == (0,):
        print("Creating jobs table.")
        # cmd is declared TEXT on purpose: the type name STRING gives a
        # column NUMERIC affinity in SQLite, which silently turns a
        # numeric-looking command such as "007" into the integer 7.
        c.execute("CREATE TABLE jobs ("
                  "id INTEGER PRIMARY KEY AUTOINCREMENT, "
                  "cmd TEXT NOT NULL, "
                  "started BOOL DEFAULT (0) NOT NULL, "
                  "done BOOL DEFAULT (0) NOT NULL, "
                  "exitcode INTEGER, "
                  "workloadestm REAL DEFAULT (1) NOT NULL)")
    c.close()
    conn.commit()
129
def getNextJobId(conn):
    """Claim one job that has not been started yet and return its id.

    conn -- open sqlite3 connection whose database has a jobs table.

    The job is looked up and flagged as started inside one exclusive
    transaction. Returns None if no unstarted job is left.

    The transaction is ended on every path. Returning while it is still open
    would keep the exclusive lock on the database and make the next
    BEGIN EXCLUSIVE on this connection fail.
    """
    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")
    c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
    r = c.fetchone()

    jobid = None
    if r is not None:
        jobid, = r
        c.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))

    c.close()
    conn.commit()

    return jobid
147
148
149
def usage():
    """Print the usage text of this program.

    The text lists the commands and options, describes the DB-PROPERTIES
    output convention and the workloadestm column, and ends with example
    invocations. Every {0} placeholder is filled with sys.argv[0].
    """

    # Literal braces are doubled ({{ }}) because the text goes through
    # str.format().
    print("""
Take the jobs defined in jobs table of the given database and process one job
after the other. Multiple instances may be launched against the same database.

Usage:
{0} [OPTIONS] [COMMANDS] -d database
{0} -h

COMMANDS:
-c cmdfn add jobs from the file with list of commands
-h print this text
-s print status information
-w work on the database

OPTIONS:
-d database the database to process
-p cols-def create properties table with SQL column spec
-v print output of the job's command

Commands may be combined in one call of {0}.

A list of jobs may be importet line-by-line from a file using the -c option.
Every job may output to stdout or stderr a string of the form
DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
It is assumed that a table 'properties' exists with the columns jobid, key,
key2, and key3. The corresponding values are inserted into this table. Using
the option -p such a properties table can be created by giving a list of
column definitions in SQL style.

The jobs table also contains a 'workloadestm' column that is used when
estimating the finished workload so far. The entries default to 1 and may be
set externally.

Examples:
# create cmds.sh with jobs
echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
# create an initial database, but do not work
{0} -d jobs.db -c cmds.sh \\
-p 'number INTEGER, time REAL, mem INTEGER'
# launch two workers
{0} -d jobs.db -w &
{0} -d jobs.db -w &
# print status info
{0} -d jobs.db -s
""".format(sys.argv[0]))
199
200
if __name__ == "__main__":
    # Script entry point: parse the options, open the database and run the
    # requested commands in a fixed order (status, properties table, adding
    # jobs, working), whatever their order on the command line.

    dbfn = None       # -d: path of the database file
    cmdfn = None      # -c: file with one command per line
    propdef = None    # -p: column definitions for the properties table
    work = False      # -w: process jobs
    status = False    # -s: print status information

    try:
        opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsv")
    except getopt.GetoptError as e:
        print("Error parsing arguments:", e)
        usage()
        sys.exit(os.EX_USAGE)

    for opt, arg in opts:
        if opt == "-h":
            usage()
            sys.exit(os.EX_OK)
        elif opt == "-d":
            dbfn = arg
        elif opt == "-c":
            cmdfn = arg
        elif opt == "-p":
            propdef = arg
        elif opt == "-w":
            work = True
        elif opt == "-s":
            status = True
        elif opt == "-v":
            verbose = True
        else:
            print("Unknown option '", opt, "'.")

    if dbfn is None:
        print("No database given.")
        sys.exit(os.EX_USAGE)

    conn = sqlite3.connect(dbfn)
    try:
        createSchema(conn)

        if status:
            printStatusInfo(conn)

        if propdef is not None:
            createPropertiesTable(conn, propdef)

        if cmdfn is not None:
            print("Adding jobs...")
            # The with-block closes the command file again; blank lines are
            # skipped so that they do not become jobs with an empty command.
            with open(cmdfn) as cmdfile:
                lines = [line.strip() for line in cmdfile]
            cmds = [(line,) for line in lines if line]
            insertJobs(conn, cmds)

        if work:
            while True:
                jobid = getNextJobId(conn)
                if jobid is None:
                    print("All jobs have been started.")
                    break
                processJob(conn, jobid)
    finally:
        # Close the connection even if one of the steps above raised.
        conn.close()
266