increase sqlite3 timeout
[paralleljobs.git] / paralleljobs.py
1 #!/usr/bin/env python
2 """ A simple tool to run jobs from a database in parallel."""
3
4 __author__ = "Stefan Huber"
5 __copyright__ = "Copyright 2013"
6
7 __version__ = "1.0"
8 __license__ = "LGPL3"
9
10
11 import sys, getopt, os
12 import sqlite3
13 import subprocess
14
# Module-wide flag; the -v command-line option sets it to True, and runCmd
# then prints the captured stdout/stderr of every job it runs.
verbose = False
16
def printStatusInfo(conn):
    """Print a one-line progress summary of the jobs table.

    Reports the number of done jobs (absolute and as a percentage of all
    jobs), the share of the total 'workloadestm' sum that belongs to done
    jobs, and the number of running jobs (started jobs minus done jobs).

    :param conn: open sqlite3 connection to a database with a 'jobs' table.
    """
    c = conn.cursor()
    try:
        c.execute("SELECT count(id) FROM jobs;")
        nototal, = c.fetchone()

        c.execute("SELECT count(id) FROM jobs WHERE done=1;")
        nodone, = c.fetchone()

        c.execute("SELECT count(id) FROM jobs WHERE started=1;")
        nostarted, = c.fetchone()

        c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
        wldone, = c.fetchone()

        c.execute("SELECT sum(workloadestm) FROM jobs;")
        wltotal, = c.fetchone()
    finally:
        c.close()

    # SQL sum() over zero rows is NULL (None): no job may be done yet, or
    # the table may be empty altogether. Comparing None with 0 raises a
    # TypeError on Python 3, so normalize both sums.
    if wldone is None:
        wldone = 0.0
    if wltotal is None:
        wltotal = 0.0

    perdone = 100.0 * nodone / nototal if nototal > 0 else 0
    perwl = 100.0 * wldone / wltotal if wltotal > 0 else 0

    print("%d (%.1f%%) of %d jobs and %.1f%% of the workload done. %d jobs are running." %
          (nodone, perdone, nototal, perwl, nostarted - nodone))
48
def createPropertiesTable(conn, propdef):
    """Create the 'properties' table unless it already exists.

    The table gets an INTEGER PRIMARY KEY column 'jobid' that references
    jobs(id), followed by the caller's column definitions.

    :param conn: open sqlite3 connection.
    :param propdef: SQL column definitions such as
        'number INTEGER, time REAL'. DDL cannot be parameterized, so the
        text is pasted verbatim into CREATE TABLE and must come from a
        trusted source (here: the -p command-line option).
    :raises sqlite3.Error: if the statement fails (e.g. propdef is not
        valid SQL). The transaction is rolled back first, so the
        exclusive lock is not left behind.
    """
    c = conn.cursor()
    try:
        # Exclusive transaction: the existence check and the CREATE TABLE
        # must not interleave with another instance doing the same.
        c.execute("BEGIN EXCLUSIVE")
        c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
        if c.fetchone() == (0,):
            print("Creating properties table.")
            sqlstmt = ("CREATE TABLE properties ("
                       "jobid INTEGER PRIMARY KEY, "
                       "%s, "
                       "FOREIGN KEY (jobid) REFERENCES jobs (id));") % (propdef,)
            c.execute(sqlstmt)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        c.close()
