"""Module to handle a task."""
# Copyright 2017 Qarnot computing
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import makedirs, path
import time
import warnings
import sys
from . import get_url, raise_on_error, _util
from .status import Status
from .bucket import Bucket
from .pool import Pool
from .error import Error
from .exceptions import MissingTaskException, MaxTaskException, NotEnoughCreditsException, \
MissingBucketException, BucketStorageUnavailableException
try:
from progressbar import AnimatedMarker, Bar, Percentage, AdaptiveETA, ProgressBar
except ImportError:
pass
# Task states during which :meth:`Task.wait` keeps polling the API.
RUNNING_DOWNLOADING_STATES = ['Submitted', 'PartiallyDispatched',
'FullyDispatched', 'PartiallyExecuting',
'FullyExecuting', 'DownloadingResults', 'UploadingResults']
[docs]class Task(object):
"""Represents a Qarnot task.
.. note::
A :class:`Task` must be created with
:meth:`qarnot.connection.Connection.create_task`
or retrieved with :meth:`qarnot.connection.Connection.tasks` or :meth:`qarnot.connection.Connection.retrieve_task`.
"""
[docs] def __init__(self, connection, name, profile_or_pool=None, instancecount_or_range=1, shortname=None, job=None):
"""Create a new :class:`Task`.
:param connection: the cluster on which to send the task
:type connection: :class:`qarnot.connection.Connection`
:param name: given name of the task
:type name: :class:`str`
:param profile_or_pool: which profile to use with this task, or which Pool to run task,
:type profile_or_pool: str or :class:`~qarnot.pool.Pool` or None
:param instancecount_or_range: number of instances or ranges on which to run task
:type instancecount_or_range: int or str
:param shortname: userfriendly task name
:type shortname: :class:`str`
:param job: which job to attach the task to
:type job: :class:`~qarnot.job.Job`
"""
self._name = name
self._shortname = shortname
self._profile = None
self._pool_uuid = None
self._job_uuid = None
if profile_or_pool is not None:
if isinstance(profile_or_pool, Pool):
self._pool_uuid = profile_or_pool.uuid
else:
self._profile = profile_or_pool
if job is not None:
if isinstance(profile_or_pool, Pool):
raise Exception("Cannot attach a same task to pool and a job simultaneously")
self._job_uuid = job.uuid
if isinstance(instancecount_or_range, int):
self._instancecount = instancecount_or_range
self._advanced_range = None
else:
self._advanced_range = instancecount_or_range
self._instancecount = 0
self._running_core_count = 0
self._running_instance_count = 0
self._resource_objects = []
self._result_object = None
self._connection = connection
self._constants = {}
"""
:type: dict(str,str)
Constants of the task.
Can be set until :meth:`run` or :meth:`submit` is called
.. note:: See available constants for a specific profile
with :meth:`qarnot.connection.Connection.retrieve_profile`.
"""
self._dependentOn = []
self._auto_update = True
self._last_auto_update_state = self._auto_update
self._update_cache_time = 5
self._last_cache = time.time()
self._constraints = {}
self._state = 'UnSubmitted' # RO property same for below
self._uuid = None
self._snapshots = False
self._dirty = False
self._rescount = -1
self._snapshot_whitelist = None
self._snapshot_blacklist = None
self._results_whitelist = None
self._results_blacklist = None
self._status = None
self._completed_instances = []
self._tags = []
self._creation_date = None
self._errors = None
self._resource_object_ids = []
self._result_object_id = None
self._is_summary = False
self._completion_time_to_live = "00:00:00"
self._auto_delete = False
self._wait_for_pool_resources_synchronization = None
@classmethod
def _retrieve(cls, connection, uuid):
    """Retrieve a submitted task given its uuid.

    :param qarnot.connection.Connection connection:
      the cluster to retrieve the task from
    :param str uuid: the uuid of the task to retrieve

    :rtype: Task
    :returns: The retrieved task, built with the class this method is called on.

    :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises qarnot.exceptions.MissingTaskException: no such task
    """
    resp = connection._get(get_url('task update', uuid=uuid))
    if resp.status_code == 404:
        raise MissingTaskException(resp.json()['message'])
    raise_on_error(resp)
    # Go through ``cls`` rather than a hard-coded ``Task`` so that calling
    # this on a subclass yields an instance of that subclass.
    return cls.from_json(connection, resp.json())
