d8f974d13cfb933cc248785f326a2df898f12885
[shutils.git] / paralleljobs.py
1 #!/usr/bin/python
2
3 import sys, getopt, os
4 import sqlite3
5 import subprocess
6
7 verbose = False
8
9 def printStatusInfo(conn):
10 c = conn.cursor()
11
12 c.execute("SELECT count(id) FROM jobs;")
13 nototal, = c.fetchone()
14
15 c.execute("SELECT count(id) FROM jobs WHERE done=1;")
16 nodone, = c.fetchone()
17
18 c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
19 wldone, = c.fetchone()
20 if wldone == None:
21 wldone = 0.0
22
23 c.execute("SELECT sum(workloadestm) FROM jobs;")
24 wltotal, = c.fetchone()
25
26 c.close()
27
28 print(nototal, nodone, wldone, wltotal)
29 perdone = 100.0*float(nodone)/float(nototal)
30 perwl = 100.0*float(wldone)/float(wltotal)
31
32 print("%d (%.1f%%) of %d jobs done. %.1f%% of the workload finished." % \
33 (nodone, perdone, nototal, perwl))
34
35 def createPropertiesTable(conn, propdef):
36 conn.execute("BEGIN EXCLUSIVE")
37
38 c = conn.cursor()
39 c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
40 if c.fetchone() == (0,):
41 print("Creating properties table.")
42 sqlstmt = "CREATE TABLE properties (\
43 jobid INTEGER PRIMARY KEY,\
44 %s, \
45 FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,)
46 c.execute(sqlstmt)
47 c.close()
48 conn.commit()
49
50 def runCmd(cmd):
51 proc = subprocess.Popen(cmd, \
52 stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
53 out, err = proc.communicate()
54 exitcode = proc.wait()
55
56 if verbose:
57 print(out, err)
58
59 return exitcode, out, err
60
61 def processJob(conn, jobid):
62 print("Process job %d" % (jobid))
63
64 c = conn.cursor()
65 c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
66 cmd, = c.fetchone()
67
68 ec, out, err = runCmd(cmd)
69 c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))
70
71 propstr = []
72 for l in out.splitlines():
73 if l.startswith("DB-PROPERTIES:"):
74 propstr += [l[14:]]
75 for l in err.splitlines():
76 if l.startswith("DB-PROPERTIES:"):
77 propstr += [l[14:]]
78
79 prop = {}
80 for ps in propstr:
81 p = eval(ps)
82 for k, v in p.iteritems():
83 prop[k] = v
84
85 if len(prop) > 0:
86 collist = ", ".join([str(k) for k in prop.keys()])
87 collist = "jobid, " + collist
88
89 vallist = ", ".join(["?" for k in prop.keys()])
90 vallist = "?, " + vallist
91
92 c = conn.cursor()
93 sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist,vallist)
94 c.execute(sqlstmt, [jobid] + list(prop.values()))
95
96 c.close()
97 conn.commit()
98
99 def insertJobs(conn, cmds):
100 conn.execute("BEGIN EXCLUSIVE")
101 conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
102 conn.commit()
103
104 def createSchema(conn):
105
106 c = conn.cursor()
107 c.execute("BEGIN EXCLUSIVE")
108
109 # Create table, if necessary
110 c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
111 if c.fetchone() == (0,):
112 print("Creating jobs table.")
113 conn.execute("CREATE TABLE jobs ( \
114 id INTEGER PRIMARY KEY AUTOINCREMENT, \
115 cmd STRING NOT NULL, \
116 started BOOL DEFAULT (0) NOT NULL, \
117 done BOOL DEFAULT (0) NOT NULL, \
118 exitcode INTEGER, \
119 workloadestm REAL DEFAULT (1) NOT NULL)")
120 c.close()
121 conn.commit()
122
123 def getNextJobId(conn):
124
125 c = conn.cursor()
126 c.execute("BEGIN EXCLUSIVE")
127 c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
128
129 r = c.fetchone()
130 if r == None:
131 return None
132
133 jobid, = r
134 conn.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))
135
136 c.close()
137 conn.commit()
138
139 return jobid
140
141
142
143 def usage():
144 """Print usage text of this program"""
145
146 print("""
147 Take the jobs defined in jobs table of the given database and process one job
148 after the other. Multiple instances may be launched against the same database.
149
150 Usage:
151 {0} [OPTIONS] [COMMANDS] -d database
152 {0} -h
153
154 COMMANDS:
155 -c cmdfn add jobs from the file with list of commands
156 -h print this text
157 -s print status information
158 -w work on the database
159
160 OPTIONS:
161 -d database the database to process
162 -p cols-def create properties table with SQL column spec
163 -v print output of the job's command
164
165 Commands may be combined in one call of {0}.
166
167 A list of jobs may be importet line-by-line from a file using the -c option.
168 Every job may output to stdout or stderr a string of the form
169 DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
170 It is assumed that a table 'properties' exists with the columns jobid, key,
171 key2, and key3. The corresponding values are inserted into this table. Using
172 the option -p such a properties table can be created by giving a list of
173 column definitions in SQL style.
174
175 The jobs table also contains a 'workloadestm' column that is used when
176 estimating the finished workload so far. The entries default to 1 and may be
177 set externally.
178
179 Examples:
180 # create cmds.sh with jobs
181 echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
182 echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
183 # create an initial database, but do not work
184 {0} -d jobs.db -c cmds.sh -p 'number INTEGER, time REAL, mem INTEGER'
185 # launch two workers
186 {0} -d jobs.db -w &
187 {0} -d jobs.db -w &
188 # print status info
189 {0} -d jobs.db -s
190 """.format(sys.argv[0]))
191
192
193 if __name__ == "__main__":
194
195 nojobs = 1
196 dbfn = None
197 cmdfn = None
198 propdef = None
199 work = False
200 status = False
201
202 try:
203 opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsv")
204
205 for opt, arg in opts:
206 if opt == "-h":
207 usage()
208 sys.exit(os.EX_OK)
209 elif opt == "-d":
210 dbfn = arg
211 elif opt == "-c":
212 cmdfn = arg
213 elif opt == "-p":
214 propdef = arg
215 elif opt == "-w":
216 work = True
217 elif opt == "-s":
218 status = True
219 elif opt == "-v":
220 verbose = True
221 else:
222 print("Unknown option '", opt, "'.")
223
224 except getopt.GetoptError as e:
225 print("Error parsing arguments:", e)
226 usage()
227 sys.exit(os.EX_USAGE)
228
229 if dbfn == None:
230 print("No database given.")
231 sys.exit(os.EX_USAGE)
232
233 conn = sqlite3.connect(dbfn)
234 createSchema(conn)
235
236 if status:
237 printStatusInfo(conn)
238
239 if propdef != None:
240 createPropertiesTable(conn, propdef)
241
242 if cmdfn != None:
243 print("Adding jobs...")
244 cmds = open(cmdfn).readlines()
245 cmds = [(c.strip(),) for c in cmds]
246 insertJobs(conn, cmds)
247
248 if work:
249 while True:
250 jobid = getNextJobId(conn)
251 if jobid == None:
252 print("All jobs have been started.")
253 break
254 processJob(conn, jobid)
255
256
257 conn.close()
258