Source code for HTSeqCountCluster.pbsjob.qstat
"""Access qstat information about SGE jobs."""
from subprocess import check_output, CalledProcessError
import getpass
import re
from HTSeqCountCluster.logger import Logger
[docs]class Qstat(object):
def __init__(self):
"""Initialize class."""
_username = getpass.getuser()
self.username = _username
self.split_regex = re.compile(r'\s+')
self.qstat_log = Logger().default(logname="qstat", logfile=None)
[docs] def qstatinfo(self, qstat_path='qstat'):
"""Retrieve qstat output.
:param qstat_path: [description], defaults to 'qstat'
:type qstat_path: str, optional
"""
try:
qstatinfo = check_output([qstat_path])
except CalledProcessError as cpe:
return_code = 'qstat returncode: %s' % cpe.returncode
std_error = 'qstat standard output: %s' % cpe.stderr
self.qstat_log(return_code + '\n' + std_error)
except FileNotFoundError:
raise FileNotFoundError('qstat is not on your machine.')
else:
jobs = self._output_parser(qstatinfo)
return jobs
def _output_parser(self, output):
"""Parse output from qstat pbs commandline program.
Returns a list of dictionaries for each job.
:param output: The qstat output.
:type output: [type]
"""
lines = output.decode('utf-8').split('\n')
del lines[:5]
jobs = []
for line in lines:
els = self.split_regex.split(line)
try:
j = {"job_id": els[0], "name": els[1], "user": els[2],
"elapsed_time": els[3], "status": els[4],
"queue": els[5]}
jobs.append(j)
except IndexError:
pass
return jobs
[docs] def all_job_ids(self):
"""Retrieve a list of all jobs running or queued."""
jobs = self.qstatinfo()
ids = [j['job_id'] for j in jobs]
return ids
[docs] def all_running_jobs(self):
"""Retrieve a list of running jobs."""
jobs = self.qstatinfo()
ids = [j['job_id'] for j in jobs if j['status'] == 'R']
return ids
[docs] def all_queued_jobs(self):
"""Retrieve a list of queued jobs."""
jobs = self.qstatinfo()
ids = [j['job_id'] for j in jobs if j['status'] == 'Q']
return ids
[docs] def myjobs(self):
"""Retrieve a list of all the current user's jobs."""
jobs = self.qstatinfo()
ids = [j['job_id'] for j in jobs if j['user'] == self.username]
if len(ids) < 1:
return 'You have no jobs running or queued.'
else:
rids = [j['job_id'] for j in jobs if j['user'] == self.username
and j['status'] == 'R']
qids = [j['job_id'] for j in jobs if j['user'] == self.username
and j['status'] == 'Q']
return 'Running jobs: %s\nQueued jobs: %s' % (rids, qids)
[docs] def watch(self, job_id):
"""Wait until a job or list of jobs finishes and get updates."""
jobs = self.qstatinfo()
rids = [j['job_id'] for j in jobs if j['user'] == self.username
and j['status'] == 'R']
qids = [j['job_id'] for j in jobs if j['user'] == self.username
and j['status'] == 'Q']
if job_id in qids:
return 'Waiting for %s to start running.' % job_id
self.watch(job_id)
elif job_id in rids:
return 'Waiting for %s to finish running.' % job_id
self.watch(job_id)
else:
return 'Job id not found.'