890ff35d9bd5aaabb619cb2fb0dbeb85170a4e07
2 """ A simple tool to run jobs from a database in parallel."""
4 __author__
= "Stefan Huber"
5 __copyright__
= "Copyright 2013"
11 import sys
, getopt
, os
17 def printStatusInfo(conn
):
20 c
.execute("SELECT count(id) FROM jobs;")
21 nototal
, = c
.fetchone()
23 c
.execute("SELECT count(id) FROM jobs WHERE done=1;")
24 nodone
, = c
.fetchone()
26 c
.execute("SELECT count(id) FROM jobs WHERE started=1;")
27 nostarted
, = c
.fetchone()
29 c
.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
30 wldone
, = c
.fetchone()
34 c
.execute("SELECT sum(workloadestm) FROM jobs;")
35 wltotal
, = c
.fetchone()
42 perdone
= 100.0*float(nodone
)/float(nototal
)
44 perwl
= 100.0*float(wldone
)/float(wltotal
)
46 print("%d (%.1f%%) of %d jobs and %.1f%% of the workload done. %d jobs are running." % \
47 (nodone
, perdone
, nototal
, perwl
, nostarted
-nodone
))
49 def createPropertiesTable(conn
, propdef
):
50 conn
.execute("BEGIN EXCLUSIVE")
53 c
.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
54 if c
.fetchone() == (0,):
55 print("Creating properties table.")
56 sqlstmt
= "CREATE TABLE properties (\
57 jobid INTEGER PRIMARY KEY,\
59 FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef
,)
65 proc
= subprocess
.Popen(cmd
, \
66 stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
, shell
=True)
67 out
, err
= proc
.communicate()
68 exitcode
= proc
.wait()
73 return exitcode
, out
, err
75 def processJob(conn
, jobid
):
78 c
.execute("SELECT cmd FROM jobs WHERE id=?", (jobid
,))
81 print("Process job %d: %s" % (jobid
, cmd
))
83 ec
, out
, err
= runCmd(cmd
)
84 c
.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec
, jobid
))
87 for l
in out
.splitlines():
88 if l
.startswith("DB-PROPERTIES:"):
90 for l
in err
.splitlines():
91 if l
.startswith("DB-PROPERTIES:"):
97 for k
, v
in p
.iteritems():
101 collist
= ", ".join([str(k
) for k
in prop
.keys()])
102 collist
= "jobid, " + collist
104 vallist
= ", ".join(["?" for k
in prop
.keys()])
105 vallist
= "?, " + vallist
108 sqlstmt
= "INSERT INTO properties (%s) VALUES (%s);" % (collist
,vallist
)
109 c
.execute(sqlstmt
, [jobid
] + list(prop
.values()))
114 def insertJobs(conn
, cmds
):
115 conn
.execute("BEGIN EXCLUSIVE")
116 conn
.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds
)
119 def createSchema(conn
):
122 c
.execute("BEGIN EXCLUSIVE")
124 # Create table, if necessary
125 c
.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
126 if c
.fetchone() == (0,):
127 print("Creating jobs table.")
128 conn
.execute("CREATE TABLE jobs ( \
129 id INTEGER PRIMARY KEY AUTOINCREMENT, \
130 cmd STRING NOT NULL, \
131 started BOOL DEFAULT (0) NOT NULL, \
132 done BOOL DEFAULT (0) NOT NULL, \
134 workloadestm REAL DEFAULT (1) NOT NULL)")
138 def getNextJobId(conn
):
141 c
.execute("BEGIN EXCLUSIVE")
142 c
.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
149 conn
.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid
,))
159 """Print usage text of this program"""
162 Take the jobs defined in the jobs table of the given database and process one
163 after the other. Multiple instances may be launched against the same database.
166 {0} [OPTIONS] [COMMANDS] -d DB
170 -c FILE add each line as a job resp. job's command to DB
172 -s print status information
173 -w do work and process jobs
176 -d DB the database to process
177 -n NUM in -w mode, only process num-many jobs
178 -p COL-DEF create properties table with SQL column spec
181 Commands may be combined in one call of {0}.
183 A list of jobs may be importet line-by-line from a file using the -c option.
184 Every job may output to stdout or stderr one or more strings of the form
185 DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
186 It is assumed that a table 'properties' exists with the columns jobid, key,
187 key2, and key3. The corresponding values are inserted into this table. Using
188 the option -p such a properties table can be created by giving a list of
189 column definitions in SQL style.
191 The jobs table also contains a 'workloadestm' column that is used when
192 estimating the finished workload so far. The entries default to 1 and may be
196 # create cmds.sh with jobs
197 echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
198 echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
199 # create an initial database, but do not work
200 {0} -d jobs.db -c cmds.sh \\
201 -p 'number INTEGER, time REAL, mem INTEGER'
207 """.format(sys
.argv
[0]))
210 if __name__
== "__main__":
221 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hd:c:p:wsvn:")
223 for opt
, arg
in opts
:
242 print("Unknown option '", opt
, "'.")
244 except getopt
.GetoptError
as e
:
245 print("Error parsing arguments:", e
)
247 sys
.exit(os
.EX_USAGE
)
250 print("No database given.")
251 sys
.exit(os
.EX_USAGE
)
253 conn
= sqlite3
.connect(dbfn
, timeout
=60)
257 printStatusInfo(conn
)
260 createPropertiesTable(conn
, propdef
)
263 print("Adding jobs...")
264 cmds
= open(cmdfn
).readlines()
265 cmds
= [(c
.strip(),) for c
in cmds
]
266 insertJobs(conn
, cmds
)
270 while not numjobs
or n
< numjobs
:
272 jobid
= getNextJobId(conn
)
274 print("All jobs have been started.")
276 processJob(conn
, jobid
)