63
def runCmd(cmd):
    """Execute cmd through the shell and capture everything it prints.

    Returns a tuple (exitcode, out, err) holding the exit status and the
    complete stdout and stderr as returned by communicate() (bytes on
    Python 3). If the module-level verbose flag is set, both outputs are
    printed as well.
    """
    child = subprocess.Popen(cmd, shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    stdout_data, stderr_data = child.communicate()
    # communicate() only returns after the child has exited, so the
    # return code is already final here.
    status = child.returncode
    if verbose:
        print(stdout_data, stderr_data)
    return status, stdout_data, stderr_data
74
def processJob(conn, jobid):
    """Run one job and store its outcome in the database.

    Fetches the job's command, runs it with runCmd, records the exit code
    and sets done=1. Lines of the job's stdout or stderr that start with
    "DB-PROPERTIES:" are evaluated to dicts. Their merged items are
    inserted as one row, keyed by jobid, into the 'properties' table,
    whose columns must match the keys. All writes are committed together
    and are rolled back if any step raises.

    :param conn: open sqlite3 connection.
    :param jobid: id of the row in 'jobs' to process.
    """
    c = conn.cursor()
    try:
        c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
        cmd, = c.fetchone()

        print("Process job %d: %s" % (jobid, cmd))

        ec, out, err = runCmd(cmd)
        c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))

        prop = _parseJobProperties(out, err)
        if prop:
            keys = list(prop)
            # The keys come from the job's output: quote them as SQL
            # identifiers rather than pasting them into the statement raw.
            collist = ", ".join(["jobid"] + [_quoteSqlIdent(k) for k in keys])
            vallist = ", ".join(["?"] * (len(keys) + 1))
            sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist, vallist)
            c.execute(sqlstmt, [jobid] + [prop[k] for k in keys])
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        c.close()

# Marker that introduces a property dict in a job's stdout/stderr.
_PROPERTIES_PREFIX = "DB-PROPERTIES:"

def _parseJobProperties(*streams):
    """Merge all DB-PROPERTIES dicts found in the given output streams.

    Streams are scanned in order, lines in order; a later occurrence of a
    key overrides an earlier one.
    """
    prop = {}
    for text in streams:
        # Python 3 subprocess output is bytes; on Python 2 bytes is str
        # and the text is used unchanged.
        if isinstance(text, bytes) and not isinstance(text, str):
            text = text.decode("utf-8", "replace")
        for line in text.splitlines():
            if line.startswith(_PROPERTIES_PREFIX):
                # NOTE(review): eval() executes arbitrary expressions taken
                # from the job's output. The jobs are the user's own
                # commands, but ast.literal_eval would cover the documented
                # dict syntax and be safer -- consider switching.
                prop.update(eval(line[len(_PROPERTIES_PREFIX):]))
    return prop

def _quoteSqlIdent(name):
    """Return str(name) quoted as an SQL identifier ("..." with "" escapes)."""
    return '"%s"' % str(name).replace('"', '""')
113
def insertJobs(conn, cmds):
    """Append jobs to the jobs table inside one exclusive transaction.

    :param conn: open sqlite3 connection.
    :param cmds: sequence of 1-tuples, each holding one job's command.
    """
    cur = conn.cursor()
    cur.execute("BEGIN EXCLUSIVE")
    cur.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
    cur.close()
    conn.commit()
118
def createSchema(conn):
    """Create the 'jobs' table unless it already exists.

    Columns: id (autoincremented primary key), cmd (the command line),
    the flags started and done (default 0), exitcode (no default; filled
    in when a job is processed) and workloadestm (estimated workload,
    default 1, used by the status report).

    :param conn: open sqlite3 connection.
    :raises sqlite3.Error: if a statement fails; the transaction is rolled
        back first.
    """
    c = conn.cursor()
    try:
        # Exclusive transaction so that concurrently started instances do
        # not both find the table missing and both try to create it.
        c.execute("BEGIN EXCLUSIVE")
        c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
        if c.fetchone() == (0,):
            print("Creating jobs table.")
            # cmd is declared TEXT: a declared type of STRING gives the
            # column NUMERIC affinity in SQLite, which would store a
            # command such as "42" or "1e5" as a number.
            c.execute("CREATE TABLE jobs ( "
                      "id INTEGER PRIMARY KEY AUTOINCREMENT, "
                      "cmd TEXT NOT NULL, "
                      "started BOOL DEFAULT (0) NOT NULL, "
                      "done BOOL DEFAULT (0) NOT NULL, "
                      "exitcode INTEGER, "
                      "workloadestm REAL DEFAULT (1) NOT NULL)")
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        c.close()
137
def getNextJobId(conn):
    """Claim the not-yet-started job with the lowest id.

    The lookup and the started=1 update run in one exclusive transaction,
    so two workers sharing the database cannot claim the same job.

    :param conn: open sqlite3 connection.
    :returns: the claimed job's id, or None if every job has been started.
    :raises sqlite3.Error: if a statement fails; the transaction is rolled
        back first.
    """
    c = conn.cursor()
    try:
        c.execute("BEGIN EXCLUSIVE")
        c.execute("SELECT id FROM jobs WHERE NOT started=1 ORDER BY id LIMIT 1;")
        r = c.fetchone()
        if r is None:
            jobid = None
        else:
            jobid, = r
            c.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))
        # Commit in both cases: returning with the transaction still open
        # would keep the exclusive lock and shut out every other worker.
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        c.close()
    return jobid
