From c0720bddfafeeba49eeb16524c15ab88fd960bc8 Mon Sep 17 00:00:00 2001 From: Stefan Huber Date: Sat, 27 Apr 2013 15:03:36 +0200 Subject: [PATCH] Initial import from shutils --- paralleljobs.py | 259 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 259 insertions(+) create mode 100755 paralleljobs.py diff --git a/paralleljobs.py b/paralleljobs.py new file mode 100755 index 0000000..c9e313d --- /dev/null +++ b/paralleljobs.py @@ -0,0 +1,259 @@ +#!/usr/bin/python + +import sys, getopt, os +import sqlite3 +import subprocess + +verbose = False + +def printStatusInfo(conn): + c = conn.cursor() + + c.execute("SELECT count(id) FROM jobs;") + nototal, = c.fetchone() + + c.execute("SELECT count(id) FROM jobs WHERE done=1;") + nodone, = c.fetchone() + + c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;") + wldone, = c.fetchone() + if wldone == None: + wldone = 0.0 + + c.execute("SELECT sum(workloadestm) FROM jobs;") + wltotal, = c.fetchone() + + c.close() + + print(nototal, nodone, wldone, wltotal) + perdone = 100.0*float(nodone)/float(nototal) + perwl = 100.0*float(wldone)/float(wltotal) + + print("%d (%.1f%%) of %d jobs done. %.1f%% of the workload finished." % \ + (nodone, perdone, nototal, perwl)) + +def createPropertiesTable(conn, propdef): + conn.execute("BEGIN EXCLUSIVE") + + c = conn.cursor() + c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';") + if c.fetchone() == (0,): + print("Creating properties table.") + sqlstmt = "CREATE TABLE properties (\ + jobid INTEGER PRIMARY KEY,\ + %s, \ + FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,) + c.execute(sqlstmt) + c.close() + conn.commit() + +def runCmd(cmd): + proc = subprocess.Popen(cmd, \ + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + out, err = proc.communicate() + exitcode = proc.wait() + + if verbose: + print(out, err) + + return exitcode, out, err + +def processJob(conn, jobid): + print("Process job %d" % (jobid)) + + c = conn.cursor() + c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,)) + cmd, = c.fetchone() + + ec, out, err = runCmd(cmd) + c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid)) + + propstr = [] + for l in out.splitlines(): + if l.startswith("DB-PROPERTIES:"): + propstr += [l[14:]] + for l in err.splitlines(): + if l.startswith("DB-PROPERTIES:"): + propstr += [l[14:]] + + prop = {} + for ps in propstr: + p = eval(ps) + for k, v in p.iteritems(): + prop[k] = v + + if len(prop) > 0: + collist = ", ".join([str(k) for k in prop.keys()]) + collist = "jobid, " + collist + + vallist = ", ".join(["?" for k in prop.keys()]) + vallist = "?, " + vallist + + c = conn.cursor() + sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist,vallist) + c.execute(sqlstmt, [jobid] + list(prop.values())) + + c.close() + conn.commit() + +def insertJobs(conn, cmds): + conn.execute("BEGIN EXCLUSIVE") + conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds) + conn.commit() + +def createSchema(conn): + + c = conn.cursor() + c.execute("BEGIN EXCLUSIVE") + + # Create table, if necessary + c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';") + if c.fetchone() == (0,): + print("Creating jobs table.") + conn.execute("CREATE TABLE jobs ( \ + id INTEGER PRIMARY KEY AUTOINCREMENT, \ + cmd STRING NOT NULL, \ + started BOOL DEFAULT (0) NOT NULL, \ + done BOOL DEFAULT (0) NOT NULL, \ + exitcode INTEGER, \ + workloadestm REAL DEFAULT (1) NOT NULL)") + c.close() + conn.commit() + +def getNextJobId(conn): + + c = conn.cursor() + c.execute("BEGIN EXCLUSIVE") + c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;") + + r = c.fetchone() + if r == None: + return None + + jobid, = r + conn.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,)) + + c.close() + conn.commit() + + return jobid + + + +def usage(): + """Print usage text of this program""" + + print(""" +Take the jobs defined in jobs table of the given database and process one job +after the other. Multiple instances may be launched against the same database. + +Usage: + {0} [OPTIONS] [COMMANDS] -d database + {0} -h + +COMMANDS: + -c cmdfn add jobs from the file with list of commands + -h print this text + -s print status information + -w work on the database + +OPTIONS: + -d database the database to process + -p cols-def create properties table with SQL column spec + -v print output of the job's command + +Commands may be combined in one call of {0}. + +A list of jobs may be importet line-by-line from a file using the -c option. +Every job may output to stdout or stderr a string of the form + DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }} +It is assumed that a table 'properties' exists with the columns jobid, key, +key2, and key3. The corresponding values are inserted into this table. Using +the option -p such a properties table can be created by giving a list of +column definitions in SQL style. + +The jobs table also contains a 'workloadestm' column that is used when +estimating the finished workload so far. The entries default to 1 and may be +set externally. + +Examples: + # create cmds.sh with jobs + echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh + echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh + # create an initial database, but do not work + {0} -d jobs.db -c cmds.sh \\ + -p 'number INTEGER, time REAL, mem INTEGER' + # launch two workers + {0} -d jobs.db -w & + {0} -d jobs.db -w & + # print status info + {0} -d jobs.db -s +""".format(sys.argv[0])) + + +if __name__ == "__main__": + + nojobs = 1 + dbfn = None + cmdfn = None + propdef = None + work = False + status = False + + try: + opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsv") + + for opt, arg in opts: + if opt == "-h": + usage() + sys.exit(os.EX_OK) + elif opt == "-d": + dbfn = arg + elif opt == "-c": + cmdfn = arg + elif opt == "-p": + propdef = arg + elif opt == "-w": + work = True + elif opt == "-s": + status = True + elif opt == "-v": + verbose = True + else: + print("Unknown option '", opt, "'.") + + except getopt.GetoptError as e: + print("Error parsing arguments:", e) + usage() + sys.exit(os.EX_USAGE) + + if dbfn == None: + print("No database given.") + sys.exit(os.EX_USAGE) + + conn = sqlite3.connect(dbfn) + createSchema(conn) + + if status: + printStatusInfo(conn) + + if propdef != None: + createPropertiesTable(conn, propdef) + + if cmdfn != None: + print("Adding jobs...") + cmds = open(cmdfn).readlines() + cmds = [(c.strip(),) for c in cmds] + insertJobs(conn, cmds) + + if work: + while True: + jobid = getNextJobId(conn) + if jobid == None: + print("All jobs have been started.") + break + processJob(conn, jobid) + + + conn.close() + -- 2.39.5