#!/usr/bin/env python
'''
Submit jobs on cluster via PBS qsub command
Gao Wang (copyleft) 2012
ChangeLog
---------
2014-May-20
* Add "--batch" option
2014-May-15
* Add "-H" option
2013-March-17
* Allow for additional pbs attributes via "qsub -W" switch
2012-April-02
* Initial implementation
'''
import sys, os, re, random, glob
from time import sleep
from subprocess import Popen, PIPE
try:
from argparse import ArgumentParser
except:
sys.exit('argparse module is required (available in Python 2.7.2+ or Python 3.2.1+)')
class qsub:
def __init__(self, size, hold_size, name, nodes, ppn, mem, walltime, queue, directory, emails,
additional_attributes, parallel = False, pbs = None, dry_run = False):
self.size = size
self.hold_size = hold_size
self.name = name
self.nodes = nodes
self.ppn = ppn
self.mem = mem
self.walltime = walltime
self.queue = queue
self.dir = directory
self.emails = emails
self.parallel = parallel
self.pbs = pbs
if not self.dir:
self.dir = os.getcwd()
if parallel:
try:
Popen(['parallel','--help'], shell = False, stdout = PIPE, stderr = PIPE)
except OSError:
sys.stderr.write("Parallel mode disabled since 'GNU parallel' is not installed.\n")
self.parallel = False
if not re.match("(.+?):(.+?):(.+?)", self.walltime):
sys.exit('Invalid walltime {0}'.format(self.walltime))
bad_addr = []
for email in self.emails:
if not re.match("^[a-zA-Z0-9._%-]+@[a-zA-Z0-9._%-]+.[a-zA-Z]{2,6}$", email):
sys.stderr.write('Invalid email address ignored: {0}'.format(email))
bad_addr.append(email)
self.emails = [x for x in self.emails if x not in bad_addr]
if dry_run:
self.exe = ['echo']
else:
self.exe = ['qsub'] + ((['-W'] + additional_attributes) if len(additional_attributes) else [])
def submit(self, cmds):
if not self.size:
self.size = len(cmds)
i = j = 0
while (i < len(cmds)):
k = i
i += self.size
j += 1
jext = '' if self.size == len(cmds) else '_%s' % j
cmd = '#PBS -S /bin/bash\n#PBS -V\n#PBS -N %s\n' % (self.name + jext)
if len(self.emails):
cmd += '#PBS -M %s\n#PBS -m abe\n' % ', '.join(self.emails)
if self.queue:
cmd += '#PBS -q {0}\n'.format(random.choice(self.queue))
if self.hold_size:
cmd += '#PBS -t 1-{0}%{1}\n'.format(min(i, len(cmds)) - k, self.hold_size)
cmd += '#PBS -l nodes=%s:ppn=%s,walltime=%s\n' % (self.nodes, self.ppn, self.walltime)
if self.mem:
cmd += '#PBS -l mem=%s\n' % self.mem
cmd += '#PBS -o {0}.out\n#PBS -e {0}.err\n'.format(self.name + jext)
# Di Zhang suggests using cd instead of #PBS -d
# cmd += '#PBS -d %s\n' % self.dir
cmd += 'cd %s\n' % self.dir
# cmd += 'cat $PBS_NODEFILE > %s\n' % (os.path.join(self.dir, self.name + jext + '.node'))
#
if self.parallel:
if self.nodes == 1:
cmd += 'parallel -j %s << EOF\n' % self.ppn
else:
cmd += 'parallel -j %s -u --sshloginfile $PBS_NODEFILE --wd $PWD << EOF\n' % self.ppn
#
if self.hold_size:
cmd += '\n'.join(['if [ $PBS_ARRAYID -eq {0} ]; then {1}; fi'.format(idx + 1, item)
for idx, item in enumerate(cmds[k:min(i, len(cmds))])])
else:
cmd += '\n'.join(cmds[k:min(i, len(cmds))])
#
if self.parallel:
cmd += '\nEOF\nexit 0\n'
else:
cmd += '\nexit 0\n'
# submit job
p = Popen(self.exe, shell = False, stdin = PIPE, stdout = PIPE, stderr = PIPE, close_fds = True)
decode = True if sys.version_info[0] > 2 else False
(cout, cerr) = p.communicate(cmd.encode(sys.getdefaultencoding()) if decode else cmd)
# print job name
output = cout.decode(sys.getdefaultencoding()).rstrip() if decode else cout.rstrip()
if output:
print(output)
# keep backup scripts
if self.pbs:
with open(self.pbs + jext + '.pbs', 'wb') as f:
f.write(cmd.encode(sys.getdefaultencoding()) if decode else cmd)
# take a break
if self.exe != ['echo']:
sleep(0.1)
# sleep for a while
if j % 250 == 0 and self.exe != ['echo']:
sys.stderr.write('250 jobs submitted; taking a 10s break.\n')
sleep(10)
class BatchSubmitter:
def __init__(self, dirname):
self.files = glob.glob(os.path.join(dirname, '*.pbs'))
self.dir = os.path.join(dirname, 'submitted')
os.system('rm -rf {0}; mkdir {0}'.format(self.dir))
self.total = len(self.files)
self.submitted = 0
def submit(self, limit, interval):
self.limit = limit
print >> open(self.dir + '.limit', 'w'), limit
done = False
while not done:
for i in range(self.__getslots()):
try:
os.system('qsub {0}; mv {0} {1}'.format(self.files.pop(0), self.dir))
self.submitted += 1
sleep(0.5)
except IndexError:
done = True
break
print >> open(self.dir + '.log', 'w'), '%s/%s submitted' % (self.submitted, self.total)
sleep(interval)
def __getslots(self):
os.system("qstat | awk '{print $5}' | grep -P 'Q|R|\-|Time' | wc -l &> %s" % (self.dir + '.qstat'))
os.system("qstat | wc -l >> %s 2>&1" % (self.dir + '.qstat'))
try:
with open(self.dir + '.limit', 'r') as f:
total = int(f.readlines()[0].split()[0])
except:
total = self.limit
try:
with open(self.dir + '.qstat', 'r') as f:
counts = [int(x.split()[0]) for x in f.readlines()]
if counts[0] < 2 and counts[1] > 0:
# something is wrong with qstat
counts[0] = total + 2
except:
counts = [total + 2, 0]
return max(total - (counts[0] - 2), 0)
if __name__ == '__main__':
parser = ArgumentParser(description='A handy program to send commands to the Portable Batch System (PBS) in cluster environments. Commands should be piped to this program from standard input stream.',epilog = '''2012 (copyleft) Gao Wang''' )
parser.add_argument('name', help = 'job name')
parser.add_argument('-s', '--size', metavar = 'N', type = int, help = 'number of commands per job, default to submitting all commands as one job')
parser.add_argument('-H', '--hold-size', metavar = 'N', type = int, dest = 'hold_size',
help = 'triggers "array mode" per job and defines number of commands per hold')
parser.add_argument('-n', '--nodes', metavar = 'N', type = int, default = 1, help = 'number of nodes reserved per job, default to 1')
parser.add_argument('-p', '--ppn', metavar = 'N', type = int, default = 1, help = 'number of cpus reserved per node, default to 1')
parser.add_argument('-m', '--mem', help = 'reserved memory with units be b (bytes), w (words), kb, kw, mb, mw, gb or gw')
parser.add_argument('-w', '--walltime', default = '240:00:00', help = 'reserved computation time (hh:mm:ss), default to 240:00:00')
parser.add_argument('-q', '--queue', nargs = '+', help = 'name of the queue to send jobs to, can be multiple queues to randomly chose from')
parser.add_argument('-d', '--dir', help = 'directory under which the jobs will be executed, default to current directory')
parser.add_argument('-e', '--emails', nargs = '+', default = [], help = 'email notification addresses')
parser.add_argument('-a', '--additional_attributes', metavar = 'OPT', nargs = '+', default = [], help = 'additional qsub attributes, e.g., -a "group_list=bigmem"')
parser.add_argument('--remove-colon', dest = 'rmcolon', action = 'store_true', help = 'replace colon with line break')
parser.add_argument('--parallel', action = 'store_true',
help = 'use parallel mode, powered by GNU parallel, and will overwrite "--hold-size" option when triggered')
parser.add_argument('--save', action = 'store_true', help = 'save pbs scripts to disk')
parser.add_argument('--dry-run', action = 'store_true', dest = 'dry_run',
help = 'perform a trial run which saves pbs scripts to disk without submitting any job')
parser.add_argument('--batch', metavar = 'N', nargs = 2, type = int, default = [0, 0],
help = 'submit N1 jobs at a time; check every N2 seconds to submit additional jobs if any of the previous N1 jobs are done')
args = parser.parse_args()
#
try:
if args.rmcolon:
cmds = [y.strip() for y in ''.join([re.sub(r'\\(\s*)\n', ' ', re.sub('\r\n|(\s*);(\s*)','\n', x)) for x in sys.stdin.readlines() if not x.startswith('#') and len(x.strip()) > 0]).split('\n') if y.strip()]
else:
cmds = [y.strip() for y in ''.join([re.sub(r'\\(\s*)\n', ' ', x.replace('\r\n','\n')) for x in sys.stdin.readlines() if not x.startswith('#') and len(x.strip()) > 0]).split('\n') if y.strip()]
except KeyboardInterrupt:
sys.exit("Standard input stream expected")
# various adjustments
if len(cmds) == 0:
sys.exit(0)
if not args.size:
args.size = len(cmds)
if args.size >= len(cmds):
args.batch[0] = 0
if args.batch[0] > 0 and args.batch[1] > 0:
args.dry_run = True
if args.dry_run:
args.save = True
if args.dir:
args.dir = os.path.abspath(os.path.expanduser(args.dir))
if not os.path.isdir(args.dir):
sys.exit("Cannot find directory {}".format(args.dir))
os.chdir(args.dir)
fn = dirname = None
if args.save:
if args.size < len(cmds):
dirname = args.name + '_PBS'
os.system('rm -rf %s; mkdir %s' % (dirname, dirname))
fn = os.path.join(dirname, args.name)
else:
fn = args.name
if args.parallel:
args.hold_size = None
if args.hold_size and args.size <= args.hold_size:
sys.exit("Array hold blocks must be smaller than total of commands in the array!")
# submit job
q = qsub(args.size, args.hold_size, args.name, args.nodes, args.ppn, args.mem, args.walltime, args.queue,
args.dir, args.emails, args.additional_attributes, args.parallel, fn, args.dry_run)
try:
q.submit(cmds)
except Exception as e:
sys.exit("ERROR: %s" % (e))
# batch submit job
if args.batch[0] > 0 and args.batch[1] > 0:
bs = BatchSubmitter(dirname)
bs.submit(args.batch[0], args.batch[1])