def run(self, output_dir=None, job_timeout=None, live_progress=False, results_progress=None):
    """Submit a task, wait for the results and download them if required.

    :param str output_dir: (optional) path to a directory that will contain the results
    :param float job_timeout: (optional) Number of seconds before the task :meth:`abort` if it is not
      already finished
    :param bool live_progress: (optional) display a live progress
    :param results_progress: (optional) can be a callback (read,total,filename) or True to display a progress bar
    :type results_progress: bool or function(float, float, str)
    :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
    :raises qarnot.exceptions.MaxTaskException: Task quota reached
    :raises qarnot.exceptions.NotEnoughCreditsException: Not enough credits
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials

    .. note:: Will ensure all added file are on the resource bucket
       regardless of their uploading mode.
    .. note:: If this function is interrupted (script killed for example),
       but the task is submitted, the task will still be executed remotely
       (results will not be downloaded)
    .. warning:: Will override *output_dir* content.
    """
    self.submit()
    finished = self.wait(timeout=job_timeout, live_progress=live_progress)
    # Abort only when the timeout elapsed before completion: :meth:`wait`
    # returns False in that case. A task that already finished is left alone.
    if job_timeout is not None and not finished:
        self.abort()
    if output_dir is not None:
        self.download_results(output_dir, progress=results_progress)
[docs] def resume(self, output_dir, job_timeout=None, live_progress=False, results_progress=None):
"""Resume waiting for this task if it is still in submitted mode.
Equivalent to :meth:`wait` + :meth:`download_results`.
:param str output_dir: path to a directory that will contain the results
:param float job_timeout: Number of seconds before the task :meth:`abort` if it is not
already finished
:param bool live_progress: display a live progress
:param results_progress: can be a callback (read,total,filename) or True to display a progress bar
:type results_progress: bool or function(float, float, str)
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
:raises qarnot.exceptions.MissingTaskException: task does not exist
.. note:: Do nothing if the task has not been submitted.
.. warning:: Will override *output_dir* content.
"""
if self._uuid is None:
return output_dir
self.wait(timeout=job_timeout, live_progress=live_progress)
self.download_results(output_dir, progress=results_progress)
[docs] def submit(self):
"""Submit task to the cluster if it is not already submitted.
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.MaxTaskException: Task quota reached
:raises qarnot.exceptions.NotEnoughCreditsException: Not enough credits
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
.. note:: Will ensure all added files are on the resource bucket
regardless of their uploading mode.
.. note:: To get the results, call :meth:`download_results` once the job is done.
"""
self._pre_submit()
payload = self._to_json()
resp = self._connection._post(get_url('tasks'), json=payload)
if resp.status_code == 404:
raise MissingBucketException(resp.json()['message'])
elif resp.status_code == 403:
raise MaxTaskException(resp.json()['message'])
elif resp.status_code == 402:
raise NotEnoughCreditsException(resp.json()['message'])
raise_on_error(resp)
self._uuid = resp.json()['uuid']
self._post_submit()
def _pre_submit(self):
"""Pre submit action on the task & its resources"""
if self._uuid is not None:
return self._state
for resource_buckets in self.resources:
resource_buckets.flush()
def _post_submit(self):
"""Post submit action on the task after submission"""
if not isinstance(self._snapshots, bool):
self.snapshot(self._snapshots)
self.update(True)
def abort(self):
    """Ask the API to abort this task.

    The task is refreshed from the API before and after the request.

    :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises qarnot.exceptions.MissingTaskException: task does not exist
    """
    self.update(True)

    abort_url = get_url('task abort', uuid=self._uuid)
    resp = self._connection._post(abort_url)
    if resp.status_code == 404:
        raise MissingTaskException(resp.json()['message'])
    raise_on_error(resp)

    self.update(True)
def update_resources(self):
    """Update resources for a running task.

    The typical workflow is as follows:
        1. Upload new files on your resource bucket,
        2. Call this method,
        3. The new files will appear on all the compute nodes in the $DOCKER_WORKDIR folder

    Note: There is no way to know when the files are effectively transferred. This information is available on the compute node only.
    Note: The update is additive only: files deleted from the bucket will NOT be deleted from the task's resources directory.

    :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises qarnot.exceptions.MissingTaskException: task does not exist
    """
    self.update(True)

    task_url = get_url('task update', uuid=self._uuid)
    resp = self._connection._patch(task_url)
    if resp.status_code == 404:
        raise MissingTaskException(resp.json()['message'])
    raise_on_error(resp)

    self.update(True)
