from subprocess import run, CalledProcessError, PIPE
import os
from pkg_resources import resource_filename
from HTSeqCountCluster.logger import Logger
from HTSeqCountCluster.pbsjob.pbsutils import (basejobids, write_code_file,
import_temp, file_to_str)
from HTSeqCountCluster.pbsjob.pbsconfig import __DEFAULT__
from HTSeqCountCluster import pbsjob
from HTSeqCountCluster.pbsjob.qstat import Qstat
class BasePBSJob(object):
    """Base class for simple jobs.

    Sets up the state shared by job classes: a per-instance copy of the
    default job attributes, a logger, the working directory and the path
    to the packaged ``temp.pbs`` template.
    """

    def __init__(self, base_jobname):
        """Initialize job attributes.

        :param base_jobname: Accepted so subclasses can forward it; the base
            initializer itself does not use it.
        """
        # Copy the module-level defaults: subclasses update this mapping in
        # place, and sharing one object would leak one job's attributes
        # (job name, email, ...) into every job created afterwards.
        self.default_job_attributes = dict(__DEFAULT__)
        # Exposed under the historical attribute name ``file2str``.
        self.file2str = file_to_str
        self.sgejob_log = Logger().default(logname="SGE JOB", logfile=None)
        self.pbsworkdir = os.getcwd()
        # Import the temp.pbs file using pkg_resources
        self.temp_pbs = resource_filename(pbsjob.__name__, "temp.pbs")

    @classmethod
    def _configure(cls, length, base_jobname):
        """Configure job attributes or set it up.

        :param length: Forwarded unchanged to ``basejobids``.
        :param base_jobname: Forwarded unchanged to ``basejobids``.
        :return: The ``(baseid, base)`` pair produced by ``basejobids``.
        """
        baseid, base = basejobids(length, base_jobname)
        return baseid, base

    def debug(self, code):
        """Debug the SGEJob.

        :raises NotImplementedError: Always; debugging is not implemented.
        """
        raise NotImplementedError

    def _cleanup(self, jobname):
        """Clean up job scripts.

        Removes ``<jobname>.pbs`` and ``<jobname>.py``. A file that does not
        exist is skipped with a warning instead of raising, because not every
        submission path creates both files.

        :param jobname: Path/name of the job without a file extension.
        """
        self.sgejob_log.warning('Your job will now be cleaned up.')
        for extension in ('.pbs', '.py'):
            try:
                os.remove(jobname + extension)
            except FileNotFoundError:
                self.sgejob_log.warning(
                    '%s%s was not found; nothing to delete.'
                    % (jobname, extension))
            else:
                self.sgejob_log.warning(
                    '%s%s has been deleted.' % (jobname, extension))
