f2e6eea15828b9c080945232db2b4d8e10c8dd92
2 """ A simple tool to run jobs from a database in parallel."""
4 __author__
= "Stefan Huber"
5 __copyright__
= "Copyright 2013"
11 import sys
, getopt
, os
def printStatusInfo(conn):
    """Print how many jobs are done and which share of the workload that is.

    conn is an open database connection providing a 'jobs' table with the
    columns id, done and workloadestm. One summary line is written to
    stdout; nothing is returned.
    """
    # NOTE(review): the cursor creation is not visible in this excerpt; a
    # plain cursor on conn is assumed.
    c = conn.cursor()

    c.execute("SELECT count(id) FROM jobs;")
    nototal, = c.fetchone()

    c.execute("SELECT count(id) FROM jobs WHERE done=1;")
    nodone, = c.fetchone()

    # sum() over zero rows yields NULL (None), so fall back to 0.
    c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
    wldone, = c.fetchone()
    wldone = wldone or 0.0

    c.execute("SELECT sum(workloadestm) FROM jobs;")
    wltotal, = c.fetchone()
    wltotal = wltotal or 0.0

    # An empty jobs table (or a total workload of 0) is reported as 0%
    # instead of raising ZeroDivisionError.
    perdone = 100.0*float(nodone)/float(nototal) if nototal else 0.0
    perwl = 100.0*float(wldone)/float(wltotal) if wltotal else 0.0

    print("%d (%.1f%%) of %d jobs done. %.1f%% of the workload finished." %
          (nodone, perdone, nototal, perwl))
def createPropertiesTable(conn, propdef):
    """Create the 'properties' table unless the database already has one.

    conn is an open database connection. propdef is a list of column
    definitions in SQL style (e.g. 'number INTEGER, time REAL'); it is
    pasted verbatim into the CREATE TABLE statement, so it must come from a
    trusted source. The table is keyed by jobid, which references jobs(id).
    """
    conn.execute("BEGIN EXCLUSIVE")
    try:
        # NOTE(review): cursor creation, the %s placeholder line and the
        # execution of sqlstmt are not visible in this excerpt; they are
        # reconstructed from the visible "% (propdef,)" and the surrounding
        # statements -- confirm against the full file.
        c = conn.cursor()
        c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
        if c.fetchone() == (0,):
            print("Creating properties table.")
            sqlstmt = ("CREATE TABLE properties ("
                       "jobid INTEGER PRIMARY KEY, "
                       "%s, "
                       "FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,))
            c.execute(sqlstmt)
    except Exception:
        # Do not leave the exclusive lock behind when anything fails.
        conn.rollback()
        raise
    # End the exclusive transaction so that later BEGIN statements on this
    # connection (and other processes) are not blocked.
    conn.commit()
def runCmd(cmd):
    """Run cmd through the shell and return (exitcode, stdout, stderr).

    Both output streams are captured completely before the function
    returns.
    """
    # NOTE(review): the def line is not part of this excerpt; name and
    # parameter are taken from the call runCmd(cmd) in processJob. Lines
    # between the wait and the return are also not visible here.
    child = subprocess.Popen(cmd, shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    captured = child.communicate()
    status = child.wait()
    return status, captured[0], captured[1]
68 def processJob(conn
, jobid
):
69 print("Process job %d" % (jobid
))
72 c
.execute("SELECT cmd FROM jobs WHERE id=?", (jobid
,))
75 ec
, out
, err
= runCmd(cmd
)
76 c
.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec
, jobid
))
79 for l
in out
.splitlines():
80 if l
.startswith("DB-PROPERTIES:"):
82 for l
in err
.splitlines():
83 if l
.startswith("DB-PROPERTIES:"):
89 for k
, v
in p
.iteritems():
93 collist
= ", ".join([str(k
) for k
in prop
.keys()])
94 collist
= "jobid, " + collist
96 vallist
= ", ".join(["?" for k
in prop
.keys()])
97 vallist
= "?, " + vallist
100 sqlstmt
= "INSERT INTO properties (%s) VALUES (%s);" % (collist
,vallist
)
101 c
.execute(sqlstmt
, [jobid
] + list(prop
.values()))
def insertJobs(conn, cmds):
    """Add new jobs to the jobs table within one exclusive transaction.

    conn is an open database connection. cmds is a sequence of 1-tuples,
    each holding the command line of one job. Either all jobs are inserted
    or, if an insert fails, none are and the error is re-raised.
    """
    conn.execute("BEGIN EXCLUSIVE")
    try:
        conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
    except Exception:
        # Release the exclusive lock and drop the partial insert.
        conn.rollback()
        raise
    # Without the commit the transaction stays open: the rows are lost when
    # the connection closes and the next BEGIN EXCLUSIVE fails.
    conn.commit()