[docs] def delete(self, purge_resources=False, purge_results=False):
"""Delete this task on the server.
:param bool purge_resources: parameter value is used to determine if the bucket is also deleted.
Defaults to False.
:param bool purge_results: parameter value is used to determine if the bucket is also deleted.
Defaults to False.
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
:raises qarnot.exceptions.MissingTaskException: task does not exist
"""
if purge_resources or purge_results:
self._update_if_summary()
if self._auto_update:
self._auto_update = False
if self._uuid is None:
return
if purge_results and self.results is not None:
try:
self.results.update()
except MissingBucketException:
purge_results = False
resp = self._connection._delete(
get_url('task update', uuid=self._uuid))
if resp.status_code == 404:
raise MissingTaskException(resp.json()['message'])
raise_on_error(resp)
if purge_resources and len(self.resources) != 0:
toremove = []
for r in self.resources:
try:
r.update()
r.delete()
toremove.append(r)
except (MissingBucketException, BucketStorageUnavailableException) as exception:
warnings.warn(str(exception))
for tr in toremove:
self.resources.remove(tr)
if purge_results and self._result_object is not None:
try:
self._result_object.delete()
self._result_object = None
except MissingBucketException as exception:
warnings.warn(str(exception))
self._state = "Deleted"
self._uuid = None
[docs] def update(self, flushcache=False):
"""
Update the task object from the REST Api.
The flushcache parameter can be used to force the update, otherwise a cached version of the object
will be served when accessing properties of the object.
Some methods will flush the cache, like :meth:`submit`, :meth:`abort`, :meth:`wait` and :meth:`instant`.
Cache behavior is configurable with :attr:`auto_update` and :attr:`update_cache_time`.
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
:raises qarnot.exceptions.MissingTaskException: task does not represent a
valid one
"""
if self._uuid is None:
return
now = time.time()
if (now - self._last_cache) < self._update_cache_time and not flushcache:
return
resp = self._connection._get(
get_url('task update', uuid=self._uuid))
if resp.status_code == 404:
raise MissingTaskException(resp.json()['message'])
raise_on_error(resp)
self._update(resp.json())
self._last_cache = time.time()
self._is_summary = False
def _update(self, json_task):
    """Copy the fields of a task description returned by the API onto this object.

    :param dict json_task: dictionary representing the task
    """
    self._name = json_task['name']
    self._shortname = json_task.get('shortname')
    self._profile = json_task['profile']
    self._pool_uuid = json_task.get('poolUuid')
    self._job_uuid = json_task.get('jobUuid')
    self._instancecount = json_task.get('instanceCount')
    self._advanced_range = json_task.get('advancedRanges')
    self._wait_for_pool_resources_synchronization = json_task.get('waitForPoolResourcesSynchronization')

    # Running counters keep their previous value when the API sends null.
    running_cores = json_task['runningCoreCount']
    if running_cores is not None:
        self._running_core_count = running_cores
    running_instances = json_task['runningInstanceCount']
    if running_instances is not None:
        self._running_instance_count = running_instances

    # Bucket ids are only replaced by non-empty values.
    resource_ids = json_task.get('resourceBuckets')
    if resource_ids:
        self._resource_object_ids = resource_ids
    result_id = json_task.get('resultBucket')
    if result_id:
        self._result_object_id = result_id

    if 'status' in json_task:
        self._status = json_task['status']
    self._creation_date = _util.parse_datetime(json_task['creationDate'])
    self._errors = [Error(d) for d in json_task['errors']] if 'errors' in json_task else []

    # Received constants are merged into the existing dictionary.
    if 'constants' in json_task:
        for entry in json_task['constants']:
            self._constants[entry.get('key')] = entry.get('value')

    self._uuid = json_task['uuid']
    self._state = json_task['state']
    self._tags = json_task.get('tags')

    # More results than last time: flag the task so they get downloaded.
    if 'resultsCount' in json_task:
        results_count = json_task['resultsCount']
        if self._rescount < results_count:
            self._dirty = True
        self._rescount = results_count

    # Optional fields copied verbatim when present.
    optional_fields = (
        ('resultsBlacklist', '_results_blacklist'),
        ('resultsWhitelist', '_results_whitelist'),
        ('snapshotWhitelist', '_snapshot_whitelist'),
        ('snapshotBlacklist', '_snapshot_blacklist'),
    )
    for json_key, attribute in optional_fields:
        if json_key in json_task:
            setattr(self, attribute, json_task[json_key])

    if 'completedInstances' in json_task:
        self._completed_instances = [CompletedInstance(x) for x in json_task['completedInstances']]
    else:
        self._completed_instances = []

    if 'autoDeleteOnCompletion' in json_task:
        self._auto_delete = json_task["autoDeleteOnCompletion"]
    if 'completionTimeToLive' in json_task:
        self._completion_time_to_live = json_task["completionTimeToLive"]
@classmethod
def from_json(cls, connection, json_task, is_summary=False):
    """Build a Task object out of a json task.

    :param qarnot.connection.Connection connection: the cluster connection
    :param dict json_task: Dictionary representing the task
    :param bool is_summary: stored on the created task as its summary flag
    :returns: The created :class:`~qarnot.task.Task`.
    """
    # The instance count is used when present, the advanced ranges otherwise.
    count_key = 'instanceCount' if 'instanceCount' in json_task else 'advancedRanges'
    count_or_range = json_task[count_key]

    task = cls(connection,
               json_task['name'],
               json_task.get('profile') or json_task.get('poolUuid'),
               count_or_range)
    task._update(json_task)
    task._is_summary = is_summary
    return task
