From: Stefan Huber Date: Sat, 27 Apr 2013 13:04:01 +0000 (+0200) Subject: Moving paralleljobs to dedicated repo X-Git-Url: https://git.sthu.org/?a=commitdiff_plain;h=72adf7bc52ad98978a0c1856b817fc0416256703;p=shutils.git Moving paralleljobs to dedicated repo --- diff --git a/paralleljobs.py b/paralleljobs.py deleted file mode 100755 index c9e313d..0000000 --- a/paralleljobs.py +++ /dev/null @@ -1,259 +0,0 @@ -#!/usr/bin/python - -import sys, getopt, os -import sqlite3 -import subprocess - -verbose = False - -def printStatusInfo(conn): - c = conn.cursor() - - c.execute("SELECT count(id) FROM jobs;") - nototal, = c.fetchone() - - c.execute("SELECT count(id) FROM jobs WHERE done=1;") - nodone, = c.fetchone() - - c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;") - wldone, = c.fetchone() - if wldone == None: - wldone = 0.0 - - c.execute("SELECT sum(workloadestm) FROM jobs;") - wltotal, = c.fetchone() - - c.close() - - print(nototal, nodone, wldone, wltotal) - perdone = 100.0*float(nodone)/float(nototal) - perwl = 100.0*float(wldone)/float(wltotal) - - print("%d (%.1f%%) of %d jobs done. %.1f%% of the workload finished." % \ - (nodone, perdone, nototal, perwl)) - -def createPropertiesTable(conn, propdef): - conn.execute("BEGIN EXCLUSIVE") - - c = conn.cursor() - c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';") - if c.fetchone() == (0,): - print("Creating properties table.") - sqlstmt = "CREATE TABLE properties (\ - jobid INTEGER PRIMARY KEY,\ - %s, \ - FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,) - c.execute(sqlstmt) - c.close() - conn.commit() - -def runCmd(cmd): - proc = subprocess.Popen(cmd, \ - stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) - out, err = proc.communicate() - exitcode = proc.wait() - - if verbose: - print(out, err) - - return exitcode, out, err - -def processJob(conn, jobid): - print("Process job %d" % (jobid)) - - c = conn.cursor() - c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,)) - cmd, = c.fetchone() - - ec, out, err = runCmd(cmd) - c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid)) - - propstr = [] - for l in out.splitlines(): - if l.startswith("DB-PROPERTIES:"): - propstr += [l[14:]] - for l in err.splitlines(): - if l.startswith("DB-PROPERTIES:"): - propstr += [l[14:]] - - prop = {} - for ps in propstr: - p = eval(ps) - for k, v in p.iteritems(): - prop[k] = v - - if len(prop) > 0: - collist = ", ".join([str(k) for k in prop.keys()]) - collist = "jobid, " + collist - - vallist = ", ".join(["?" for k in prop.keys()]) - vallist = "?, " + vallist - - c = conn.cursor() - sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist,vallist) - c.execute(sqlstmt, [jobid] + list(prop.values())) - - c.close() - conn.commit() - -def insertJobs(conn, cmds): - conn.execute("BEGIN EXCLUSIVE") - conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds) - conn.commit() - -def createSchema(conn): - - c = conn.cursor() - c.execute("BEGIN EXCLUSIVE") - - # Create table, if necessary - c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';") - if c.fetchone() == (0,): - print("Creating jobs table.") - conn.execute("CREATE TABLE jobs ( \ - id INTEGER PRIMARY KEY AUTOINCREMENT, \ - cmd STRING NOT NULL, \ - started BOOL DEFAULT (0) NOT NULL, \ - done BOOL DEFAULT (0) NOT NULL, \ - exitcode INTEGER, \ - workloadestm REAL DEFAULT (1) NOT NULL)") - c.close() - conn.commit() - -def getNextJobId(conn): - - c = conn.cursor() - c.execute("BEGIN EXCLUSIVE") - c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;") - - r = c.fetchone() - if r == None: - return None - - jobid, = r - conn.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,)) - - c.close() - conn.commit() - - return jobid - - - -def usage(): - """Print usage text of this program""" - - print(""" -Take the jobs defined in jobs table of the given database and process one job -after the other. Multiple instances may be launched against the same database. - -Usage: - {0} [OPTIONS] [COMMANDS] -d database - {0} -h - -COMMANDS: - -c cmdfn add jobs from the file with list of commands - -h print this text - -s print status information - -w work on the database - -OPTIONS: - -d database the database to process - -p cols-def create properties table with SQL column spec - -v print output of the job's command - -Commands may be combined in one call of {0}. - -A list of jobs may be importet line-by-line from a file using the -c option. -Every job may output to stdout or stderr a string of the form - DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }} -It is assumed that a table 'properties' exists with the columns jobid, key, -key2, and key3. The corresponding values are inserted into this table. Using -the option -p such a properties table can be created by giving a list of -column definitions in SQL style. - -The jobs table also contains a 'workloadestm' column that is used when -estimating the finished workload so far. The entries default to 1 and may be -set externally. - -Examples: - # create cmds.sh with jobs - echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh - echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh - # create an initial database, but do not work - {0} -d jobs.db -c cmds.sh \\ - -p 'number INTEGER, time REAL, mem INTEGER' - # launch two workers - {0} -d jobs.db -w & - {0} -d jobs.db -w & - # print status info - {0} -d jobs.db -s -""".format(sys.argv[0])) - - -if __name__ == "__main__": - - nojobs = 1 - dbfn = None - cmdfn = None - propdef = None - work = False - status = False - - try: - opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsv") - - for opt, arg in opts: - if opt == "-h": - usage() - sys.exit(os.EX_OK) - elif opt == "-d": - dbfn = arg - elif opt == "-c": - cmdfn = arg - elif opt == "-p": - propdef = arg - elif opt == "-w": - work = True - elif opt == "-s": - status = True - elif opt == "-v": - verbose = True - else: - print("Unknown option '", opt, "'.") - - except getopt.GetoptError as e: - print("Error parsing arguments:", e) - usage() - sys.exit(os.EX_USAGE) - - if dbfn == None: - print("No database given.") - sys.exit(os.EX_USAGE) - - conn = sqlite3.connect(dbfn) - createSchema(conn) - - if status: - printStatusInfo(conn) - - if propdef != None: - createPropertiesTable(conn, propdef) - - if cmdfn != None: - print("Adding jobs...") - cmds = open(cmdfn).readlines() - cmds = [(c.strip(),) for c in cmds] - insertJobs(conn, cmds) - - if work: - while True: - jobid = getNextJobId(conn) - if jobid == None: - print("All jobs have been started.") - break - processJob(conn, jobid) - - - conn.close() -