def createSchema(conn):
    """Create the 'jobs' table unless the database already has one.

    conn is an open database connection. The check and the creation run in
    one exclusive transaction, which is committed before returning.
    """
    # NOTE(review): the cursor creation is not visible in this excerpt; a
    # plain cursor on conn is assumed.
    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")
    try:
        # Create table, if necessary
        c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
        if c.fetchone() == (0,):
            print("Creating jobs table.")
            # NOTE(review): the exitcode column is not visible in this
            # excerpt; it is required by the
            # "UPDATE jobs SET exitcode=?, done=1" statement used when a job
            # finishes. Its exact declaration should be confirmed.
            conn.execute("CREATE TABLE jobs ( "
                         "id INTEGER PRIMARY KEY AUTOINCREMENT, "
                         "cmd STRING NOT NULL, "
                         "started BOOL DEFAULT (0) NOT NULL, "
                         "done BOOL DEFAULT (0) NOT NULL, "
                         "exitcode INTEGER, "
                         "workloadestm REAL DEFAULT (1) NOT NULL)")
    except Exception:
        # Do not leave the exclusive lock behind when anything fails.
        conn.rollback()
        raise
    # End the exclusive transaction so later BEGIN statements can succeed.
    conn.commit()
def getNextJobId(conn):
    """Mark one not yet started job as started and return its id.

    conn is an open database connection. The lookup and the update run in
    one exclusive transaction, presumably so that several workers on the
    same database never pick the same job. Returns None when no unstarted
    job is left.
    """
    # NOTE(review): cursor creation, the fetch of the selected row and the
    # return statements are not visible in this excerpt; they are
    # reconstructed from the caller, which reports "All jobs have been
    # started." after calling this function -- confirm against the full file.
    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")
    try:
        c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
        row = c.fetchone()
        jobid = None
        if row is not None:
            jobid, = row
            conn.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))
    except Exception:
        # Release the exclusive lock without claiming a job.
        conn.rollback()
        raise
    # Publish the claim and release the lock for the other workers.
    conn.commit()
    return jobid
151 """Print usage text of this program"""
154 Take the jobs defined in jobs table of the given database and process one job
155 after the other. Multiple instances may be launched against the same database.
158 {0} [OPTIONS] [COMMANDS] -d database
162 -c cmdfn add jobs from the file with list of commands
164 -s print status information
165 -w work on the database
168 -d database the database to process
169 -p cols-def create properties table with SQL column spec
170 -v print output of the job's command
172 Commands may be combined in one call of {0}.
174 A list of jobs may be imported line-by-line from a file using the -c option.
175 Every job may output to stdout or stderr a string of the form
176 DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
177 It is assumed that a table 'properties' exists with the columns jobid, key,
178 key2, and key3. The corresponding values are inserted into this table. Using
179 the option -p such a properties table can be created by giving a list of
180 column definitions in SQL style.
182 The jobs table also contains a 'workloadestm' column that is used when
183 estimating the finished workload so far. The entries default to 1 and may be
187 # create cmds.sh with jobs
188 echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
189 echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
190 # create an initial database, but do not work
191 {0} -d jobs.db -c cmds.sh \\
192 -p 'number INTEGER, time REAL, mem INTEGER'
198 """.format(sys
.argv
[0]))
201 if __name__
== "__main__":
211 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hd:c:p:wsv")
213 for opt
, arg
in opts
:
230 print("Unknown option '", opt
, "'.")
232 except getopt
.GetoptError
as e
:
233 print("Error parsing arguments:", e
)
235 sys
.exit(os
.EX_USAGE
)
238 print("No database given.")
239 sys
.exit(os
.EX_USAGE
)
241 conn
= sqlite3
.connect(dbfn
)
245 printStatusInfo(conn
)
248 createPropertiesTable(conn
, propdef
)
251 print("Adding jobs...")
252 cmds
= open(cmdfn
).readlines()
253 cmds
= [(c
.strip(),) for c
in cmds
]
254 insertJobs(conn
, cmds
)
258 jobid
= getNextJobId(conn
)
260 print("All jobs have been started.")
262 processJob(conn
, jobid
)