]> git.sthu.org Git - shutils.git/commitdiff
Add a parallel job processing tool
authorStefan Huber <shuber@sthu.org>
Fri, 26 Apr 2013 20:37:12 +0000 (22:37 +0200)
committerStefan Huber <shuber@sthu.org>
Fri, 26 Apr 2013 20:37:12 +0000 (22:37 +0200)
paralleljobs.py [new file with mode: 0755]

diff --git a/paralleljobs.py b/paralleljobs.py
new file mode 100755 (executable)
index 0000000..ba579a4
--- /dev/null
@@ -0,0 +1,188 @@
+#!/usr/bin/python
+
+import sys, getopt, os
+import sqlite3
+import subprocess
+
+
+def createPropertiesTable(conn, propdef):
+    conn.execute("BEGIN EXCLUSIVE")
+
+    c = conn.cursor()
+    c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
+    if c.fetchone() == (0,):
+        print("Creating properties table.")
+        sqlstmt = "CREATE TABLE properties (jobid INTEGER PRIMARY KEY, %s, \
+                FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,)
+        c.execute(sqlstmt)
+    conn.commit()
+
+def runCmd(cmd):
+    proc = subprocess.Popen(cmd, \
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
+    out, err = proc.communicate()
+    exitcode = proc.wait()
+    return exitcode, out, err
+
+def processJob(conn, jobid):
+    print("Process job %d" % (jobid))
+
+    c = conn.cursor()
+    c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
+    cmd, = c.fetchone()
+
+    ec, out, err = runCmd(cmd)
+    c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))
+
+    propstr = []
+    for l in out.splitlines():
+        if l.startswith("DB-PROPERTIES:"):
+            propstr += [l[14:]]
+    for l in err.splitlines():
+        if l.startswith("DB-PROPERTIES:"):
+            propstr += [l[14:]]
+
+    prop = {}
+    for ps in propstr:
+        p = eval(ps)
+        for k, v in p.iteritems():
+            prop[k] = v
+
+    if len(prop) > 0:
+        collist = ", ".join([str(k) for k in prop.keys()])
+        collist = "jobid, " + collist
+
+        vallist = ", ".join(["?" for k in prop.keys()])
+        vallist = "?, " + vallist
+
+        c = conn.cursor()
+        sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist,vallist)
+        c.execute(sqlstmt, [jobid] + list(prop.values()))
+
+    conn.commit()
+
+def insertJobs(conn, cmds):
+    conn.execute("BEGIN EXCLUSIVE")
+    conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
+    conn.commit()
+
+def createSchema(conn):
+
+    c = conn.cursor()
+    c.execute("BEGIN EXCLUSIVE")
+
+    # Create table, if necessary
+    c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
+    if c.fetchone() == (0,):
+        print("Creating jobs table.")
+        conn.execute("CREATE TABLE jobs ( \
+                        id INTEGER PRIMARY KEY AUTOINCREMENT, \
+                        cmd STRING NOT NULL, \
+                        started BOOL DEFAULT (0) NOT NULL, \
+                        done BOOL DEFAULT (0) NOT NULL, \
+                        exitcode INTEGER )")
+    conn.commit()
+
+def getNextJobId(conn):
+
+    c = conn.cursor()
+    c.execute("BEGIN EXCLUSIVE")
+    c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
+
+    r = c.fetchone()
+    if r == None:
+        return None
+
+    jobid, = r
+    conn.execute("UPDATE jobs SET started=1 WHERE id=?;", (jobid,))
+    conn.commit()
+
+    return jobid
+
+
+
+def usage():
+    """Print usage text of this program"""
+
+    print("""
+Take the jobs defined in jobs table of given database and process one job after
+the other. Multiple instances may be launched against the same database.
+
+A list of jobs may be importet line-by-line from a file using the -c option.
+Every job may output to stdout or stderr a string of the form
+  DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True}}
+It is assumed that a table 'properties' exists with the columns jobid, key,
+key2, and key3. The corresponding values are inserted into this table. Using
+the option -p such a properties table can be created by giving a list of
+column definitions in SQL style.
+
+Usage:
+  {0} [OPTIONS] -d database
+  {0} -h
+
+OPTIONS:
+  -c cmdfn        add jobs from the file with list of commands
+  -d database     the database to process
+  -h              print this text
+  -p cols-def     create properties table with SQL column spec
+
+Examples:
+  {0} -d stats.db -c cmds.sh -p 'time REAL, mem INTEGER'
+""".format(sys.argv[0]))
+
+
+if __name__ == "__main__":
+
+    nojobs = 1
+    dbfn = None
+    cmdfn = None
+    propdef = None
+
+    try:
+        opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:")
+
+        for opt, arg in opts:
+            if opt == "-h":
+                usage()
+                sys.exit(os.EX_OK)
+            elif opt == "-d":
+                dbfn = arg
+            elif opt == "-c":
+                cmdfn = arg
+            elif opt == "-p":
+                propdef = arg
+            else:
+                print("Unknown option '", opt, "'.")
+
+    except getopt.GetoptError as e:
+        print("Error parsing arguments:", e)
+        usage()
+        sys.exit(os.EX_USAGE)
+
+    if dbfn == None:
+        print("No database given.")
+        sys.exit(os.EX_USAGE)
+
+    #try:
+    conn = sqlite3.connect(dbfn)
+    createSchema(conn)
+
+    if propdef != None:
+        createPropertiesTable(conn, propdef)
+
+    if cmdfn != None:
+        print("Adding jobs...")
+        cmds = open(cmdfn).readlines()
+        cmds = [(c.strip(),) for c in cmds]
+        insertJobs(conn, cmds)
+
+    while True:
+        jobid = getNextJobId(conn)
+        if jobid == None:
+            print("All jobs have been started.")
+            break
+        processJob(conn, jobid)
+
+
+    conn.close()
+