Add a parallel job processing tool
[shutils.git] / paralleljobs.py
1 #!/usr/bin/python
2
3 import sys, getopt, os
4 import sqlite3
5 import subprocess
6
7
def createPropertiesTable(conn, propdef):
    """Create the 'properties' table unless it already exists.

    Args:
        conn: open sqlite3 connection to the job database.
        propdef: SQL column definitions of the user-defined property
            columns, e.g. "time REAL, mem INTEGER". The text is pasted
            verbatim into the CREATE TABLE statement (DDL cannot take bound
            parameters), so it must come from a trusted source.

    Raises:
        sqlite3.Error: if the statement fails, e.g. because propdef is not
            valid SQL. The transaction is rolled back before re-raising, so
            the connection stays usable.
    """
    # Check and creation happen inside one exclusive transaction so that
    # the table cannot appear between the two steps.
    conn.execute("BEGIN EXCLUSIVE")
    try:
        c = conn.cursor()
        c.execute("SELECT count(name) FROM sqlite_master WHERE name='properties';")
        if c.fetchone() == (0,):
            print("Creating properties table.")
            sqlstmt = ("CREATE TABLE properties (jobid INTEGER PRIMARY KEY, %s, "
                       "FOREIGN KEY (jobid) REFERENCES jobs (id));" % (propdef,))
            c.execute(sqlstmt)
    except Exception:
        # Do not leave the exclusive transaction (and its lock) behind.
        conn.rollback()
        raise
    conn.commit()
19
def runCmd(cmd):
    """Run cmd through the shell and collect its result.

    Returns:
        A tuple (exitcode, out, err): the exit status of the command and
        everything it wrote to stdout and to stderr.
    """
    child = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    captured_out, captured_err = child.communicate()
    # communicate() waits for the child to exit, so the exit status is
    # already available here.
    return child.returncode, captured_out, captured_err
26
def processJob(conn, jobid):
    """Run one job and record its result in the database.

    The command of the job is read from the jobs table and executed with
    runCmd(). Its exit code is stored and the job is flagged as done. Every
    line of the command's stdout or stderr that starts with
    "DB-PROPERTIES:" is parsed as a Python dict literal; the merged
    key/value pairs are inserted as one row into the properties table, with
    the keys used as column names. Job status and properties are committed
    together.

    Args:
        conn: open sqlite3 connection to the job database.
        jobid: id of the job in the jobs table.
    """
    import ast  # local import: only needed for parsing property lines

    print("Process job %d" % (jobid))

    c = conn.cursor()
    c.execute("SELECT cmd FROM jobs WHERE id=?", (jobid,))
    cmd, = c.fetchone()

    ec, out, err = runCmd(cmd)
    c.execute("UPDATE jobs SET exitcode=?, done=1 WHERE id=?;", (ec, jobid))

    marker = "DB-PROPERTIES:"
    prop = {}
    for stream in (out, err):
        # Pipes deliver bytes under Python 3; comparing those against the
        # str marker would raise TypeError.
        if isinstance(stream, bytes):
            stream = stream.decode("utf-8", "replace")
        for line in stream.splitlines():
            if not line.startswith(marker):
                continue
            # NOTE(review): this used to be eval(), which executes arbitrary
            # expressions found in the job output. literal_eval accepts only
            # literals, which covers the documented dict format; computed
            # expressions such as {"t": 1.0/3} are no longer evaluated.
            try:
                parsed = ast.literal_eval(line[len(marker):].strip())
            except (ValueError, TypeError, SyntaxError) as e:
                print("Ignoring malformed DB-PROPERTIES line of job %d: %s"
                      % (jobid, e))
                continue
            if not isinstance(parsed, dict):
                print("Ignoring non-dict DB-PROPERTIES line of job %d."
                      % (jobid,))
                continue
            # Later lines override earlier ones for the same key.
            prop.update(parsed)

    if prop:
        keys = list(prop)
        # Column names cannot be bound as parameters; quote them as SQL
        # identifiers so that an odd key cannot alter the statement.
        columns = ["jobid"] + ['"%s"' % str(k).replace('"', '""') for k in keys]
        collist = ", ".join(columns)
        vallist = ", ".join(["?"] * len(columns))
        sqlstmt = "INSERT INTO properties (%s) VALUES (%s);" % (collist, vallist)
        c.execute(sqlstmt, [jobid] + [prop[k] for k in keys])

    conn.commit()
63
def insertJobs(conn, cmds):
    """Append new jobs to the jobs table in one exclusive transaction.

    Args:
        conn: open sqlite3 connection to the job database.
        cmds: iterable of 1-tuples (cmd,), one per job to add.

    Raises:
        sqlite3.Error: if an insert fails. The transaction is rolled back
            before re-raising, so either all jobs are added or none.
    """
    conn.execute("BEGIN EXCLUSIVE")
    try:
        conn.executemany("INSERT INTO jobs (cmd) VALUES (?);", cmds)
    except Exception:
        # Do not leave the exclusive transaction (and its lock) behind.
        conn.rollback()
        raise
    conn.commit()
