From 2e254c2a892b7024a001999fc89f59da30f20be0 Mon Sep 17 00:00:00 2001 From: Stefan Huber Date: Fri, 26 Apr 2013 22:37:12 +0200 Subject: [PATCH] Add a parallel job processing tool --- paralleljobs.py | 188 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100755 paralleljobs.py diff --git a/paralleljobs.py b/paralleljobs.py new file mode 100755 index 0000000..ba579a4 --- /dev/null +++ b/paralleljobs.py @@ -0,0 +1,188 @@ +#!/usr/bin/python + +import sys, getopt, os +import sqlite3 +import subprocess + + +def createPropertiesTable(conn, propdef): + conn.execute("BEGIN EXCLUSIVE") + + c = conn.cursor() + c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';") + if c.fetchone() == (0,): + print("Creating properties table.") + sqlstmt = "CREATE TABLE properties (jobid INTEGER PRIMARY KEY, %s, \ + FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,) + c.execute(sqlstmt) + conn.commit() + +def runCmd(cmd): + proc = subprocess.Popen(cmd, \ + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + out, err = proc.communicate() + exitcode = proc.wait() + return exitcode, out, err + +def processJob(conn, jobid): + print("Process job %d" % (jobid)) + + c = conn.cursor() + c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,)) + cmd, = c.fetchone() + + ec, out, err = runCmd(cmd) + c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid)) + + propstr = [] + for l in out.splitlines(): + if l.startswith("DB-PROPERTIES:"): + propstr += [l[14:]] + for l in err.splitlines(): + if l.startswith("DB-PROPERTIES:"): + propstr += [l[14:]] + + prop = {} + for ps in propstr: + p = eval(ps) + for k, v in p.iteritems(): + prop[k] = v + + if len(prop) > 0: + collist = ", ".join([str(k) for k in prop.keys()]) + collist = "jobid, " + collist + + vallist = ", ".join(["?" for k in prop.keys()]) + vallist = "?, " + vallist + + c = conn.cursor() + sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist,vallist) + c.execute(sqlstmt, [jobid] + list(prop.values())) + + conn.commit() + +def insertJobs(conn, cmds): + conn.execute("BEGIN EXCLUSIVE") + conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds) + conn.commit() + +def createSchema(conn): + + c = conn.cursor() + c.execute("BEGIN EXCLUSIVE") + + # Create table, if necessary + c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';") + if c.fetchone() == (0,): + print("Creating jobs table.") + conn.execute("CREATE TABLE jobs ( \ + id INTEGER PRIMARY KEY AUTOINCREMENT, \ + cmd STRING NOT NULL, \ + started BOOL DEFAULT (0) NOT NULL, \ + done BOOL DEFAULT (0) NOT NULL, \ + exitcode INTEGER )") + conn.commit() + +def getNextJobId(conn): + + c = conn.cursor() + c.execute("BEGIN EXCLUSIVE") + c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;") + + r = c.fetchone() + if r == None: + return None + + jobid, = r + conn.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,)) + conn.commit() + + return jobid + + + +def usage(): + """Print usage text of this program""" + + print(""" +Take the jobs defined in jobs table of given database and process one job after +the other. Multiple instances may be launched against the same database. + +A list of jobs may be importet line-by-line from a file using the -c option. +Every job may output to stdout or stderr a string of the form + DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True}} +It is assumed that a table 'properties' exists with the columns jobid, key, +key2, and key3. The corresponding values are inserted into this table. Using +the option -p such a properties table can be created by giving a list of +column definitions in SQL style. + +Usage: + {0} [OPTIONS] -d database + {0} -h + +OPTIONS: + -c cmdfn add jobs from the file with list of commands + -d database the database to process + -h print this text + -p cols-def create properties table with SQL column spec + +Examples: + {0} -d stats.db -c cmds.sh -p 'time REAL, mem INTEGER' +""".format(sys.argv[0])) + + +if __name__ == "__main__": + + nojobs = 1 + dbfn = None + cmdfn = None + propdef = None + + try: + opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:") + + for opt, arg in opts: + if opt == "-h": + usage() + sys.exit(os.EX_OK) + elif opt == "-d": + dbfn = arg + elif opt == "-c": + cmdfn = arg + elif opt == "-p": + propdef = arg + else: + print("Unknown option '", opt, "'.") + + except getopt.GetoptError as e: + print("Error parsing arguments:", e) + usage() + sys.exit(os.EX_USAGE) + + if dbfn == None: + print("No database given.") + sys.exit(os.EX_USAGE) + + #try: + conn = sqlite3.connect(dbfn) + createSchema(conn) + + if propdef != None: + createPropertiesTable(conn, propdef) + + if cmdfn != None: + print("Adding jobs...") + cmds = open(cmdfn).readlines() + cmds = [(c.strip(),) for c in cmds] + insertJobs(conn, cmds) + + while True: + jobid = getNextJobId(conn) + if jobid == None: + print("All jobs have been started.") + break + processJob(conn, jobid) + + + conn.close() + -- 2.30.2