def commit(self):
    """Send the local state of this task to the REST API.

    :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises qarnot.exceptions.MissingTaskException: task does not exist

    .. note:: When updating buckets' properties, auto update will be disabled until commit is called.
    """
    payload = self._to_json()
    task_url = get_url('task update', uuid=self._uuid)
    resp = self._connection._put(task_url, json=payload)

    # Restored before the response is checked, so it also happens when
    # one of the checks below raises.
    self._auto_update = self._last_auto_update_state

    if resp.status_code == 404:
        raise MissingTaskException(resp.json()['message'])
    raise_on_error(resp)
[docs] def wait(self, timeout=None, live_progress=False):
"""Wait for this task until it is completed.
:param float timeout: maximum time (in seconds) to wait before returning
(None => no timeout)
:param bool live_progress: display a live progress
:rtype: :class:`bool`
:returns: Is the task finished
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
:raises qarnot.exceptions.MissingTaskException: task does not represent a valid
one
"""
live_progress = live_progress and sys.stdout.isatty()
if live_progress:
try:
widgets = [
Percentage(),
' ', AnimatedMarker(),
' ', Bar(),
' ', AdaptiveETA()
]
progressbar = ProgressBar(widgets=widgets, max_value=100)
except Exception:
live_progress = False
start = time.time()
if self._uuid is None:
self.update(True)
return False
nap = min(10, timeout) if timeout is not None else 10
self.update(True)
while self._state in RUNNING_DOWNLOADING_STATES:
if live_progress:
n = 0
progress = 0
while True:
time.sleep(1)
n += 1
if n >= nap:
break
progress = self.status.execution_progress if self.status is not None else 0
progress = max(0, min(progress, 100))
progressbar.update(progress)
else:
time.sleep(nap)
self.update(True)
if timeout is not None:
elapsed = time.time() - start
if timeout <= elapsed:
self.update()
return False
else:
nap = min(10, timeout - elapsed)
self.update(True)
if live_progress:
progressbar.finish()
return True
[docs] def snapshot(self, interval):
"""Start snapshooting results.
If called, this task's results will be periodically
updated, instead of only being available at the end.
Snapshots will be taken every *interval* second from the time
the task is submitted.
:param int interval: the interval in seconds at which to take snapshots
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
:raises qarnot.exceptions.MissingTaskException: task does not represent a
valid one
.. note:: To get the temporary results, call :meth:`download_results`.
"""
if self._uuid is None:
self._snapshots = interval
return
resp = self._connection._post(get_url('task snapshot', uuid=self._uuid),
json={"interval": interval})
if resp.status_code == 400:
raise ValueError(interval)
elif resp.status_code == 404:
raise MissingTaskException(resp.json()['message'])
raise_on_error(resp)
self._snapshots = True
[docs] def instant(self):
"""Make a snapshot of the current task.
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
:raises qarnot.exceptions.MissingTaskException: task does not exist
.. note:: To get the temporary results, call :meth:`download_results`.
"""
if self._uuid is None:
return
resp = self._connection._post(get_url('task instant', uuid=self._uuid),
json=None)
if resp.status_code == 404:
raise MissingTaskException(resp.json()['message'])
raise_on_error(resp)
self.update(True)
@property
def state(self):
""":type: :class:`str`
:getter: return this task's state
State of the task.
Value is in
* UnSubmitted
* Submitted
* PartiallyDispatched
* FullyDispatched
* PartiallyExecuting
* FullyExecuting
* UploadingResults
* DownloadingResults
* Cancelled
* Success
* Failure
.. warning::
this is the state of the task when the object was retrieved,
call :meth:`update` for up to date value.
"""
if self._auto_update:
self.update()
return self._state
@property
def resources(self):
    """:type: list(:class:`~qarnot.bucket.Bucket`)
    :getter: Returns this task's resources bucket
    :setter: Sets this task's resources bucket

    Represents resource files.

    While the list of bucket objects is empty, it is filled from the
    resource bucket ids known for this task.
    """
    self._update_if_summary()
    if self._auto_update:
        self.update()

    if not self._resource_objects:
        for bucket_id in self._resource_object_ids:
            bucket = Bucket(self._connection, bucket_id)
            self._resource_objects.append(bucket)

    return self._resource_objects

@resources.setter
def resources(self, value):
    """Replace the list of resource buckets."""
    self._resource_objects = value
@property
def results(self):
    """:type: :class:`~qarnot.bucket.Bucket`
    :getter: Returns this task's results bucket
    :setter: Sets this task's results bucket

    Represents results files. The bucket object is created on first access
    from the result bucket id, when one is known."""
    self._update_if_summary()

    bucket_not_built = self._result_object is None
    if bucket_not_built and self._result_object_id is not None:
        self._result_object = Bucket(self._connection, self._result_object_id)

    if self._auto_update:
        self.update()

    return self._result_object

