base.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. # Copyright 2020 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. from oslo_config import cfg
  4. from oslo_log import log as logging
  5. from taskflow import task as taskflow_tasks
  6. from taskflow.types import failure
  7. from coriolis import constants
  8. from coriolis import exception
  9. from coriolis import utils
  10. from coriolis.tasks import factory as tasks_factory
  11. from coriolis.scheduler.rpc import client as rpc_scheduler_client
  12. from coriolis.worker.rpc import client as rpc_worker_client
  13. TASK_RETURN_VALUE_FORMAT = "%s-result" % (
  14. constants.TASK_LOCK_NAME_FORMAT)
  15. LOG = logging.getLogger()
  16. taskflow_opts = [
  17. cfg.IntOpt("worker_task_execution_timeout",
  18. default=3600,
  19. help="Number of seconds until Coriolis tasks which are executed"
  20. "remotely on a Worker Service through taskflow timeout.")
  21. ]
  22. CONF = cfg.CONF
  23. CONF.register_opts(taskflow_opts, 'taskflow')
  24. class BaseCoriolisTaskflowTask(taskflow_tasks.Task):
  25. """ Base class for all TaskFlow tasks within Coriolis. """
  26. def _get_error_str_for_flow_failures(
  27. self, flow_failures, full_tracebacks=True):
  28. if not flow_failures:
  29. return "No flow failures provided."
  30. if not flow_failures.items():
  31. return "No flow failures present."
  32. res = ""
  33. for (task_id, task_failure) in flow_failures.items():
  34. label = "Error message"
  35. failure_str = task_failure.exception_str
  36. if full_tracebacks:
  37. label = "Traceback"
  38. failure_str = task_failure.traceback_str
  39. else:
  40. failure_str = task_failure.exception_str
  41. if isinstance(
  42. task_failure.exception,
  43. exception.TaskProcessException):
  44. # NOTE: TaskProcessException contains a full trace
  45. # from the worker service so we must split it:
  46. exception_lines = task_failure.exception_str.split('\n')
  47. if exception_lines:
  48. if len(exception_lines) > 2:
  49. failure_str = exception_lines[-2].strip()
  50. else:
  51. failure_str = exception_lines[-1].strip()
  52. res = (
  53. "%s %s for task '%s': %s\n" % (
  54. res, label, task_id, failure_str))
  55. if res:
  56. # remove extra newline:
  57. res = res[:-1]
  58. return res
  59. def revert(self, *args, **kwargs):
  60. result = kwargs.get('result')
  61. if isinstance(result, failure.Failure):
  62. # it means that this is the task which error'd out:
  63. LOG.error(
  64. "Taskflow task '%s' is reverting after errorring out with the "
  65. "following trace: %s", self.name, result.traceback_str)
  66. else:
  67. # else the failures were from other tasks:
  68. flow_failures = kwargs.get('flow_failures', {})
  69. LOG.error(
  70. "Taskflow task '%s' is reverting after the failure of one "
  71. "or more other tasks (%s) Tracebacks were:\n%s" % (
  72. self.name, list(flow_failures.keys()),
  73. self._get_error_str_for_flow_failures(
  74. flow_failures, full_tracebacks=True)))
  75. class BaseRunWorkerTask(BaseCoriolisTaskflowTask):
  76. """ Base taskflow.Task implementation for tasks which can be run
  77. on the worker service.
  78. This class can be seen as an "adapter" between the current
  79. coriolis.tasks.TaskRunner classes and taskflow ones.
  80. :param task_id: ID of the task. This value is declared as a returned value
  81. from the task and can be set as a requirement for other tasks, thus
  82. achieving a dependency system.
  83. :param main_task_runner_class: constants.TASK_TYPE_* referencing the
  84. main coriolis.tasks.TaskRunner class to be run on a worker service.
  85. :param cleanup_task_runner_task: constants.TASK_TYPE_* referencing the
  86. cleanup task to be run on reversion. No cleanup will be performed
  87. during the task's reversion (apart from Worker Service deallocation)
  88. otherwise.
  89. """
  90. def __init__(
  91. self, task_name, task_id, task_instance, main_task_runner_type,
  92. cleanup_task_runner_type=None, depends_on=None,
  93. raise_on_cleanup_failure=False, **kwargs):
  94. self._task_id = task_id
  95. self._task_name = task_name
  96. self._task_instance = task_instance
  97. self._main_task_runner_type = main_task_runner_type
  98. self._cleanup_task_runner_type = cleanup_task_runner_type
  99. self._raise_on_cleanup_failure = raise_on_cleanup_failure
  100. self._scheduler_client_instance = None
  101. super(BaseRunWorkerTask, self).__init__(name=task_name, **kwargs)
  102. @property
  103. def _scheduler_client(self):
  104. if not getattr(self, '_scheduler_client_instance', None):
  105. self._scheduler_client_instance = (
  106. rpc_scheduler_client.SchedulerClient())
  107. return self._scheduler_client_instance
  108. def _set_provides_for_dependencies(self, kwargs):
  109. dep = TASK_RETURN_VALUE_FORMAT % self._task_name
  110. if kwargs.get('provides') is not None:
  111. kwargs['provides'].append(dep)
  112. else:
  113. kwargs['provides'] = [dep]
  114. def _set_requires_for_dependencies(self, kwargs, depends_on):
  115. dep_requirements = [
  116. TASK_RETURN_VALUE_FORMAT % dep_id
  117. for dep_id in depends_on]
  118. if kwargs.get('requires') is not None:
  119. kwargs['requires'].extend(dep_requirements)
  120. elif dep_requirements:
  121. kwargs['requires'] = dep_requirements
  122. return kwargs
  123. def _set_requires_for_task_info_fields(self, kwargs):
  124. new_requires = kwargs.get('requires', [])
  125. main_task_runner = tasks_factory.get_task_runner_class(
  126. self._main_task_runner_type)
  127. main_task_deps = main_task_runner.get_required_task_info_properties()
  128. new_requires.extend(main_task_deps)
  129. if self._cleanup_task_runner_type:
  130. cleanup_task_runner = tasks_factory.get_task_runner_class(
  131. self._cleanup_task_runner_type)
  132. cleanup_task_deps = list(
  133. set(
  134. cleanup_task_runner.get_required_task_info_properties(
  135. )).difference(
  136. main_task_runner.get_returned_task_info_properties()))
  137. new_requires.extend(cleanup_task_deps)
  138. kwargs['requires'] = new_requires
  139. return kwargs
  140. def _set_provides_for_task_info_fields(self, kwargs):
  141. new_provides = kwargs.get('provides', [])
  142. main_task_runner = tasks_factory.get_task_runner_class(
  143. self._main_task_runner_type)
  144. main_task_res = main_task_runner.get_returned_task_info_properties()
  145. new_provides.extend(main_task_res)
  146. if self._cleanup_task_runner_type:
  147. cleanup_task_runner = tasks_factory.get_task_runner_class(
  148. self._cleanup_task_runner_type)
  149. cleanup_task_res = list(
  150. set(
  151. cleanup_task_runner.get_returned_task_info_properties(
  152. )).difference(
  153. main_task_runner.get_returned_task_info_properties()))
  154. new_provides.extend(cleanup_task_res)
  155. kwargs['provides'] = new_provides
  156. return kwargs
  157. def _get_worker_service_rpc_for_task(
  158. self, ctxt, task_id, task_type, origin, destination,
  159. retry_count=5, retry_period=2,
  160. rpc_timeout=CONF.taskflow.worker_task_execution_timeout):
  161. task_info = {
  162. "id": task_id,
  163. "task_type": task_type}
  164. worker_service = self._scheduler_client.get_worker_service_for_task(
  165. ctxt, task_info, origin, destination, retry_count=retry_count,
  166. retry_period=retry_period, random_choice=True)
  167. LOG.debug(
  168. "[Task '%s'] Was offered the following worker service for executing "
  169. "Taskflow worker task '%s': %s",
  170. self._task_name, task_id, worker_service['id'])
  171. return rpc_worker_client.WorkerClient.from_service_definition(
  172. worker_service, timeout=rpc_timeout)
  173. def _execute_task(
  174. self, ctxt, task_id, task_type, origin, destination, task_info):
  175. worker_rpc = self._get_worker_service_rpc_for_task(
  176. ctxt, self._task_id, task_type, origin, destination)
  177. try:
  178. LOG.debug(
  179. "[Task '%s'] Starting to run task '%s' (type '%s') "
  180. "on worker service." % (
  181. self._task_id, self._task_name, task_type))
  182. res = worker_rpc.run_task(
  183. ctxt, self._task_id, task_type, origin, destination,
  184. self._task_instance, task_info)
  185. LOG.debug(
  186. "[Task '%s'] Taskflow worker task '%s' (type %s) has "
  187. "successfully run and returned the following info: %s" % (
  188. self._task_name, task_id, task_type, res))
  189. return res
  190. except Exception as ex:
  191. LOG.debug(
  192. "[Task %s] Exception occurred while executing task '%s' "
  193. "(type '%s') on the worker service: %s", self._task_name,
  194. task_id, task_type, utils.get_exception_details())
  195. raise
  196. def execute(self, context, origin, destination, task_info):
  197. res = self._execute_task(
  198. context, self._task_id, self._main_task_runner_type, origin,
  199. destination, task_info)
  200. return res
  201. def revert(self, context, origin, destination, task_info, **kwargs):
  202. super(BaseRunWorkerTask, self).revert(
  203. context, origin, destination, task_info, **kwargs)
  204. if not self._cleanup_task_runner_type:
  205. LOG.debug(
  206. "Task '%s' (main type '%s') had no cleanup task runner "
  207. "associated with it. Skipping any reversion logic",
  208. self._task_name, self._main_task_runner_type)
  209. return
  210. try:
  211. res = self._execute_task(
  212. context, self._task_id, self._cleanup_task_runner_type, origin,
  213. destination, task_info)
  214. except Exception as ex:
  215. LOG.warn(
  216. "Task cleanup for '%s' (main task type '%s', cleanup task type"
  217. "'%s') has failed with the following trace: %s",
  218. self._task_name, self._main_task_runner_type,
  219. self._cleanup_task_runner_type, utils.get_exception_details())
  220. if self._raise_on_cleanup_failure:
  221. raise
  222. return
  223. LOG.debug(
  224. "Reversion of taskflow task '%s' (ID '%s') was successfully "
  225. "executed using task runner '%s' with the following result: %s" % (
  226. self._task_name, self._task_id, self._cleanup_task_runner_type,
  227. res))