baf79651b101056abf7eb20513c73303e8cbf695
[paralleljobs.git] / paralleljobs.py
1 #!/usr/bin/env python
2 """ A simple tool to run jobs from a database in parallel."""
3
4 __author__ = "Stefan Huber"
5 __copyright__ = "Copyright 2013"
6
7 __version__ = "1.0"
8 __license__ = "LGPL3"
9
10
11 import sys, getopt, os
12 import sqlite3
13 import subprocess
14
# Set to True by the -v command line option; runCmd() then prints the captured
# stdout and stderr of every job it runs.
verbose = False
16
def printStatusInfo(conn):
    """Print how many jobs are done and how much of the workload is finished.

    Args:
        conn: open sqlite3 connection to a database containing the jobs table.

    Both percentages are reported as 0.0 when the jobs table is empty (or the
    total workload estimate is zero) instead of failing with a division by
    zero.
    """
    c = conn.cursor()
    try:
        c.execute("SELECT count(id) FROM jobs;")
        nototal, = c.fetchone()

        c.execute("SELECT count(id) FROM jobs WHERE done=1;")
        nodone, = c.fetchone()

        # sum() yields NULL (None) when no row matches, so both sums may be
        # None here.
        c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
        wldone, = c.fetchone()

        c.execute("SELECT sum(workloadestm) FROM jobs;")
        wltotal, = c.fetchone()
    finally:
        c.close()

    perdone = 100.0 * float(nodone) / float(nototal) if nototal else 0.0
    perwl = 100.0 * float(wldone or 0.0) / float(wltotal) if wltotal else 0.0

    print("%d (%.1f%%) of %d jobs done. %.1f%% of the workload finished." %
          (nodone, perdone, nototal, perwl))
41
def createPropertiesTable(conn, propdef):
    """Create the 'properties' table unless it already exists.

    Args:
        conn: open sqlite3 connection.
        propdef: SQL column definitions of the property columns, e.g.
            'number INTEGER, time REAL'. The text is pasted verbatim into the
            CREATE TABLE statement (column definitions cannot be bound as
            parameters), so it has to come from a trusted source.

    Raises:
        Whatever the database raises when a statement fails, e.g.
        sqlite3.OperationalError for a malformed propdef. The exclusive
        transaction is rolled back before the exception is re-raised, so the
        connection is not left inside an open transaction.
    """
    conn.execute("BEGIN EXCLUSIVE")

    c = conn.cursor()
    try:
        c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
        if c.fetchone() == (0,):
            print("Creating properties table.")
            sqlstmt = "CREATE TABLE properties (\
                    jobid INTEGER PRIMARY KEY,\
                    %s, \
                    FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,)
            c.execute(sqlstmt)
        conn.commit()
    except Exception:
        # Release the exclusive lock before reporting the failure.
        conn.rollback()
        raise
    finally:
        c.close()
56
def runCmd(cmd):
    """Run *cmd* through the shell and collect its result.

    Args:
        cmd: the command line; it is interpreted by the shell, so it may
            contain several commands, redirections and the like.

    Returns:
        A tuple (exitcode, out, err) with the exit code of the command and
        its captured stdout and stderr as text.
    """
    # universal_newlines=True makes the pipes deliver text rather than bytes on
    # Python 3, which is what the "DB-PROPERTIES:" matching on the output needs.
    proc = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            shell=True, universal_newlines=True)
    out, err = proc.communicate()
    # communicate() has already waited for the process to terminate.
    exitcode = proc.returncode

    if verbose:
        print(out, err)

    return exitcode, out, err
67
def _asText(data):
    """Return *data* as text, decoding byte strings as UTF-8."""
    if isinstance(data, bytes):
        return data.decode("utf-8", "replace")
    return data


def processJob(conn, jobid):
    """Run the job *jobid*, record its result and store its properties.

    The job's command is read from the jobs table and executed with runCmd().
    Afterwards the exit code is stored and the job is marked as done. Every
    line of the command's stdout or stderr that starts with "DB-PROPERTIES:"
    is evaluated as a Python dict; the merged key/value pairs are inserted as
    one row into the 'properties' table, whose columns must match the keys.

    Args:
        conn: open sqlite3 connection to the jobs database.
        jobid: id of the job to process.
    """
    print("Process job %d" % (jobid))

    c = conn.cursor()
    c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
    cmd, = c.fetchone()

    ec, out, err = runCmd(cmd)
    c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))

    marker = "DB-PROPERTIES:"
    propstr = []
    for stream in (out, err):
        for line in _asText(stream).splitlines():
            if line.startswith(marker):
                propstr.append(line[len(marker):])

    prop = {}
    for ps in propstr:
        # NOTE(review): eval() executes whatever the job printed. That is only
        # acceptable as long as the jobs themselves are trusted; consider
        # ast.literal_eval() if plain dict literals are all that is needed.
        prop.update(eval(ps))

    if prop:
        keys = list(prop.keys())
        # NOTE(review): the column names are taken from the job output and
        # pasted into the statement, because SQL identifiers cannot be bound
        # as parameters. The values are bound.
        collist = ", ".join(["jobid"] + [str(k) for k in keys])
        vallist = ", ".join(["?"] * (len(keys) + 1))
        sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist, vallist)
        c.execute(sqlstmt, [jobid] + [prop[k] for k in keys])

    c.close()
    conn.commit()