@results.setter
def results(self, value):
    """Replace the results bucket."""
    self._result_object = value
[docs] def download_results(self, output_dir, progress=None):
"""Download results in given *output_dir*.
:param str output_dir: local directory for the retrieved files.
:param progress: can be a callback (read,total,filename) or True to display a progress bar
:type progress: bool or function(float, float, str)
:raises qarnot.exceptions.MissingBucketException: the bucket is not on the server
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
.. warning:: Will override *output_dir* content.
"""
if self._uuid is not None:
self.update()
if not path.exists(output_dir):
makedirs(output_dir)
if self._dirty:
self.results.get_all_files(output_dir, progress=progress)
[docs] def stdout(self):
"""Get the standard output of the task
since the submission of the task.
:rtype: :class:`str`
:returns: The standard output.
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
:raises qarnot.exceptions.MissingTaskException: task does not exist
.. note:: The buffer is circular, if stdout is too big, prefer calling
:meth:`fresh_stdout` regularly.
"""
if self._uuid is None:
return ""
resp = self._connection._get(
get_url('task stdout', uuid=self._uuid))
if resp.status_code == 404:
raise MissingTaskException(resp.json()['message'])
raise_on_error(resp)
return resp.text
[docs] def fresh_stdout(self):
"""Get what has been written on the standard output since last time
this function was called or since the task has been submitted.
:rtype: :class:`str`
:returns: The new output since last call.
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
:raises qarnot.exceptions.MissingTaskException: task does not exist
"""
if self._uuid is None:
return ""
resp = self._connection._post(
get_url('task stdout', uuid=self._uuid))
if resp.status_code == 404:
raise MissingTaskException(resp.json()['message'])
raise_on_error(resp)
return resp.text
[docs] def stderr(self):
"""Get the standard error of the task
since the submission of the task.
:rtype: :class:`str`
:returns: The standard error.
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
:raises qarnot.exceptions.MissingTaskException: task does not exist
.. note:: The buffer is circular, if stderr is too big, prefer calling
:meth:`fresh_stderr` regularly.
"""
if self._uuid is None:
return ""
resp = self._connection._get(
get_url('task stderr', uuid=self._uuid))
if resp.status_code == 404:
raise MissingTaskException(resp.json()['message'])
raise_on_error(resp)
return resp.text
[docs] def fresh_stderr(self):
"""Get what has been written on the standard error since last time
this function was called or since the task has been submitted.
:rtype: :class:`str`
:returns: The new error messages since last call.
:raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
:raises qarnot.exceptions.UnauthorizedException: invalid credentials
:raises qarnot.exceptions.MissingTaskException: task does not exist
"""
if self._uuid is None:
return ""
resp = self._connection._post(
get_url('task stderr', uuid=self._uuid))
if resp.status_code == 404:
raise MissingTaskException(resp.json()['message'])
raise_on_error(resp)
return resp.text
@property
def uuid(self):
""":type: :class:`str`
:getter: Returns this task's uuid
The task's uuid.
Automatically set when a task is submitted.
"""
return self._uuid
@property
def name(self):
""":type: :class:`str`
:getter: Returns this task's name
:setter: Sets this task's name
The task's name.
Can be set until task is submitted.
"""
return self._name
@name.setter
def name(self, value):
"""Setter for name."""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
else:
self._name = value
@property
def shortname(self):
""":type: :class:`str`
:getter: Returns this task's shortname
:setter: Sets this task's shortname
The task's shortname, must be DNS compliant and unique, if not provided, will default to :attr:`uuid`.
Can be set until task is submitted.
"""
return self._shortname
@shortname.setter
def shortname(self, value):
"""Setter for shortname."""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
else:
self._shortname = value
@property
def tags(self):
""":type: :class:list(`str`)
:getter: Returns this task's tags
:setter: Sets this task's tags
Custom tags.
"""
self._update_if_summary()
if self._auto_update:
self.update()
return self._tags
@tags.setter
def tags(self, value):
"""setter for tags"""
self._tags = value
self._auto_update = False
@property
def pool(self):
""":type: :class:`~qarnot.pool.Pool`
:getter: Returns this task's pool
:setter: Sets this task's pool
The pool to run the task in.
Can be set until :meth:`run` is called.
.. warning:: This property is mutually exclusive with :attr:`profile`
"""
return self._connection.retrieve_pool(self._pool_uuid)
@pool.setter
def pool(self, value):
"""setter for pool"""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
if self._profile is not None:
raise AttributeError("Can't set pool if profile is not None")
else:
self._pool_uuid = value.uuid
@property
def profile(self):
""":type: :class:`str`
:getter: Returns this task's profile
:setter: Sets this task's profile
The profile to run the task with.
Can be set until :meth:`run` or :meth:`submit` is called.
.. warning:: This property is mutually exclusive with :attr:`pool`
"""
return self._profile
@profile.setter
def profile(self, value):
"""setter for profile"""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
if self._pool_uuid is not None:
raise AttributeError("Can't set profile if pool is not None")
else:
self._profile = value
@property
def instancecount(self):
""":type: :class:`int`
:getter: Returns this task's instance count
:setter: Sets this task's instance count
Number of instances needed for the task.
Can be set until :meth:`run` or :meth:`submit` is called.
:raises AttributeError: if :attr:`advanced_range` is not None when setting this property
.. warning:: This property is mutually exclusive with :attr:`advanced_range`
"""
return self._instancecount
@instancecount.setter
def instancecount(self, value):
"""Setter for instancecount."""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
if self.advanced_range is not None:
raise AttributeError("Can't set instancecount if advanced_range is not None")
self._instancecount = value
@property
def running_core_count(self):
""":type: :class:`int`
:getter: Returns this task's running core count
Number of core running inside the task.
"""
return self._running_core_count
@property
def running_instance_count(self):
""":type: :class:`int`
:getter: Returns this task's running instance count
Number of instances running inside the task.
"""
return self._running_instance_count
@property
def advanced_range(self):
""":type: :class:`str`
:getter: Returns this task's advanced range
:setter: Sets this task's advanced range
Advanced instances range selection.
Allows to select which instances will be computed.
Should be None or match the following extended regular expression
"""r"""**"([0-9]+|[0-9]+-[0-9]+)(,([0-9]+|[0-9]+-[0-9]+))+"**
eg: 1,3-8,9,12-19
Can be set until :meth:`run` is called.
:raises AttributeError: if :attr:`instancecount` is not 0 when setting this property
.. warning:: This property is mutually exclusive with :attr:`instancecount`
"""
return self._advanced_range
@advanced_range.setter
def advanced_range(self, value):
"""Setter for advanced_range."""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
if self.instancecount != 0:
raise AttributeError("Can't set advanced_range if instancecount is not 0")
self._advanced_range = value
@property
def snapshot_whitelist(self):
""":type: :class:`str`
:getter: Returns this task's snapshot whitelist
:setter: Sets this task's snapshot whitelist
Snapshot white list (regex) for :meth:`snapshot` and :meth:`instant`
Can be set until task is submitted.
"""
self._update_if_summary()
return self._snapshot_whitelist
@snapshot_whitelist.setter
def snapshot_whitelist(self, value):
"""Setter for snapshot whitelist, this can only be set before tasks submission
"""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
self._snapshot_whitelist = value
@property
def snapshot_blacklist(self):
""":type: :class:`str`
:getter: Returns this task's snapshot blacklist
:setter: Sets this task's snapshot blacklist
Snapshot black list (regex) for :meth:`snapshot` :meth:`instant`
Can be set until task is submitted.
"""
self._update_if_summary()
return self._snapshot_blacklist
@snapshot_blacklist.setter
def snapshot_blacklist(self, value):
"""Setter for snapshot blacklist, this can only be set before tasks submission
"""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
self._snapshot_blacklist = value
@property
def results_whitelist(self):
""":type: :class:`str`
:getter: Returns this task's results whitelist
:setter: Sets this task's results whitelist
Results whitelist (regex)
Can be set until task is submitted.
"""
self._update_if_summary()
return self._results_whitelist
@results_whitelist.setter
def results_whitelist(self, value):
"""Setter for results whitelist, this can only be set before tasks submission
"""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
self._results_whitelist = value
@property
def results_blacklist(self):
""":type: :class:`str`
:getter: Returns this task's results blacklist
:setter: Sets this task's results blacklist
Results blacklist (regex)
Can be set until task is submitted.
"""
self._update_if_summary()
if self._auto_update:
self.update()
return self._results_blacklist
@results_blacklist.setter
def results_blacklist(self, value):
"""Setter for results blacklist, this can only be set before tasks submission
"""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
self._results_blacklist = value
@property
def status(self):
    """:type: :class:`qarnot.status.Status`
    :getter: Returns this task's status

    Status of the task. The stored status is wrapped in a new
    :class:`~qarnot.status.Status` on each access; when nothing is stored
    the stored value itself is returned.
    """
    self._update_if_summary()
    if self._auto_update:
        self.update()

    raw_status = self._status
    return Status(raw_status) if raw_status else raw_status