class PBSJob(BasePBSJob):
    """Create a qsub/pbs job & script for the job to execute."""

    def __init__(self, email_address, base_jobname=None):
        """Initialize the job.

        :param email_address: Email address stored on the job; it is written
            into the job attributes when ``base_jobname`` is given.
        :param base_jobname: Optional base name. When given, the job name is
            derived from it via ``_configure`` and the name-dependent
            attributes are refreshed; otherwise the default ``job_name`` is
            used and the attributes stay at their defaults.
        """
        super().__init__(base_jobname=base_jobname)
        self.email = email_address
        # Work on an instance-private mapping so the in-place updates done by
        # this class never modify defaults shared with other jobs.
        self.default_job_attributes = dict(self.default_job_attributes)
        self.attributes = self.default_job_attributes
        self.jobname = self.default_job_attributes['job_name']
        if base_jobname is not None:
            _, self.jobname = self._configure(base_jobname=base_jobname,
                                              length=5)
            self.attributes = self._update_default_attributes()

    def _update_default_attributes(self):
        """Refresh the attributes that depend on the job name.

        :return: The updated ``default_job_attributes`` mapping.
        """
        pyfile_path = os.path.join(self.pbsworkdir, self.jobname + '.py')
        # These attributes are automatically updated if a jobname is given.
        # TIP If python is in your environment as only 'python' update 'cmd'.
        new_attributes = {
            'email': self.email,
            'job_name': self.jobname,
            'outfile': self.jobname + '.o',
            'errfile': self.jobname + '.e',
            'script': self.jobname,
            'log_name': self.jobname + '.log',
            'cmd': 'python3 ' + pyfile_path,
        }
        self.default_job_attributes.update(new_attributes)
        return self.default_job_attributes

    def _write_pbs_script(self):
        """Render the pbs template with the job attributes and write it.

        :return: Name of the written ``<jobname>.pbs`` file.
        """
        # Create the pbs script from the template or dict
        pbstemp = import_temp(self.temp_pbs)
        pbsfilename = self.jobname + '.pbs'
        with open(pbsfilename, 'w') as pbsfile:
            pbsfile.write(pbstemp.substitute(self.attributes))
        self.sgejob_log.info('%s has been created.' % pbsfilename)
        return pbsfilename

    def _submit_pbs_script(self, pbsfilename, cleanup):
        """Submit ``pbsfilename`` with qsub.

        :param pbsfilename: The pbs script to submit.
        :param cleanup: When true, delete the generated job files if the
            submission fails. Files are kept after a successful submission.
        :return: qsub's decoded stdout (the job id) on success, else ``None``.
        """
        try:
            # An argument list needs no shell, so the file name is never
            # re-parsed by one. check=True turns a non-zero exit status into
            # CalledProcessError.
            cmd_status = run(['qsub', pbsfilename], stdout=PIPE, stderr=PIPE,
                             check=True)
        except CalledProcessError as err:
            self.sgejob_log.error(err.stderr.decode('utf-8'))
        except OSError as err:
            # qsub could not be started at all (e.g. it is not on PATH).
            self.sgejob_log.error(str(err))
        else:
            # The cmd_status has stdout that must be decoded.
            # When a qsub job is submitted, the stdout is the job id.
            submitted_jobid = cmd_status.stdout.decode('utf-8')
            self.sgejob_log.info(self.jobname + ' was submitted.')
            self.sgejob_log.info('Your job id is: %s' % submitted_jobid)
            # NOTE(review): nothing is deleted on success. The previous
            # post-return cleanup never ran, and the generated .py script is
            # referenced by the job's 'cmd' attribute.
            return submitted_jobid
        self.sgejob_log.error('PBS job not submitted.')
        if cleanup:
            self._cleanup(self.jobname)
        return None

    def submit_code(self, code, cleanup=True, default=True):
        """Create and submit a qsub job that runs python code.

        :param code: Either a path to an existing ``.py`` file or a string of
            python source.
        :param cleanup: When true, delete the generated job files if the
            submission fails.
        :param default: Must be true; custom job creation is not implemented.
        :return: The job id printed by qsub, or ``None`` if submission failed.
        :raises TypeError: If ``code`` is neither a ``.py`` file nor a string.
        :raises NotImplementedError: If ``default`` is false.
        """
        # Allow user input to be a python file
        if os.path.isfile(code) and str(code).endswith('.py'):
            code_str = self.file2str(code)
            self.sgejob_log.info('%s converted to string.' % code)
        elif isinstance(code, str):
            code_str = code
        else:
            raise TypeError(
                'code must be a path to a .py file or a string of python '
                'code.')

        if not default:
            # TODO Add custom job creation
            msg = 'Custom SGEJob creation is not yet implemented.'
            raise NotImplementedError(msg)

        # If default, a python file will be created from code that is used.
        self.sgejob_log.info('You are running a job with default attributes.')
        write_code_file(filename=self.jobname, code=code_str,
                        language='python')
        pyfilename = self.jobname + '.py'
        self.sgejob_log.info('%s python file has been created.' % pyfilename)

        pbsfilename = self._write_pbs_script()
        return self._submit_pbs_script(pbsfilename, cleanup)

    def submit_cmd(self, cmd, cleanup=True):
        """Create and submit a qsub job that runs a command.

        :param cmd: Command stored as the ``cmd`` job attribute before the
            pbs template is rendered.
        :param cleanup: When true, delete the generated job files if the
            submission fails.
        :return: The job id printed by qsub, or ``None`` if submission failed.
        """
        self.attributes.update({'cmd': cmd})
        pbsfilename = self._write_pbs_script()
        return self._submit_pbs_script(pbsfilename, cleanup)