6eb92dda40b803a9543ea161360170e4f50ead0a
2 """ A simple tool to run jobs from a database in parallel."""
4 __author__
= "Stefan Huber"
5 __copyright__
= "Copyright 2013"
11 import sys
, getopt
, os
17 def printStatusInfo(conn
):
20 c
.execute("SELECT count(id) FROM jobs;")
21 nototal
, = c
.fetchone()
23 c
.execute("SELECT count(id) FROM jobs WHERE done=1;")
24 nodone
, = c
.fetchone()
26 c
.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
27 wldone
, = c
.fetchone()
31 c
.execute("SELECT sum(workloadestm) FROM jobs;")
32 wltotal
, = c
.fetchone()
36 perdone
= 100.0*float(nodone
)/float(nototal
)
37 perwl
= 100.0*float(wldone
)/float(wltotal
)
39 print("%d (%.1f%%) of %d jobs done. %.1f%% of the workload finished." % \
40 (nodone
, perdone
, nototal
, perwl
))
42 def createPropertiesTable(conn
, propdef
):
43 conn
.execute("BEGIN EXCLUSIVE")
46 c
.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
47 if c
.fetchone() == (0,):
48 print("Creating properties table.")
49 sqlstmt
= "CREATE TABLE properties (\
50 jobid INTEGER PRIMARY KEY,\
52 FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef
,)
58 proc
= subprocess
.Popen(cmd
, \
59 stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
, shell
=True)
60 out
, err
= proc
.communicate()
61 exitcode
= proc
.wait()
66 return exitcode
, out
, err
68 def processJob(conn
, jobid
):
71 c
.execute("SELECT cmd FROM jobs WHERE id=?", (jobid
,))
74 print("Process job %d: %s" % (jobid
, cmd
))
76 ec
, out
, err
= runCmd(cmd
)
77 c
.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec
, jobid
))
80 for l
in out
.splitlines():
81 if l
.startswith("DB-PROPERTIES:"):
83 for l
in err
.splitlines():
84 if l
.startswith("DB-PROPERTIES:"):
90 for k
, v
in p
.iteritems():
94 collist
= ", ".join([str(k
) for k
in prop
.keys()])
95 collist
= "jobid, " + collist
97 vallist
= ", ".join(["?" for k
in prop
.keys()])
98 vallist
= "?, " + vallist
101 sqlstmt
= "INSERT INTO properties (%s) VALUES (%s);" % (collist
,vallist
)
102 c
.execute(sqlstmt
, [jobid
] + list(prop
.values()))
107 def insertJobs(conn
, cmds
):
108 conn
.execute("BEGIN EXCLUSIVE")
109 conn
.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds
)
112 def createSchema(conn
):
115 c
.execute("BEGIN EXCLUSIVE")
117 # Create table, if necessary
118 c
.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
119 if c
.fetchone() == (0,):
120 print("Creating jobs table.")
121 conn
.execute("CREATE TABLE jobs ( \
122 id INTEGER PRIMARY KEY AUTOINCREMENT, \
123 cmd STRING NOT NULL, \
124 started BOOL DEFAULT (0) NOT NULL, \
125 done BOOL DEFAULT (0) NOT NULL, \
127 workloadestm REAL DEFAULT (1) NOT NULL)")
131 def getNextJobId(conn
):
134 c
.execute("BEGIN EXCLUSIVE")
135 c
.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
142 conn
.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid
,))
152 """Print usage text of this program"""
155 Take the jobs defined in jobs table of the given database and process one job
156 after the other. Multiple instances may be launched against the same database.
159 {0} [OPTIONS] [COMMANDS] -d database
163 -c cmdfn add jobs from the file with list of commands
165 -s print status information
166 -w work on the database
169 -d database the database to process
170 -n num in -w mode, only perform num-many jobs
171 -p cols-def create properties table with SQL column spec
172 -v print output of the job's command
174 Commands may be combined in one call of {0}.
176 A list of jobs may be importet line-by-line from a file using the -c option.
177 Every job may output to stdout or stderr a string of the form
178 DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
179 It is assumed that a table 'properties' exists with the columns jobid, key,
180 key2, and key3. The corresponding values are inserted into this table. Using
181 the option -p such a properties table can be created by giving a list of
182 column definitions in SQL style.
184 The jobs table also contains a 'workloadestm' column that is used when
185 estimating the finished workload so far. The entries default to 1 and may be
189 # create cmds.sh with jobs
190 echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
191 echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
192 # create an initial database, but do not work
193 {0} -d jobs.db -c cmds.sh \\
194 -p 'number INTEGER, time REAL, mem INTEGER'
200 """.format(sys
.argv
[0]))
203 if __name__
== "__main__":
214 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hd:c:p:wsvn:")
216 for opt
, arg
in opts
:
235 print("Unknown option '", opt
, "'.")
237 except getopt
.GetoptError
as e
:
238 print("Error parsing arguments:", e
)
240 sys
.exit(os
.EX_USAGE
)
243 print("No database given.")
244 sys
.exit(os
.EX_USAGE
)
246 conn
= sqlite3
.connect(dbfn
)
250 printStatusInfo(conn
)
253 createPropertiesTable(conn
, propdef
)
256 print("Adding jobs...")
257 cmds
= open(cmdfn
).readlines()
258 cmds
= [(c
.strip(),) for c
in cmds
]
259 insertJobs(conn
, cmds
)
263 while not numjobs
or n
< numjobs
:
265 jobid
= getNextJobId(conn
)
267 print("All jobs have been started.")
269 processJob(conn
, jobid
)