155
156
157
def usage():
    """Print the command-line help text of this program to stdout.

    Occurrences of {0} in the text are replaced by the program name
    (sys.argv[0]); doubled braces print as literal braces.
    """

    print("""
Take the jobs defined in the jobs table of the given database and process one
after the other. Multiple instances may be launched against the same database.

Usage:
{0} [OPTIONS] [COMMANDS] -d DB
{0} -h

COMMANDS:
-c FILE add each line as a job resp. job's command to DB
-h print this text
-s print status information
-w do work and process jobs

OPTIONS:
-d DB the database to process
-n NUM in -w mode, only process num-many jobs
-p COL-DEF create properties table with SQL column spec
-v verbose output

Commands may be combined in one call of {0}.

A list of jobs may be imported line-by-line from a file using the -c option.
Every job may output to stdout or stderr one or more strings of the form
DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
It is assumed that a table 'properties' exists with the columns jobid, key,
key2, and key3. The corresponding values are inserted into this table. Using
the option -p such a properties table can be created by giving a list of
column definitions in SQL style.

The jobs table also contains a 'workloadestm' column that is used when
estimating the finished workload so far. The entries default to 1 and may be
adjusted.

Examples:
# create cmds.sh with jobs
echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
# create an initial database, but do not work
{0} -d jobs.db -c cmds.sh \\
-p 'number INTEGER, time REAL, mem INTEGER'
# launch two workers
{0} -d jobs.db -w &
{0} -d jobs.db -w &
# print status info
{0} -d jobs.db -s
""".format(sys.argv[0]))
208
209
if __name__ == "__main__":
    # Command-line entry point: parse options, open the database, then run
    # the requested commands in a fixed order (status, properties table,
    # job import, work loop).

    dbfn = None
    cmdfn = None
    propdef = None
    work = False
    status = False
    numjobs = None  # None means "no limit" (-n not given)

    try:
        opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsvn:")

        for opt, arg in opts:
            if opt == "-h":
                usage()
                sys.exit(os.EX_OK)
            elif opt == "-d":
                dbfn = arg
            elif opt == "-c":
                cmdfn = arg
            elif opt == "-p":
                propdef = arg
            elif opt == "-w":
                work = True
            elif opt == "-s":
                status = True
            elif opt == "-v":
                # Module scope: this rebinds the global flag read by runCmd.
                verbose = True
            elif opt == "-n":
                try:
                    numjobs = int(arg)
                except ValueError:
                    # Report it like any other argument error instead of
                    # dying with a traceback.
                    raise getopt.GetoptError(
                        "option -n requires an integer argument, got '%s'" % arg, "n")
            else:
                print("Unknown option '", opt, "'.")

    except getopt.GetoptError as e:
        print("Error parsing arguments:", e)
        usage()
        sys.exit(os.EX_USAGE)

    if dbfn is None:
        print("No database given.")
        sys.exit(os.EX_USAGE)

    # Generous busy timeout: other workers may hold the exclusive lock.
    conn = sqlite3.connect(dbfn, timeout=60)
    try:
        createSchema(conn)

        if status:
            printStatusInfo(conn)

        if propdef is not None:
            createPropertiesTable(conn, propdef)

        if cmdfn is not None:
            print("Adding jobs...")
            with open(cmdfn) as f:
                # Blank lines would otherwise become jobs with an empty command.
                cmds = [(line.strip(),) for line in f if line.strip()]
            insertJobs(conn, cmds)

        if work:
            n = 0
            # Test against None rather than truthiness so that "-n 0"
            # processes zero jobs instead of all of them.
            while numjobs is None or n < numjobs:
                jobid = getNextJobId(conn)
                if jobid is None:
                    print("All jobs have been started.")
                    break
                processJob(conn, jobid)
                n += 1
    finally:
        conn.close()
281