--- /dev/null
+#!/usr/bin/python
+
+import sys, getopt, os
+import sqlite3
+import subprocess
+
+verbose = False
+
+def printStatusInfo(conn):
+ c = conn.cursor()
+
+ c.execute("SELECT count(id) FROM jobs;")
+ nototal, = c.fetchone()
+
+ c.execute("SELECT count(id) FROM jobs WHERE done=1;")
+ nodone, = c.fetchone()
+
+ c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
+ wldone, = c.fetchone()
+ if wldone == None:
+ wldone = 0.0
+
+ c.execute("SELECT sum(workloadestm) FROM jobs;")
+ wltotal, = c.fetchone()
+
+ c.close()
+
+ print(nototal, nodone, wldone, wltotal)
+ perdone = 100.0*float(nodone)/float(nototal)
+ perwl = 100.0*float(wldone)/float(wltotal)
+
+ print("%d (%.1f%%) of %d jobs done. %.1f%% of the workload finished." % \
+ (nodone, perdone, nototal, perwl))
+
+def createPropertiesTable(conn, propdef):
+ conn.execute("BEGIN EXCLUSIVE")
+
+ c = conn.cursor()
+ c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
+ if c.fetchone() == (0,):
+ print("Creating properties table.")
+ sqlstmt = "CREATE TABLE properties (\
+ jobid INTEGER PRIMARY KEY,\
+ %s, \
+ FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,)
+ c.execute(sqlstmt)
+ c.close()
+ conn.commit()
+
+def runCmd(cmd):
+ proc = subprocess.Popen(cmd, \
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
+ out, err = proc.communicate()
+ exitcode = proc.wait()
+
+ if verbose:
+ print(out, err)
+
+ return exitcode, out, err
+
+def processJob(conn, jobid):
+ print("Process job %d" % (jobid))
+
+ c = conn.cursor()
+ c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
+ cmd, = c.fetchone()
+
+ ec, out, err = runCmd(cmd)
+ c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))
+
+ propstr = []
+ for l in out.splitlines():
+ if l.startswith("DB-PROPERTIES:"):
+ propstr += [l[14:]]
+ for l in err.splitlines():
+ if l.startswith("DB-PROPERTIES:"):
+ propstr += [l[14:]]
+
+ prop = {}
+ for ps in propstr:
+ p = eval(ps)
+ for k, v in p.iteritems():
+ prop[k] = v
+
+ if len(prop) > 0:
+ collist = ", ".join([str(k) for k in prop.keys()])
+ collist = "jobid, " + collist
+
+ vallist = ", ".join(["?" for k in prop.keys()])
+ vallist = "?, " + vallist
+
+ c = conn.cursor()
+ sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist,vallist)
+ c.execute(sqlstmt, [jobid] + list(prop.values()))
+
+ c.close()
+ conn.commit()
+
+def insertJobs(conn, cmds):
+ conn.execute("BEGIN EXCLUSIVE")
+ conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
+ conn.commit()
+
+def createSchema(conn):
+
+ c = conn.cursor()
+ c.execute("BEGIN EXCLUSIVE")
+
+ # Create table, if necessary
+ c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
+ if c.fetchone() == (0,):
+ print("Creating jobs table.")
+ conn.execute("CREATE TABLE jobs ( \
+ id INTEGER PRIMARY KEY AUTOINCREMENT, \
+ cmd STRING NOT NULL, \
+ started BOOL DEFAULT (0) NOT NULL, \
+ done BOOL DEFAULT (0) NOT NULL, \
+ exitcode INTEGER, \
+ workloadestm REAL DEFAULT (1) NOT NULL)")
+ c.close()
+ conn.commit()
+
+def getNextJobId(conn):
+
+ c = conn.cursor()
+ c.execute("BEGIN EXCLUSIVE")
+ c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
+
+ r = c.fetchone()
+ if r == None:
+ return None
+
+ jobid, = r
+ conn.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))
+
+ c.close()
+ conn.commit()
+
+ return jobid
+
+
+
+def usage():
+ """Print usage text of this program"""
+
+ print("""
+Take the jobs defined in jobs table of the given database and process one job
+after the other. Multiple instances may be launched against the same database.
+
+Usage:
+ {0} [OPTIONS] [COMMANDS] -d database
+ {0} -h
+
+COMMANDS:
+ -c cmdfn add jobs from the file with list of commands
+ -h print this text
+ -s print status information
+ -w work on the database
+
+OPTIONS:
+ -d database the database to process
+ -p cols-def create properties table with SQL column spec
+ -v print output of the job's command
+
+Commands may be combined in one call of {0}.
+
+A list of jobs may be importet line-by-line from a file using the -c option.
+Every job may output to stdout or stderr a string of the form
+ DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
+It is assumed that a table 'properties' exists with the columns jobid, key,
+key2, and key3. The corresponding values are inserted into this table. Using
+the option -p such a properties table can be created by giving a list of
+column definitions in SQL style.
+
+The jobs table also contains a 'workloadestm' column that is used when
+estimating the finished workload so far. The entries default to 1 and may be
+set externally.
+
+Examples:
+ # create cmds.sh with jobs
+ echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
+ echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
+ # create an initial database, but do not work
+ {0} -d jobs.db -c cmds.sh \\
+ -p 'number INTEGER, time REAL, mem INTEGER'
+ # launch two workers
+ {0} -d jobs.db -w &
+ {0} -d jobs.db -w &
+ # print status info
+ {0} -d jobs.db -s
+""".format(sys.argv[0]))
+
+
+if __name__ == "__main__":
+
+ nojobs = 1
+ dbfn = None
+ cmdfn = None
+ propdef = None
+ work = False
+ status = False
+
+ try:
+ opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsv")
+
+ for opt, arg in opts:
+ if opt == "-h":
+ usage()
+ sys.exit(os.EX_OK)
+ elif opt == "-d":
+ dbfn = arg
+ elif opt == "-c":
+ cmdfn = arg
+ elif opt == "-p":
+ propdef = arg
+ elif opt == "-w":
+ work = True
+ elif opt == "-s":
+ status = True
+ elif opt == "-v":
+ verbose = True
+ else:
+ print("Unknown option '", opt, "'.")
+
+ except getopt.GetoptError as e:
+ print("Error parsing arguments:", e)
+ usage()
+ sys.exit(os.EX_USAGE)
+
+ if dbfn == None:
+ print("No database given.")
+ sys.exit(os.EX_USAGE)
+
+ conn = sqlite3.connect(dbfn)
+ createSchema(conn)
+
+ if status:
+ printStatusInfo(conn)
+
+ if propdef != None:
+ createPropertiesTable(conn, propdef)
+
+ if cmdfn != None:
+ print("Adding jobs...")
+ cmds = open(cmdfn).readlines()
+ cmds = [(c.strip(),) for c in cmds]
+ insertJobs(conn, cmds)
+
+ if work:
+ while True:
+ jobid = getNextJobId(conn)
+ if jobid == None:
+ print("All jobs have been started.")
+ break
+ processJob(conn, jobid)
+
+
+ conn.close()
+