]> git.sthu.org Git - paralleljobs.git/commitdiff
Initial import from shutils v1.0
authorStefan Huber <shuber@sthu.org>
Sat, 27 Apr 2013 13:03:36 +0000 (15:03 +0200)
committerStefan Huber <shuber@sthu.org>
Sat, 27 Apr 2013 13:03:36 +0000 (15:03 +0200)
paralleljobs.py [new file with mode: 0755]

diff --git a/paralleljobs.py b/paralleljobs.py
new file mode 100755 (executable)
index 0000000..c9e313d
--- /dev/null
@@ -0,0 +1,259 @@
+#!/usr/bin/python
+
+import sys, getopt, os
+import sqlite3
+import subprocess
+
+verbose = False
+
+def printStatusInfo(conn):
+    c = conn.cursor()
+
+    c.execute("SELECT count(id) FROM jobs;")
+    nototal, = c.fetchone()
+
+    c.execute("SELECT count(id) FROM jobs WHERE done=1;")
+    nodone, = c.fetchone()
+
+    c.execute("SELECT sum(workloadestm) FROM jobs WHERE done=1;")
+    wldone, = c.fetchone()
+    if wldone == None:
+        wldone = 0.0
+
+    c.execute("SELECT sum(workloadestm) FROM jobs;")
+    wltotal, = c.fetchone()
+
+    c.close()
+
+    print(nototal, nodone, wldone, wltotal)
+    perdone = 100.0*float(nodone)/float(nototal)
+    perwl = 100.0*float(wldone)/float(wltotal)
+
+    print("%d (%.1f%%) of %d jobs done. %.1f%% of the workload finished." % \
+            (nodone, perdone, nototal, perwl))
+
+def createPropertiesTable(conn, propdef):
+    conn.execute("BEGIN EXCLUSIVE")
+
+    c = conn.cursor()
+    c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
+    if c.fetchone() == (0,):
+        print("Creating properties table.")
+        sqlstmt = "CREATE TABLE properties (\
+                jobid INTEGER PRIMARY KEY,\
+                %s, \
+                FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,)
+        c.execute(sqlstmt)
+    c.close()
+    conn.commit()
+
+def runCmd(cmd):
+    proc = subprocess.Popen(cmd, \
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
+    out, err = proc.communicate()
+    exitcode = proc.wait()
+
+    if verbose:
+        print(out, err)
+
+    return exitcode, out, err
+
+def processJob(conn, jobid):
+    print("Process job %d" % (jobid))
+
+    c = conn.cursor()
+    c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
+    cmd, = c.fetchone()
+
+    ec, out, err = runCmd(cmd)
+    c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))
+
+    propstr = []
+    for l in out.splitlines():
+        if l.startswith("DB-PROPERTIES:"):
+            propstr += [l[14:]]
+    for l in err.splitlines():
+        if l.startswith("DB-PROPERTIES:"):
+            propstr += [l[14:]]
+
+    prop = {}
+    for ps in propstr:
+        p = eval(ps)
+        for k, v in p.iteritems():
+            prop[k] = v
+
+    if len(prop) > 0:
+        collist = ", ".join([str(k) for k in prop.keys()])
+        collist = "jobid, " + collist
+
+        vallist = ", ".join(["?" for k in prop.keys()])
+        vallist = "?, " + vallist
+
+        c = conn.cursor()
+        sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist,vallist)
+        c.execute(sqlstmt, [jobid] + list(prop.values()))
+
+    c.close()
+    conn.commit()
+
+def insertJobs(conn, cmds):
+    conn.execute("BEGIN EXCLUSIVE")
+    conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
+    conn.commit()
+
+def createSchema(conn):
+
+    c = conn.cursor()
+    c.execute("BEGIN EXCLUSIVE")
+
+    # Create table, if necessary
+    c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
+    if c.fetchone() == (0,):
+        print("Creating jobs table.")
+        conn.execute("CREATE TABLE jobs ( \
+                        id INTEGER PRIMARY KEY AUTOINCREMENT, \
+                        cmd STRING NOT NULL, \
+                        started BOOL DEFAULT (0) NOT NULL, \
+                        done BOOL DEFAULT (0) NOT NULL, \
+                        exitcode INTEGER, \
+                        workloadestm REAL DEFAULT (1) NOT NULL)")
+    c.close()
+    conn.commit()
+
+def getNextJobId(conn):
+
+    c = conn.cursor()
+    c.execute("BEGIN EXCLUSIVE")
+    c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
+
+    r = c.fetchone()
+    if r == None:
+        return None
+
+    jobid, = r
+    conn.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))
+
+    c.close()
+    conn.commit()
+
+    return jobid
+
+
+
+def usage():
+    """Print usage text of this program"""
+
+    print("""
+Take the jobs defined in jobs table of the given database and process one job
+after the other. Multiple instances may be launched against the same database.
+
+Usage:
+  {0} [OPTIONS] [COMMANDS] -d database
+  {0} -h
+
+COMMANDS:
+  -c cmdfn        add jobs from the file with list of commands
+  -h              print this text
+  -s              print status information
+  -w              work on the database
+
+OPTIONS:
+  -d database     the database to process
+  -p cols-def     create properties table with SQL column spec
+  -v              print output of the job's command
+
+Commands may be combined in one call of {0}.
+
+A list of jobs may be importet line-by-line from a file using the -c option.
+Every job may output to stdout or stderr a string of the form
+    DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True }}
+It is assumed that a table 'properties' exists with the columns jobid, key,
+key2, and key3. The corresponding values are inserted into this table. Using
+the option -p such a properties table can be created by giving a list of
+column definitions in SQL style.
+
+The jobs table also contains a 'workloadestm' column that is used when
+estimating the finished workload so far. The entries default to 1 and may be
+set externally.
+
+Examples:
+  # create cmds.sh with jobs
+  echo "ulimit -v 2000000 -t 1200; ./isprime 65535" > cmds.sh
+  echo "ulimit -v 2000000 -t 1200; ./isprime 65537" >> cmds.sh
+  # create an initial database, but do not work
+  {0} -d jobs.db -c cmds.sh \\
+      -p 'number INTEGER, time REAL, mem INTEGER'
+  # launch two workers
+  {0} -d jobs.db -w &
+  {0} -d jobs.db -w &
+  # print status info
+  {0} -d jobs.db -s
+""".format(sys.argv[0]))
+
+
+if __name__ == "__main__":
+
+    nojobs = 1
+    dbfn = None
+    cmdfn = None
+    propdef = None
+    work = False
+    status = False
+
+    try:
+        opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:wsv")
+
+        for opt, arg in opts:
+            if opt == "-h":
+                usage()
+                sys.exit(os.EX_OK)
+            elif opt == "-d":
+                dbfn = arg
+            elif opt == "-c":
+                cmdfn = arg
+            elif opt == "-p":
+                propdef = arg
+            elif opt == "-w":
+                work = True
+            elif opt == "-s":
+                status = True
+            elif opt == "-v":
+                verbose = True
+            else:
+                print("Unknown option '", opt, "'.")
+
+    except getopt.GetoptError as e:
+        print("Error parsing arguments:", e)
+        usage()
+        sys.exit(os.EX_USAGE)
+
+    if dbfn == None:
+        print("No database given.")
+        sys.exit(os.EX_USAGE)
+
+    conn = sqlite3.connect(dbfn)
+    createSchema(conn)
+
+    if status:
+        printStatusInfo(conn)
+
+    if propdef != None:
+        createPropertiesTable(conn, propdef)
+
+    if cmdfn != None:
+        print("Adding jobs...")
+        cmds = open(cmdfn).readlines()
+        cmds = [(c.strip(),) for c in cmds]
+        insertJobs(conn, cmds)
+
+    if work:
+        while True:
+            jobid = getNextJobId(conn)
+            if jobid == None:
+                print("All jobs have been started.")
+                break
+            processJob(conn, jobid)
+
+
+    conn.close()
+