68
def createSchema(conn):
    """Create the 'jobs' table unless it already exists.

    The table has the columns id, cmd, started, done and exitcode; started
    and done default to 0.

    Args:
        conn: open sqlite3 connection to the job database.

    Raises:
        sqlite3.Error: if a statement fails. The transaction is rolled back
            before re-raising.
    """
    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")
    try:
        # Create table, if necessary
        c.execute("SELECT count(name) FROM sqlite_master WHERE name='jobs';")
        if c.fetchone() == (0,):
            print("Creating jobs table.")
            # cmd is declared TEXT on purpose: SQLite gives a column
            # declared STRING numeric affinity, so a command that looks
            # like a number (e.g. "007") would be stored as an integer.
            c.execute(
                "CREATE TABLE jobs ( "
                "id INTEGER PRIMARY KEY AUTOINCREMENT, "
                "cmd TEXT NOT NULL, "
                "started BOOL DEFAULT (0) NOT NULL, "
                "done BOOL DEFAULT (0) NOT NULL, "
                "exitcode INTEGER )"
            )
    except Exception:
        # Do not leave the exclusive transaction (and its lock) behind.
        conn.rollback()
        raise
    conn.commit()
85
def getNextJobId(conn):
    """Claim one job that has not been started yet.

    Inside a single exclusive transaction one job with started != 1 is
    selected and flagged as started, so that two instances working on the
    same database cannot claim the same job.

    Args:
        conn: open sqlite3 connection to the job database.

    Returns:
        The id of the claimed job, or None if no unstarted job is left.

    Raises:
        sqlite3.Error: if a statement fails. The transaction is rolled back
            before re-raising.
    """
    c = conn.cursor()
    c.execute("BEGIN EXCLUSIVE")
    try:
        c.execute("SELECT id FROM jobs WHERE NOT started=1 LIMIT 1;")
        row = c.fetchone()
        if row is not None:
            c.execute("UPDATE jobs SET started=1 WHERE id=?;", (row[0],))
    except Exception:
        conn.rollback()
        raise

    # Always end the transaction, also when no job was found; otherwise the
    # exclusive lock is kept and the next BEGIN on this connection fails.
    conn.commit()

    if row is None:
        return None
    return row[0]
101
102
103
def usage():
    """Print the usage text of this program to stdout.

    The program name shown in the text is taken from sys.argv[0].
    """

    # Doubled braces are literal braces; single ones are str.format fields.
    print("""
Take the jobs defined in jobs table of given database and process one job after
the other. Multiple instances may be launched against the same database.

A list of jobs may be imported line-by-line from a file using the -c option.
Every job may output to stdout or stderr a string of the form
DB-PROPERTIES: {{ "key": "value", "key2": 1.23, "key3": True}}
It is assumed that a table 'properties' exists with the columns jobid, key,
key2, and key3. The corresponding values are inserted into this table. Using
the option -p such a properties table can be created by giving a list of
column definitions in SQL style.

Usage:
{0} [OPTIONS] -d database
{0} -h

OPTIONS:
-c cmdfn add jobs from the file with list of commands
-d database the database to process
-h print this text
-p cols-def create properties table with SQL column spec

Examples:
{0} -d stats.db -c cmds.sh -p 'time REAL, mem INTEGER'
""".format(sys.argv[0]))
132
133
def main():
    """Parse the command line, prepare the database and work off the jobs.

    Options: -d database (required), -c file with one command per line to
    add as jobs, -p column definitions for the properties table, -h help.
    Exits with os.EX_USAGE on a bad command line.
    """
    dbfn = None
    cmdfn = None
    propdef = None

    try:
        opts, args = getopt.getopt(sys.argv[1:], "hd:c:p:")
    except getopt.GetoptError as e:
        print("Error parsing arguments:", e)
        usage()
        sys.exit(os.EX_USAGE)

    for opt, arg in opts:
        if opt == "-h":
            usage()
            sys.exit(os.EX_OK)
        elif opt == "-d":
            dbfn = arg
        elif opt == "-c":
            cmdfn = arg
        elif opt == "-p":
            propdef = arg
        else:
            print("Unknown option '", opt, "'.")

    if dbfn is None:
        print("No database given.")
        sys.exit(os.EX_USAGE)

    conn = sqlite3.connect(dbfn)
    try:
        createSchema(conn)

        if propdef is not None:
            createPropertiesTable(conn, propdef)

        if cmdfn is not None:
            print("Adding jobs...")
            # Close the command file right after reading it. Blank lines
            # are skipped so that they do not become empty jobs.
            with open(cmdfn) as f:
                cmds = [(line.strip(),) for line in f if line.strip()]
            insertJobs(conn, cmds)

        # Claim and run jobs until none is left unstarted.
        while True:
            jobid = getNextJobId(conn)
            if jobid is None:
                print("All jobs have been started.")
                break
            processJob(conn, jobid)
    finally:
        # Release the database also when a step above fails.
        conn.close()


if __name__ == "__main__":
    main()
188