@property
def completed_instances(self):
    """:type: list(:class:`CompletedInstance`)
    :getter: Return this task's completed instances

    The task is refreshed first when it is a summary or when auto update
    is enabled.
    """
    self._update_if_summary()
    if self._auto_update:
        self.update()
    return self._completed_instances
@property
def creation_date(self):
""":type: :class:`str`
:getter: Returns this task's creation date
Creation date of the task (UTC Time)
"""
return self._creation_date
@property
def errors(self):
    """:type: list(:class:`Error`)
    :getter: Returns this task's errors if any.

    Errors reported for the task: an empty list when the API reported
    none, None on a task that was never filled from the API.
    """
    self._update_if_summary()
    if self._auto_update:
        self.update()
    return self._errors
@property
def constants(self):
""":type: dictionary{:class:`str` : :class:`str`}
:getter: Returns this task's constants dictionary.
:setter: set the task's constants dictionary.
Update the constants if needed
Constants are the parametrazer of the profils.
Use them to adjust your profile parametter.
"""
self._update_if_summary()
if self._auto_update:
self.update()
return self._constants
@constants.setter
def constants(self, value):
"""Setter for constants
"""
self._update_if_summary()
if self._auto_update:
self.update()
self._constants = value
@property
def constraints(self):
""":type: dictionary{:class:`str` : :class:`str`}
:getter: Returns this task's constraints dictionary.
:setter: set the task's constraints dictionary.
Update the constraints if needed
advance usage
"""
self._update_if_summary()
if self._auto_update:
self.update()
return self._constraints
@constraints.setter
def constraints(self, value):
"""Setter for constraints
"""
self._update_if_summary()
if self._auto_update:
self.update()
self._constraints = value
@property
def wait_for_pool_resources_synchronization(self):
""":type: :class:`bool` or None
:getter: Returns this task's wait_for_pool_resources_synchronization.
:setter: set the task's wait_for_pool_resources_synchronization.
:raises qarnot.exceptions.AttributeError: can't set this attribute on a launched task
"""
return self._wait_for_pool_resources_synchronization
@wait_for_pool_resources_synchronization.setter
def wait_for_pool_resources_synchronization(self, value):
"""Setter for wait_for_pool_resources_synchronization
"""
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
self._wait_for_pool_resources_synchronization = value
@property
def auto_update(self):
""":type: :class:`bool`
:getter: Returns this task's auto update state
:setter: Sets this task's auto update state
Auto update state, default to True
When auto update is disabled properties will always return cached value
for the object and a call to :meth:`update` will be required to get latest values from the REST Api.
"""
return self._auto_update
@auto_update.setter
def auto_update(self, value):
"""Setter for auto_update feature
"""
self._auto_update = value
self._last_auto_update_state = self._auto_update
@property
def update_cache_time(self):
""":type: :class:`int`
:getter: Returns this task's auto update state
:setter: Sets this task's auto update state
Cache expiration time, default to 5s
"""
return self._update_cache_time
@update_cache_time.setter
def update_cache_time(self, value):
"""Setter for update_cache_time
"""
self._update_cache_time = value
[docs] def set_task_dependencies_from_uuids(self, uuids):
"""Setter for the task dependencies using uuid
"""
self._dependentOn += uuids
[docs] def set_task_dependencies_from_tasks(self, tasks):
"""Setter for the task dependencies using tasks
"""
self._dependentOn += [task._uuid for task in tasks]
@property
def auto_delete(self):
"""Autodelete this Task if it is finished and your max number of task is reach
Can be set until :meth:`run` or :meth:`submit` is called.
:type: :class:`bool`
:getter: Returns is this task must autodelete
:setter: Sets this task's autodelete
:default_value: "False"
:raises AttributeError: if you try to reset the auto_delete after the task is submit
"""
self._update_if_summary()
return self._auto_delete
@auto_delete.setter
def auto_delete(self, value):
"""Setter for auto_delete, this can only be set before tasks submission
"""
self._update_if_summary()
if self.uuid is not None:
raise AttributeError("can't set attribute on a launched task")
self._auto_delete = value
@property
def completion_ttl(self):
"""The task will be auto delete `completion_ttl` after it is finished
Can be set until :meth:`run` or :meth:`submit` is called.
:getter: Returns this task's completed time to live.
:type: :class:`str`
:setter: Sets this task's this task's completed time to live.
:type: :class:`str` or :class:`datetime.timedelta`
:default_value: ""
:raises AttributeError: if you try to set it after the task is submitted
The `completion_ttl` must be a timedelta or a time span format string (example: 'd.hh:mm:ss' or 'hh:mm:ss')
"""
self._update_if_summary()
return self._completion_time_to_live
@completion_ttl.setter
def completion_ttl(self, value):
"""Setter for completion_ttl, this can only be set before tasks submission"""
self._update_if_summary()
if self._uuid is not None:
raise AttributeError("can't set attribute on a submitted job")
self._completion_time_to_live = _util.parse_to_timespan_string(value)
def _to_json(self):
"""Get a dict ready to be json packed from this task."""
const_list = [
{'key': key, 'value': value}
for key, value in self._constants.items()
]
constr_list = [
{'key': key, 'value': value}
for key, value in self._constraints.items()
]
json_task = {
'name': self._name,
'profile': self._profile,
'poolUuid': self._pool_uuid,
'jobUuid': None if self._job_uuid == "" else self._job_uuid,
'constants': const_list,
'constraints': constr_list,
'dependencies': {},
'waitForPoolResourcesSynchronization': self._wait_for_pool_resources_synchronization
}
json_task['dependencies']["dependsOn"] = self._dependentOn
if self._shortname is not None:
json_task['shortname'] = self._shortname
self._resource_object_ids = [x.uuid for x in self._resource_objects]
json_task['resourceBuckets'] = self._resource_object_ids
if self._result_object is not None:
json_task['resultBucket'] = self._result_object.uuid
if self._advanced_range is not None:
json_task['advancedRanges'] = self._advanced_range
else:
json_task['instanceCount'] = self._instancecount
json_task["tags"] = self._tags
if self._snapshot_whitelist is not None:
json_task['snapshotWhitelist'] = self._snapshot_whitelist
if self._snapshot_blacklist is not None:
json_task['snapshotBlacklist'] = self._snapshot_blacklist
if self._results_whitelist is not None:
json_task['resultsWhitelist'] = self._results_whitelist
if self._results_blacklist is not None:
json_task['resultsBlacklist'] = self._results_blacklist
json_task['autoDeleteOnCompletion'] = self._auto_delete
json_task['completionTimeToLive'] = self._completion_time_to_live
return json_task
def _update_if_summary(self):
"""Trigger flush update if the task is made from a summary.
This should be called before accessing any fields not contained in a summary task
"""
if self._is_summary:
self.update(True)
def __repr__(self):
    """Return a one-line description of this task.

    The line holds the name, shortname, uuid, profile, instance count and
    state, followed by the resource ids and the uuid of the result object
    (empty strings when there are no resource objects or no result object).
    """
    template = '{0} - {1} - {2} - {3} - InstanceCount : {4} - {5} - Resources : {6} - Results : {7}'
    # Values are read in the same order as the placeholders.
    values = (
        self.name,
        self.shortname,
        self._uuid,
        self._profile,
        self._instancecount,
        self.state,
        self._resource_object_ids if self._resource_objects is not None else "",
        self._result_object.uuid if self._result_object is not None else "",
    )
    return template.format(*values)