105
def insertJobs(conn, cmds):
    """Add one job per command to the jobs table within one transaction.

    Args:
        conn: open sqlite3 connection to a database with the jobs table.
        cmds: iterable of 1-tuples (cmd,), one tuple per job to add.

    Raises:
        Whatever the database raises when an insert fails. The transaction is
        rolled back first, so no job of the batch is added and the exclusive
        lock is released.
    """
    conn.execute("BEGIN EXCLUSIVE")
    try:
        conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
110
def createSchema(conn):
    """Make sure the jobs table exists.

    The existence check and the creation run inside one exclusive
    transaction on *conn*, which is committed at the end.
    """
    cur = conn.cursor()
    cur.execute("BEGIN EXCLUSIVE")

    # sqlite_master lists the schema objects of the database.
    cur.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
    found = cur.fetchone()

    if found == (0,):
        print("Creating jobs table.")
        ddl = "CREATE TABLE jobs ( \
                id INTEGER PRIMARY KEY AUTOINCREMENT, \
                cmd STRING NOT NULL, \
                started BOOL DEFAULT (0) NOT NULL, \
                done BOOL DEFAULT (0) NOT NULL, \
                exitcode INTEGER, \
                workloadestm REAL DEFAULT (1) NOT NULL)"
        conn.execute(ddl)

    cur.close()
    conn.commit()
129
def getNextJobId(conn):
    """Reserve the next job that has not been started yet.

    Inside one exclusive transaction a job with started != 1 is selected and
    marked as started, so that several workers using the same database do
    not pick the same job.

    Args:
        conn: open sqlite3 connection to the jobs database.

    Returns:
        The id of the reserved job, or None if every job has been started.
        The transaction is finished in both cases.
    """
    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")
    try:
        c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
        r = c.fetchone()

        if r is None:
            jobid = None
        else:
            jobid, = r
            c.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))

        # Also commit when nothing was found: this ends the exclusive
        # transaction instead of leaving it open on the connection.
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        c.close()

    return jobid
147
148
149
def usage():
    """Print usage text of this program.

    The text is written to stdout; every {0} placeholder is replaced by the
    name the program was invoked with (sys.argv[0]).
    """

    print("""
Take the jobs defined in jobs table of the given database and process one job
after the other. Multiple instances may be launched against the same database.

Usage:
  {0} [OPTIONS] [COMMANDS] -d database
  {0} -h

COMMANDS:
  -c cmdfn        add jobs from the file with list of commands
  -h              print this text
  -s              print status information
  -w              work on the database

OPTIONS:
  -d database     the database to process
  -n num          in -w mode, only perform num-many jobs
  -p cols-def     create properties table with SQL column spec
  -v              print output of the job's command

Commands may be combined in one call of {0}.

A list of jobs may be imported line-by-line from a file using the -c option.
Every job may output to stdout or stderr a string of the form
  DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
It is assumed that a table 'properties' exists with the columns jobid, key,
key2, and key3. The corresponding values are inserted into this table. Using
the option -p such a properties table can be created by giving a list of
column definitions in SQL style.

The jobs table also contains a 'workloadestm' column that is used when
estimating the finished workload so far. The entries default to 1 and may be
set externally.

Examples:
  # create cmds.sh with jobs
  echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
  echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
  # create an initial database, but do not work
  {0} -d jobs.db -c cmds.sh \\
      -p 'number INTEGER, time REAL, mem INTEGER'
  # launch two workers
  {0} -d jobs.db -w &
  {0} -d jobs.db -w &
  # print status info
  {0} -d jobs.db -s
""".format(sys.argv[0]))
200
201
# Script entry point: parse the command line, open the database and carry out
# the requested commands in the order status, properties table, add jobs, work.
if __name__ == "__main__":

    dbfn = None
    cmdfn = None
    propdef = None
    work = False
    status = False
    numjobs = None

    try:
        opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsvn:")
    except getopt.GetoptError as e:
        print("Error parsing arguments: %s" % (e,))
        usage()
        sys.exit(os.EX_USAGE)

    for opt, arg in opts:
        if opt == "-h":
            usage()
            sys.exit(os.EX_OK)
        elif opt == "-d":
            dbfn = arg
        elif opt == "-c":
            cmdfn = arg
        elif opt == "-p":
            propdef = arg
        elif opt == "-w":
            work = True
        elif opt == "-s":
            status = True
        elif opt == "-v":
            verbose = True
        elif opt == "-n":
            try:
                numjobs = int(arg)
            except ValueError:
                print("Option -n expects an integer, got '%s'." % (arg,))
                sys.exit(os.EX_USAGE)
        else:
            # getopt rejects unknown options itself, so this is only a
            # safeguard against an option string and a branch list that
            # drifted apart.
            print("Unknown option '%s'." % (opt,))

    if dbfn is None:
        print("No database given.")
        sys.exit(os.EX_USAGE)

    conn = sqlite3.connect(dbfn)
    try:
        createSchema(conn)

        if status:
            printStatusInfo(conn)

        if propdef is not None:
            createPropertiesTable(conn, propdef)

        if cmdfn is not None:
            print("Adding jobs...")
            with open(cmdfn) as cmdfile:
                # One job per line; blank lines do not describe a job and are
                # skipped.
                cmds = [(line.strip(),) for line in cmdfile if line.strip()]
            insertJobs(conn, cmds)

        if work:
            n = 0
            # Compare against None explicitly so that "-n 0" performs no job
            # instead of being taken for "no limit".
            while numjobs is None or n < numjobs:

                jobid = getNextJobId(conn)
                if jobid is None:
                    print("All jobs have been started.")
                    break
                processJob(conn, jobid)
                n += 1
    finally:
        conn.close()
273