# Context manager
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if (exc_type is None) or exc_type != MissingTaskException:
self.delete()
return False
class CompletedInstance(object):
    """Information about a completed instance.

    .. note:: Read-only class
    """

    def __init__(self, json):
        """Read the fields of a completed instance from its json description.

        :param dict json: the description of the completed instance; every
          key read below must be present
        """
        self.instance_id = json['instanceId']
        """:type: :class:`int`

        Instance number."""

        self.state = json['state']
        """:type: :class:`str`

        Instance final state."""

        self.wall_time_sec = json['wallTimeSec']
        """:type: :class:`float`

        Instance wall time in seconds."""

        self.exec_time_sec = json['execTimeSec']
        """:type: :class:`float`

        Execution time in seconds."""

        self.exec_time_sec_ghz = json['execTimeSecGHz']
        """:type: :class:`float`

        Execution time in seconds GHz."""

        self.peak_memory_mb = json['peakMemoryMB']
        """:type: :class:`int`

        Peak memory size in MB."""

        self.average_ghz = json['averageGHz']
        """:type: :class:`float`

        Average GHz of the instance (the ``averageGHz`` field)."""

        self.results = json['results']
        """:type: list(:class:`str`)

        Instance produced results."""

    def __repr__(self):
        """Return every attribute as ``name=value``, separated by commas."""
        # dict.items() exists on both Python 2 and Python 3.
        fields = ["{0}={1}".format(name, value) for (name, value) in self.__dict__.items()]
        return ', '.join(fields)
class BulkTaskResponse(object):
    """Bulk Task Response Information

    .. note:: Read-only class
    """

    def __init__(self, json):
        """Read the fields of a bulk task response from its json description.

        :param dict json: the description of the response; the keys
          ``statusCode``, ``uuid`` and ``message`` must be present
        """
        self.status_code = json['statusCode']
        """:type: :class:`int`

        Status code."""

        self.uuid = json['uuid']
        """:type: :class:`str`

        Created Task Uuid."""

        self.message = json['message']
        """:type: :class:`str`

        User friendly error message."""

    def is_success(self):
        """Check that the task submit has been successful.

        :rtype: :class:`bool`
        :returns: True when the status code is in the 2xx range and a uuid
          was received, False otherwise.
        """
        # bool() keeps the documented return type: a bare `and` chain would
        # hand back the uuid itself (a string, None or "") instead.
        return 200 <= self.status_code < 300 and bool(self.uuid)

    def __repr__(self):
        """Return every attribute as ``name=value``, separated by commas."""
        # dict.items() exists on both Python 2 and Python 3, so no version
        # switch is needed.
        fields = ["{0}={1}".format(name, value) for (name, value) in self.__dict__.items()]
        return ', '.join(fields)