server.py 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336
  1. # Copyright 2016 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. import copy
  4. import functools
  5. import uuid
  6. from oslo_concurrency import lockutils
  7. from oslo_config import cfg
  8. from oslo_log import log as logging
  9. from coriolis import constants
  10. from coriolis import context
  11. from coriolis.db import api as db_api
  12. from coriolis.db.sqlalchemy import models
  13. from coriolis import exception
  14. from coriolis import keystone
  15. from coriolis.licensing import client as licensing_client
  16. from coriolis.replica_cron.rpc import client as rpc_cron_client
  17. from coriolis import schemas
  18. from coriolis import utils
  19. from coriolis.worker.rpc import client as rpc_worker_client
  20. VERSION = "1.0"
  21. LOG = logging.getLogger(__name__)
  22. conductor_opts = [
  23. cfg.BoolOpt("debug_os_morphing_errors",
  24. default=False,
  25. help="If set, any OSMorphing task which errors out will have "
  26. "all of its following tasks unscheduled so as to allow "
  27. "for live debugging of the OSMorphing setup.")
  28. ]
  29. CONF = cfg.CONF
  30. CONF.register_opts(conductor_opts, 'conductor')
  31. def endpoint_synchronized(func):
  32. @functools.wraps(func)
  33. def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
  34. @lockutils.synchronized(endpoint_id)
  35. def inner():
  36. return func(self, ctxt, endpoint_id, *args, **kwargs)
  37. return inner()
  38. return wrapper
  39. def replica_synchronized(func):
  40. @functools.wraps(func)
  41. def wrapper(self, ctxt, replica_id, *args, **kwargs):
  42. @lockutils.synchronized(replica_id)
  43. def inner():
  44. return func(self, ctxt, replica_id, *args, **kwargs)
  45. return inner()
  46. return wrapper
  47. def schedule_synchronized(func):
  48. @functools.wraps(func)
  49. def wrapper(self, ctxt, replica_id, schedule_id, *args, **kwargs):
  50. @lockutils.synchronized(schedule_id)
  51. def inner():
  52. return func(self, ctxt, replica_id, schedule_id, *args, **kwargs)
  53. return inner()
  54. return wrapper
  55. def task_synchronized(func):
  56. @functools.wraps(func)
  57. def wrapper(self, ctxt, task_id, *args, **kwargs):
  58. @lockutils.synchronized(task_id)
  59. def inner():
  60. return func(self, ctxt, task_id, *args, **kwargs)
  61. return inner()
  62. return wrapper
  63. def migration_synchronized(func):
  64. @functools.wraps(func)
  65. def wrapper(self, ctxt, migration_id, *args, **kwargs):
  66. @lockutils.synchronized(migration_id)
  67. def inner():
  68. return func(self, ctxt, migration_id, *args, **kwargs)
  69. return inner()
  70. return wrapper
  71. def tasks_execution_synchronized(func):
  72. @functools.wraps(func)
  73. def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs):
  74. @lockutils.synchronized(execution_id)
  75. def inner():
  76. return func(self, ctxt, replica_id, execution_id, *args, **kwargs)
  77. return inner()
  78. return wrapper
  79. class ConductorServerEndpoint(object):
  80. def __init__(self):
  81. self._licensing_client = licensing_client.LicensingClient.from_env()
  82. self._rpc_worker_client = rpc_worker_client.WorkerClient()
  83. self._replica_cron_client = rpc_cron_client.ReplicaCronClient()
  84. def get_all_diagnostics(self, ctxt):
  85. conductor = self.get_diagnostics(ctxt)
  86. cron = self._replica_cron_client.get_diagnostics(ctxt)
  87. worker = self._rpc_worker_client.get_diagnostics(ctxt)
  88. return [
  89. conductor,
  90. cron,
  91. worker,
  92. ]
  93. def _check_delete_reservation_for_transfer(self, transfer_action):
  94. action_id = transfer_action.base_id
  95. if not self._licensing_client:
  96. LOG.warn(
  97. "Licensing client not instantiated. Skipping deletion of "
  98. "reservation for transfer action '%s'", action_id)
  99. return
  100. reservation_id = transfer_action.reservation_id
  101. if reservation_id:
  102. try:
  103. self._licensing_client.delete_reservation(reservation_id)
  104. except (Exception, KeyboardInterrupt):
  105. LOG.warn(
  106. "Failed to delete reservation with ID '%s' for transfer "
  107. "action with ID '%s'. Skipping. Exception\n%s",
  108. reservation_id, action_id, utils.get_exception_details())
  109. def _check_create_reservation_for_transfer(
  110. self, transfer_action, transfer_type):
  111. action_id = transfer_action.base_id
  112. if not self._licensing_client:
  113. LOG.warn(
  114. "Licensing client not instantiated. Skipping creation of "
  115. "reservation for transfer action '%s'", action_id)
  116. return
  117. ninstances = len(transfer_action.instances)
  118. LOG.debug(
  119. "Attempting to create '%s' reservation for %d instances for "
  120. "transfer action with ID '%s'.",
  121. transfer_type, ninstances, action_id)
  122. reservation = self._licensing_client.add_reservation(
  123. transfer_type, ninstances)
  124. transfer_action.reservation_id = reservation['id']
  125. def _check_reservation_for_transfer(self, transfer_action):
  126. action_id = transfer_action.base_id
  127. if not self._licensing_client:
  128. LOG.warn(
  129. "Licensing client not instantiated. Skipping checking of "
  130. "reservation for transfer action '%s'", action_id)
  131. return
  132. reservation_id = transfer_action.reservation_id
  133. if reservation_id:
  134. LOG.debug(
  135. "Attempting to check reservation with ID '%s' for transfer "
  136. "action '%s'", reservation_id, action_id)
  137. self._licensing_client.check_reservation(reservation_id)
  138. else:
  139. LOG.debug(
  140. "Transfer action '%s' has no reservation ID set. Skipping "
  141. "all reservation licensing checks.", action_id)
  142. def create_endpoint(self, ctxt, name, endpoint_type, description,
  143. connection_info):
  144. endpoint = models.Endpoint()
  145. endpoint.name = name
  146. endpoint.type = endpoint_type
  147. endpoint.description = description
  148. endpoint.connection_info = connection_info
  149. db_api.add_endpoint(ctxt, endpoint)
  150. LOG.info("Endpoint created: %s", endpoint.id)
  151. return self.get_endpoint(ctxt, endpoint.id)
  152. def update_endpoint(self, ctxt, endpoint_id, updated_values):
  153. db_api.update_endpoint(ctxt, endpoint_id, updated_values)
  154. LOG.info("Endpoint updated: %s", endpoint_id)
  155. return self.get_endpoint(ctxt, endpoint_id)
  156. def get_endpoints(self, ctxt):
  157. return db_api.get_endpoints(ctxt)
  158. @endpoint_synchronized
  159. def get_endpoint(self, ctxt, endpoint_id):
  160. endpoint = db_api.get_endpoint(ctxt, endpoint_id)
  161. if not endpoint:
  162. raise exception.NotFound("Endpoint not found")
  163. return endpoint
  164. @endpoint_synchronized
  165. def delete_endpoint(self, ctxt, endpoint_id):
  166. q_replicas_count = db_api.get_endpoint_replicas_count(
  167. ctxt, endpoint_id)
  168. if q_replicas_count is not 0:
  169. raise exception.NotAuthorized("%s replicas would be orphaned!" %
  170. q_replicas_count)
  171. db_api.delete_endpoint(ctxt, endpoint_id)
  172. def get_endpoint_instances(self, ctxt, endpoint_id, source_environment,
  173. marker, limit, instance_name_pattern):
  174. endpoint = self.get_endpoint(ctxt, endpoint_id)
  175. return self._rpc_worker_client.get_endpoint_instances(
  176. ctxt, endpoint.type, endpoint.connection_info,
  177. source_environment, marker, limit, instance_name_pattern)
  178. def get_endpoint_instance(
  179. self, ctxt, endpoint_id, source_environment, instance_name):
  180. endpoint = self.get_endpoint(ctxt, endpoint_id)
  181. return self._rpc_worker_client.get_endpoint_instance(
  182. ctxt, endpoint.type, endpoint.connection_info,
  183. source_environment, instance_name)
  184. def get_endpoint_source_options(
  185. self, ctxt, endpoint_id, env, option_names):
  186. endpoint = self.get_endpoint(ctxt, endpoint_id)
  187. return self._rpc_worker_client.get_endpoint_source_options(
  188. ctxt, endpoint.type, endpoint.connection_info, env, option_names)
  189. def get_endpoint_destination_options(
  190. self, ctxt, endpoint_id, env, option_names):
  191. endpoint = self.get_endpoint(ctxt, endpoint_id)
  192. return self._rpc_worker_client.get_endpoint_destination_options(
  193. ctxt, endpoint.type, endpoint.connection_info, env, option_names)
  194. def get_endpoint_networks(self, ctxt, endpoint_id, env):
  195. endpoint = self.get_endpoint(ctxt, endpoint_id)
  196. return self._rpc_worker_client.get_endpoint_networks(
  197. ctxt, endpoint.type, endpoint.connection_info, env)
  198. def get_endpoint_storage(self, ctxt, endpoint_id, env):
  199. endpoint = self.get_endpoint(ctxt, endpoint_id)
  200. return self._rpc_worker_client.get_endpoint_storage(
  201. ctxt, endpoint.type, endpoint.connection_info, env)
  202. def validate_endpoint_connection(self, ctxt, endpoint_id):
  203. endpoint = self.get_endpoint(ctxt, endpoint_id)
  204. return self._rpc_worker_client.validate_endpoint_connection(
  205. ctxt, endpoint.type, endpoint.connection_info)
  206. def validate_endpoint_target_environment(
  207. self, ctxt, endpoint_id, target_env):
  208. endpoint = self.get_endpoint(ctxt, endpoint_id)
  209. return self._rpc_worker_client.validate_endpoint_target_environment(
  210. ctxt, endpoint.type, target_env)
  211. def validate_endpoint_source_environment(
  212. self, ctxt, endpoint_id, source_env):
  213. endpoint = self.get_endpoint(ctxt, endpoint_id)
  214. return self._rpc_worker_client.validate_endpoint_source_environment(
  215. ctxt, endpoint.type, source_env)
  216. def get_available_providers(self, ctxt):
  217. return self._rpc_worker_client.get_available_providers(ctxt)
  218. def get_provider_schemas(self, ctxt, platform_name, provider_type):
  219. return self._rpc_worker_client.get_provider_schemas(
  220. ctxt, platform_name, provider_type)
  221. @staticmethod
  222. def _create_task(instance, task_type, execution, depends_on=None,
  223. on_error=False):
  224. task = models.Task()
  225. task.id = str(uuid.uuid4())
  226. task.instance = instance
  227. task.execution = execution
  228. task.task_type = task_type
  229. task.depends_on = depends_on
  230. task.on_error = on_error
  231. task.index = len(task.execution.tasks) + 1
  232. if not on_error:
  233. task.status = constants.TASK_STATUS_PENDING
  234. else:
  235. task.status = constants.TASK_STATUS_ON_ERROR_ONLY
  236. if depends_on:
  237. for task_id in depends_on:
  238. if [t for t in task.execution.tasks if t.id == task_id and
  239. t.status != constants.TASK_STATUS_ON_ERROR_ONLY]:
  240. task.status = constants.TASK_STATUS_PENDING
  241. break
  242. return task
  243. def _get_task_origin(self, ctxt, action):
  244. endpoint = self.get_endpoint(ctxt, action.origin_endpoint_id)
  245. return {
  246. "connection_info": endpoint.connection_info,
  247. "type": endpoint.type,
  248. "source_environment": action.source_environment
  249. }
  250. def _get_task_destination(self, ctxt, action):
  251. endpoint = self.get_endpoint(ctxt, action.destination_endpoint_id)
  252. return {
  253. "connection_info": endpoint.connection_info,
  254. "type": endpoint.type,
  255. "target_environment": action.destination_environment
  256. }
  257. def _begin_tasks(self, ctxt, execution, task_info={}):
  258. if not ctxt.trust_id:
  259. keystone.create_trust(ctxt)
  260. ctxt.delete_trust_id = True
  261. origin = self._get_task_origin(ctxt, execution.action)
  262. destination = self._get_task_destination(ctxt, execution.action)
  263. for task in execution.tasks:
  264. if (not task.depends_on and
  265. task.status == constants.TASK_STATUS_PENDING):
  266. self._rpc_worker_client.begin_task(
  267. ctxt, server=None,
  268. task_id=task.id,
  269. task_type=task.task_type,
  270. origin=origin,
  271. destination=destination,
  272. instance=task.instance,
  273. task_info=task_info.get(task.instance, {}))
    @replica_synchronized
    def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances):
        """Create and start a new tasks execution for a replica.

        After checking the replica's reservation and that nothing else is
        running for it, a chain of tasks is created for each instance of the
        replica, the execution is stored in the DB and the tasks which have
        no dependencies are started.

        :param ctxt: request context.
        :param replica_id: ID of the replica to execute (also the lock name
            used by the ``replica_synchronized`` decorator).
        :param shutdown_instances: when truthy, a shutdown task is inserted
            for each instance before its disks deployment task.
        :returns: the result of ``get_replica_tasks_execution`` for the new
            execution.
        """
        replica = self._get_replica(ctxt, replica_id)
        self._check_reservation_for_transfer(replica)
        self._check_replica_running_executions(ctxt, replica)
        execution = models.TasksExecution()
        execution.id = str(uuid.uuid4())
        execution.status = constants.EXECUTION_STATUS_RUNNING
        execution.action = replica
        execution.type = constants.EXECUTION_TYPE_REPLICA_EXECUTION

        # NOTE: tasks are numbered in creation order by _create_task, so the
        # order of the calls below is significant.
        for instance in execution.action.instances:
            # Input validation and instance info gathering.
            validate_replica_source_inputs_task = self._create_task(
                instance,
                constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS,
                execution)

            get_instance_info_task = self._create_task(
                instance,
                constants.TASK_TYPE_GET_INSTANCE_INFO,
                execution,
                depends_on=[validate_replica_source_inputs_task.id])

            validate_replica_destination_inputs_task = self._create_task(
                instance,
                constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS,
                execution,
                depends_on=[get_instance_info_task.id])

            depends_on = [
                validate_replica_source_inputs_task.id,
                validate_replica_destination_inputs_task.id]
            if shutdown_instances:
                # The disks deployment then waits on the shutdown task only.
                shutdown_instance_task = self._create_task(
                    instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
                    execution, depends_on=depends_on)
                depends_on = [shutdown_instance_task.id]

            deploy_replica_disks_task = self._create_task(
                instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS,
                execution, depends_on=depends_on)

            deploy_replica_source_resources_task = self._create_task(
                instance,
                constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES,
                execution, depends_on=[deploy_replica_disks_task.id])

            deploy_replica_target_resources_task = self._create_task(
                instance,
                constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES,
                execution, depends_on=[
                    deploy_replica_disks_task.id,
                    deploy_replica_source_resources_task.id])

            replicate_disks_task = self._create_task(
                instance, constants.TASK_TYPE_REPLICATE_DISKS,
                execution, depends_on=[
                    deploy_replica_source_resources_task.id,
                    deploy_replica_target_resources_task.id])

            # Resource cleanup tasks are created with on_error=True.
            delete_source_resources_task = self._create_task(
                instance,
                constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
                execution,
                depends_on=[replicate_disks_task.id],
                on_error=True)

            self._create_task(
                instance,
                constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
                execution, depends_on=[
                    replicate_disks_task.id, delete_source_resources_task.id],
                on_error=True)

        db_api.add_replica_tasks_execution(ctxt, execution)
        LOG.info("Replica tasks execution created: %s", execution.id)

        self._begin_tasks(ctxt, execution, replica.info)
        return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
  341. @replica_synchronized
  342. def get_replica_tasks_executions(self, ctxt, replica_id,
  343. include_tasks=False):
  344. return db_api.get_replica_tasks_executions(
  345. ctxt, replica_id, include_tasks)
  346. @tasks_execution_synchronized
  347. def get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
  348. return self._get_replica_tasks_execution(
  349. ctxt, replica_id, execution_id)
  350. @tasks_execution_synchronized
  351. def delete_replica_tasks_execution(self, ctxt, replica_id, execution_id):
  352. execution = self._get_replica_tasks_execution(
  353. ctxt, replica_id, execution_id)
  354. if execution.status == constants.EXECUTION_STATUS_RUNNING:
  355. raise exception.InvalidMigrationState(
  356. "Cannot delete a running replica tasks execution")
  357. db_api.delete_replica_tasks_execution(ctxt, execution_id)
  358. @tasks_execution_synchronized
  359. def cancel_replica_tasks_execution(self, ctxt, replica_id, execution_id,
  360. force):
  361. execution = self._get_replica_tasks_execution(
  362. ctxt, replica_id, execution_id)
  363. if execution.status != constants.EXECUTION_STATUS_RUNNING:
  364. raise exception.InvalidReplicaState(
  365. "The replica tasks execution is not running")
  366. self._cancel_tasks_execution(ctxt, execution, force)
  367. def _get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
  368. execution = db_api.get_replica_tasks_execution(
  369. ctxt, replica_id, execution_id)
  370. if not execution:
  371. raise exception.NotFound("Tasks execution not found")
  372. return execution
  373. def get_replicas(self, ctxt, include_tasks_executions=False):
  374. return db_api.get_replicas(ctxt, include_tasks_executions)
  375. @replica_synchronized
  376. def get_replica(self, ctxt, replica_id):
  377. return self._get_replica(ctxt, replica_id)
  378. @replica_synchronized
  379. def delete_replica(self, ctxt, replica_id):
  380. replica = self._get_replica(ctxt, replica_id)
  381. self._check_replica_running_executions(ctxt, replica)
  382. self._check_delete_reservation_for_transfer(replica)
  383. db_api.delete_replica(ctxt, replica_id)
  384. @replica_synchronized
  385. def delete_replica_disks(self, ctxt, replica_id):
  386. replica = self._get_replica(ctxt, replica_id)
  387. self._check_replica_running_executions(ctxt, replica)
  388. execution = models.TasksExecution()
  389. execution.id = str(uuid.uuid4())
  390. execution.status = constants.EXECUTION_STATUS_RUNNING
  391. execution.action = replica
  392. execution.type = constants.EXECUTION_TYPE_REPLICA_DISKS_DELETE
  393. has_tasks = False
  394. for instance in replica.instances:
  395. if (instance in replica.info and
  396. "volumes_info" in replica.info[instance]):
  397. self._create_task(
  398. instance, constants.TASK_TYPE_DELETE_REPLICA_DISKS,
  399. execution)
  400. has_tasks = True
  401. if not has_tasks:
  402. raise exception.InvalidReplicaState(
  403. "This replica does not have volumes information for any "
  404. "instance. Ensure that the replica has been executed "
  405. "successfully priorly")
  406. db_api.add_replica_tasks_execution(ctxt, execution)
  407. LOG.info("Replica tasks execution created: %s", execution.id)
  408. self._begin_tasks(ctxt, execution, replica.info)
  409. return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
  410. @staticmethod
  411. def _check_endpoints(ctxt, origin_endpoint, destination_endpoint):
  412. # TODO(alexpilotti): check Barbican secrets content as well
  413. if (origin_endpoint.connection_info ==
  414. destination_endpoint.connection_info):
  415. raise exception.SameDestination()
  416. def create_instances_replica(self, ctxt, origin_endpoint_id,
  417. destination_endpoint_id, source_environment,
  418. destination_environment, instances,
  419. network_map, storage_mappings, notes=None):
  420. origin_endpoint = self.get_endpoint(ctxt, origin_endpoint_id)
  421. destination_endpoint = self.get_endpoint(ctxt, destination_endpoint_id)
  422. self._check_endpoints(ctxt, origin_endpoint, destination_endpoint)
  423. replica = models.Replica()
  424. replica.id = str(uuid.uuid4())
  425. replica.origin_endpoint = origin_endpoint
  426. replica.destination_endpoint = destination_endpoint
  427. replica.destination_environment = destination_environment
  428. replica.source_environment = source_environment
  429. replica.instances = instances
  430. replica.executions = []
  431. replica.info = {}
  432. replica.notes = notes
  433. replica.network_map = network_map
  434. replica.storage_mappings = storage_mappings
  435. self._check_create_reservation_for_transfer(
  436. replica, licensing_client.RESERVATION_TYPE_REPLICA)
  437. db_api.add_replica(ctxt, replica)
  438. LOG.info("Replica created: %s", replica.id)
  439. return self.get_replica(ctxt, replica.id)
  440. def _get_replica(self, ctxt, replica_id):
  441. replica = db_api.get_replica(ctxt, replica_id)
  442. if not replica:
  443. raise exception.NotFound("Replica not found")
  444. return replica
  445. def get_migrations(self, ctxt, include_tasks):
  446. return db_api.get_migrations(ctxt, include_tasks)
  447. @migration_synchronized
  448. def get_migration(self, ctxt, migration_id):
  449. # the default serialization mechanism enforces a max_depth of 3
  450. return utils.to_dict(self._get_migration(ctxt, migration_id))
  451. @staticmethod
  452. def _check_running_replica_migrations(ctxt, replica_id):
  453. migrations = db_api.get_replica_migrations(ctxt, replica_id)
  454. if [m.id for m in migrations if m.executions[0].status ==
  455. constants.EXECUTION_STATUS_RUNNING]:
  456. raise exception.InvalidReplicaState(
  457. "This replica is currently being migrated")
  458. @staticmethod
  459. def _check_running_executions(action):
  460. if [e for e in action.executions
  461. if e.status == constants.EXECUTION_STATUS_RUNNING]:
  462. raise exception.InvalidActionTasksExecutionState(
  463. "Another tasks execution is in progress")
  464. def _check_replica_running_executions(self, ctxt, replica):
  465. self._check_running_executions(replica)
  466. self._check_running_replica_migrations(ctxt, replica.id)
  467. @staticmethod
  468. def _check_valid_replica_tasks_execution(replica, force=False):
  469. sorted_executions = sorted(
  470. replica.executions, key=lambda e: e.number, reverse=True)
  471. if not sorted_executions:
  472. raise exception.InvalidReplicaState(
  473. "The Replica has never been executed.")
  474. if not [e for e in sorted_executions
  475. if e.type == constants.EXECUTION_TYPE_REPLICA_EXECUTION and (
  476. e.status == constants.EXECUTION_STATUS_COMPLETED)]:
  477. if not force:
  478. raise exception.InvalidReplicaState(
  479. "A replica must have been executed successfully at least "
  480. "once in order to be migrated")
  481. def _get_provider_types(self, ctxt, endpoint):
  482. provider_types = self.get_available_providers(ctxt).get(endpoint.type)
  483. if provider_types is None:
  484. raise exception.NotFound(
  485. "No provider found for: %s" % endpoint.type)
  486. return provider_types["types"]
    @replica_synchronized
    def deploy_replica_instances(self, ctxt, replica_id, clone_disks, force,
                                 skip_os_morphing=False, user_scripts=None):
        """Create and start a migration deploying a replica's instances.

        The replica is validated (reservation, no running executions or
        migrations, execution history, volumes information), a migration
        mirroring the replica's settings is created with a single execution,
        a chain of deployment tasks is created for each instance, and the
        tasks which have no dependencies are started.

        :param ctxt: request context.
        :param replica_id: ID of the replica to deploy (also the lock name
            used by the ``replica_synchronized`` decorator).
        :param clone_disks: stored in each instance's info under
            "clone_disks"; it also controls the snapshot cleanup/restore
            tasks created below.
        :param force: passed to the replica execution history check.
        :param skip_os_morphing: when truthy, no OS morphing tasks are
            created.
        :param user_scripts: optional user scripts, filtered per instance
            via ``_get_instance_scripts``.
        :raises exception.InvalidReplicaState: if any instance info in
            ``replica.info`` lacks volumes information.
        :returns: the result of ``get_migration`` for the new migration.
        """
        replica = self._get_replica(ctxt, replica_id)
        self._check_reservation_for_transfer(replica)
        self._check_replica_running_executions(ctxt, replica)
        self._check_valid_replica_tasks_execution(replica, force)

        destination_endpoint = self.get_endpoint(
            ctxt, replica.destination_endpoint_id)
        destination_provider_types = self._get_provider_types(
            ctxt, destination_endpoint)

        # NOTE(review): this iterates the entries present in replica.info,
        # not replica.instances -- confirm every instance has an entry.
        for instance, info in replica.info.items():
            if not info.get("volumes_info"):
                raise exception.InvalidReplicaState(
                    "The replica doesn't contain volumes information for "
                    "instance: %s. If replicated disks are deleted, the "
                    "replica needs to be executed anew before a migration can "
                    "occur" % instance)

        instances = replica.instances

        migration = models.Migration()
        migration.id = str(uuid.uuid4())
        migration.origin_endpoint_id = replica.origin_endpoint_id
        migration.destination_endpoint_id = replica.destination_endpoint_id
        migration.destination_environment = replica.destination_environment
        migration.source_environment = replica.source_environment
        migration.network_map = replica.network_map
        migration.storage_mappings = replica.storage_mappings
        migration.instances = instances
        migration.replica = replica
        # NOTE(review): the same object is assigned (no copy is made here),
        # so the per-instance updates below presumably also affect
        # replica.info -- confirm this is intended.
        migration.info = replica.info

        for instance in instances:
            migration.info[instance]["clone_disks"] = clone_disks
            scripts = self._get_instance_scripts(user_scripts, instance)
            migration.info[instance]["user_scripts"] = scripts

        execution = models.TasksExecution()
        migration.executions = [execution]
        execution.status = constants.EXECUTION_STATUS_RUNNING
        execution.number = 1
        execution.type = constants.EXECUTION_TYPE_REPLICA_DEPLOY

        # NOTE: tasks are numbered in creation order by _create_task, so the
        # order of the calls below is significant.
        for instance in instances:
            validate_replica_desployment_inputs_task = self._create_task(
                instance,
                constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS,
                execution)

            create_snapshot_task_depends_on = [
                validate_replica_desployment_inputs_task.id]

            # The optimal flavor task is only created when the destination
            # provider types include the instance flavor provider type.
            if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
                    destination_provider_types):
                get_optimal_flavor_task = self._create_task(
                    instance, constants.TASK_TYPE_GET_OPTIMAL_FLAVOR,
                    execution, depends_on=[
                        validate_replica_desployment_inputs_task.id])
                create_snapshot_task_depends_on.append(
                    get_optimal_flavor_task.id)

            create_snapshot_task = self._create_task(
                instance, constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS,
                execution, depends_on=create_snapshot_task_depends_on)

            deploy_replica_task = self._create_task(
                instance, constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE,
                execution, [create_snapshot_task.id])

            if not skip_os_morphing:
                task_deploy_os_morphing_resources = self._create_task(
                    instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
                    execution, depends_on=[deploy_replica_task.id])

                task_osmorphing = self._create_task(
                    instance, constants.TASK_TYPE_OS_MORPHING,
                    execution, depends_on=[
                        task_deploy_os_morphing_resources.id])

                task_delete_os_morphing_resources = self._create_task(
                    instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
                    execution, depends_on=[task_osmorphing.id],
                    on_error=True)

                next_task = task_delete_os_morphing_resources
            else:
                next_task = deploy_replica_task

            # Finalization waits on the last task created above.
            finalize_deployment_task = self._create_task(
                instance,
                constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT,
                execution,
                depends_on=[next_task.id])

            # The snapshots deletion is flagged on_error exactly when disks
            # are cloned.
            self._create_task(
                instance, constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS,
                execution, depends_on=[finalize_deployment_task.id],
                on_error=clone_disks)

            cleanup_deployment_task = self._create_task(
                instance,
                constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT,
                execution, on_error=True)

            # Without disk cloning, an on_error snapshots restore task is
            # created after the failed-deployment cleanup.
            if not clone_disks:
                self._create_task(
                    instance,
                    constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS,
                    execution,
                    depends_on=[cleanup_deployment_task.id],
                    on_error=True)

        db_api.add_migration(ctxt, migration)
        LOG.info("Migration created: %s", migration.id)

        self._begin_tasks(ctxt, execution, migration.info)

        return self.get_migration(ctxt, migration.id)
  586. def _get_instance_scripts(self, user_scripts, instance):
  587. user_scripts = user_scripts or {}
  588. ret = {
  589. "global": user_scripts.get("global", {}),
  590. "instances": {},
  591. }
  592. if user_scripts:
  593. instance_script = user_scripts.get(
  594. "instances", {}).get(instance)
  595. if instance_script:
  596. ret["instances"][instance] = instance_script
  597. return ret
  def migrate_instances(self, ctxt, origin_endpoint_id,
                        destination_endpoint_id, source_environment,
                        destination_environment, instances, network_map,
                        storage_mappings, replication_count,
                        shutdown_instances=False, notes=None,
                        skip_os_morphing=False, user_scripts=None):
      """Create a Migration for `instances` and start executing its tasks.

      Builds a `models.Migration` holding a single running
      `models.TasksExecution`, checks/creates a licensing reservation for
      it, creates the per-instance task graph, stores the migration through
      `db_api.add_migration` and kicks off the tasks.

      :param ctxt: request context.
      :param origin_endpoint_id: ID of the source endpoint.
      :param destination_endpoint_id: ID of the destination endpoint.
      :param source_environment: stored on the migration as-is.
      :param destination_environment: stored on the migration as-is.
      :param instances: iterable of instance identifiers to migrate.
      :param network_map: stored on the migration as-is.
      :param storage_mappings: stored on the migration as-is.
      :param replication_count: number of REPLICATE_DISKS tasks chained
          per instance.
          NOTE(review): a value below 1 leaves no replication task to
          depend on and fails further down -- confirm callers validate it.
      :param shutdown_instances: when true, a SHUTDOWN_INSTANCE task is
          inserted right before the last disk replication.
      :param notes: stored on the migration as-is.
      :param skip_os_morphing: when true, no OS morphing tasks are created.
      :param user_scripts: optional scripts mapping; the part relevant to
          each instance is saved in that instance's info.
      :returns: the result of `self.get_migration` for the new migration.
      """
      origin_endpoint = self.get_endpoint(ctxt, origin_endpoint_id)
      destination_endpoint = self.get_endpoint(ctxt, destination_endpoint_id)
      self._check_endpoints(ctxt, origin_endpoint, destination_endpoint)

      destination_provider_types = self._get_provider_types(
          ctxt, destination_endpoint)

      migration = models.Migration()
      migration.id = str(uuid.uuid4())
      migration.origin_endpoint = origin_endpoint
      migration.destination_endpoint = destination_endpoint
      migration.destination_environment = destination_environment
      migration.source_environment = source_environment
      migration.network_map = network_map
      migration.storage_mappings = storage_mappings
      execution = models.TasksExecution()
      execution.status = constants.EXECUTION_STATUS_RUNNING
      execution.number = 1
      execution.type = constants.EXECUTION_TYPE_MIGRATION
      migration.executions = [execution]
      migration.instances = instances
      migration.info = {}
      migration.notes = notes
      migration.shutdown_instances = shutdown_instances
      migration.replication_count = replication_count

      self._check_create_reservation_for_transfer(
          migration, licensing_client.RESERVATION_TYPE_MIGRATION)

      for instance in instances:
          # NOTE: we must explicitly set this in each VM's info
          # to prevent the Replica disks from being cloned:
          migration.info[instance] = {"clone_disks": False}
          scripts = self._get_instance_scripts(user_scripts, instance)
          migration.info[instance]["user_scripts"] = scripts

          # Input validation and instance info gathering:
          validate_migration_source_inputs_task = self._create_task(
              instance,
              constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS,
              execution)

          get_instance_info_task = self._create_task(
              instance,
              constants.TASK_TYPE_GET_INSTANCE_INFO,
              execution,
              depends_on=[validate_migration_source_inputs_task.id])

          validate_migration_destination_inputs_task = self._create_task(
              instance,
              constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS,
              execution,
              depends_on=[get_instance_info_task.id])

          depends_on = [
              validate_migration_source_inputs_task.id,
              validate_migration_destination_inputs_task.id]

          # Disk creation and source/target temporary resources:
          create_instance_disks_task = self._create_task(
              instance, constants.TASK_TYPE_CREATE_INSTANCE_DISKS,
              execution, depends_on=depends_on)

          deploy_migration_source_resources_task = self._create_task(
              instance,
              constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES,
              execution, depends_on=[create_instance_disks_task.id])

          deploy_migration_target_resources_task = self._create_task(
              instance,
              constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES,
              execution, depends_on=[
                  create_instance_disks_task.id,
                  deploy_migration_source_resources_task.id])

          # NOTE(aznashwan): re-executing the REPLICATE_DISKS task only works
          # if all the source disk snapshotting and worker setup steps are
          # performed by the source plugin in REPLICATE_DISKS.
          # This should no longer be a problem when worker pooling lands.
          # Alternatively, if the DEPLOY_REPLICA_SOURCE/DEST_RESOURCES tasks
          # will no longer have a state conflict, iterating through and
          # re-executing DEPLOY_REPLICA_SOURCE_RESOURCES will be required:
          last_migration_task = None
          migration_resources_tasks = [
              deploy_migration_source_resources_task.id,
              deploy_migration_target_resources_task.id]
          for i in range(migration.replication_count):
              # insert SHUTDOWN_INSTANCES task before the last sync:
              if i == (migration.replication_count - 1) and (
                      migration.shutdown_instances):
                  shutdown_deps = migration_resources_tasks
                  if last_migration_task:
                      shutdown_deps = [last_migration_task.id]
                  last_migration_task = self._create_task(
                      instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
                      execution, depends_on=shutdown_deps)

              # The first sync waits on the resource deployments; every
              # later one is chained to the task created just before it.
              replication_deps = migration_resources_tasks
              if last_migration_task:
                  replication_deps = [last_migration_task.id]

              last_migration_task = self._create_task(
                  instance, constants.TASK_TYPE_REPLICATE_DISKS,
                  execution, depends_on=replication_deps)

          # Temporary resources teardown (on_error=True tasks):
          delete_source_resources_task = self._create_task(
              instance,
              constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES,
              execution, depends_on=[last_migration_task.id],
              on_error=True)

          delete_destination_resources_task = self._create_task(
              instance,
              constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES,
              execution, depends_on=[
                  last_migration_task.id,
                  delete_source_resources_task.id],
              on_error=True)

          deploy_instance_task = self._create_task(
              instance, constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES,
              execution, depends_on=[
                  delete_source_resources_task.id,
                  delete_destination_resources_task.id])

          last_task = deploy_instance_task
          if not skip_os_morphing:
              task_deploy_os_morphing_resources = self._create_task(
                  instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
                  execution, depends_on=[last_task.id])

              task_osmorphing = self._create_task(
                  instance, constants.TASK_TYPE_OS_MORPHING,
                  execution, depends_on=[
                      task_deploy_os_morphing_resources.id])

              task_delete_os_morphing_resources = self._create_task(
                  instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
                  execution, depends_on=[task_osmorphing.id],
                  on_error=True)

              last_task = task_delete_os_morphing_resources

          if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
                  destination_provider_types):
              get_optimal_flavor_task = self._create_task(
                  instance, constants.TASK_TYPE_GET_OPTIMAL_FLAVOR,
                  execution, depends_on=[last_task.id])
              last_task = get_optimal_flavor_task

          self._create_task(
              instance,
              constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT,
              execution, depends_on=[last_task.id])

          # Cleanup tasks created with on_error=True and no dependencies:
          self._create_task(
              instance,
              constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT,
              execution, on_error=True)

          self._create_task(
              instance, constants.TASK_TYPE_CLEANUP_INSTANCE_STORAGE,
              execution, on_error=True)

      db_api.add_migration(ctxt, migration)
      LOG.info("Migration created: %s", migration.id)

      # Hand the per-instance info built above (clone_disks, user_scripts)
      # to the initial tasks as well, same as the other `_begin_tasks`
      # callers in this file do with their action's info.
      self._begin_tasks(ctxt, execution, migration.info)

      return self.get_migration(ctxt, migration.id)
  def _get_migration(self, ctxt, migration_id):
      """Fetch a migration from the database.

      :raises exception.NotFound: if the database returns nothing for
          `migration_id`.
      """
      found = db_api.get_migration(ctxt, migration_id)
      if found:
          return found
      raise exception.NotFound("Migration not found")
  @migration_synchronized
  def delete_migration(self, ctxt, migration_id):
      """Delete a migration, refusing to do so while it is running.

      :raises exception.NotFound: if the migration does not exist.
      :raises exception.InvalidMigrationState: if the migration's first
          execution is in the running state.
      """
      migration = self._get_migration(ctxt, migration_id)
      still_running = (
          migration.executions[0].status ==
          constants.EXECUTION_STATUS_RUNNING)
      if still_running:
          raise exception.InvalidMigrationState(
              "Cannot delete a running migration")

      db_api.delete_migration(ctxt, migration_id)
  @migration_synchronized
  def cancel_migration(self, ctxt, migration_id, force):
      """Cancel a running migration and drop its licensing reservation.

      :param ctxt: request context.
      :param migration_id: ID of the migration to cancel.
      :param force: forwarded to `_cancel_tasks_execution`.
      :raises exception.NotFound: if the migration does not exist.
      :raises exception.InvalidMigrationState: if the migration's first
          execution is not in the running state.
      """
      migration = self._get_migration(ctxt, migration_id)
      # The execution checked here is the same one that gets cancelled;
      # there is no need to look it up a second time.
      execution = migration.executions[0]
      if execution.status != constants.EXECUTION_STATUS_RUNNING:
          raise exception.InvalidMigrationState(
              "The migration is not running")

      self._cancel_tasks_execution(ctxt, execution, force)
      self._check_delete_reservation_for_transfer(migration)
  def _cancel_tasks_execution(self, ctxt, execution, force=False):
      """Cancel the regular tasks of an execution and start its cleanup.

      First pass: running tasks are cancelled through the worker RPC client
      and pending ones are marked as canceled in the database; tasks
      created with `on_error` set are left untouched.
      Second pass (only if no regular task was running): `on_error` tasks
      which are pending or "on error only" are started.
      The execution is marked as errored only when nothing is running and
      it has no `on_error` tasks at all.

      :param ctxt: request context.
      :param execution: the tasks execution to cancel.
      :param force: forwarded to the worker's `cancel_task` call.
      """
      has_error_tasks = False
      has_running_tasks = False
      for task in execution.tasks:
          if task.on_error:
              # NOTE: always allow on_error tasks to execute
              # as they may do required cleanup:
              has_error_tasks = True
              continue

          if task.status == constants.TASK_STATUS_RUNNING:
              self._rpc_worker_client.cancel_task(
                  ctxt, task.host, task.id, task.process_id, force)
              has_running_tasks = True
          elif task.status == constants.TASK_STATUS_PENDING:
              db_api.set_task_status(
                  ctxt, task.id, constants.TASK_STATUS_CANCELED)

      if not has_running_tasks:
          # No regular task is left running, so the on_error tasks can be
          # started right away.
          try:
              origin = self._get_task_origin(ctxt, execution.action)
              destination = self._get_task_destination(
                  ctxt, execution.action)

              for task in execution.tasks:
                  if task.status in [constants.TASK_STATUS_PENDING,
                                     constants.TASK_STATUS_ON_ERROR_ONLY]:
                      if task.on_error:
                          # The action is re-read for every started task
                          # and its info for the task's instance is passed
                          # along as the task info.
                          action = db_api.get_action(
                              ctxt, execution.action_id)
                          task_info = action.info.get(task.instance, {})

                          self._rpc_worker_client.begin_task(
                              ctxt, server=None,
                              task_id=task.id,
                              task_type=task.task_type,
                              origin=origin,
                              destination=destination,
                              instance=task.instance,
                              task_info=task_info)
                          has_running_tasks = True
          except exception.NotFound as ex:
              # Logged and swallowed: the remaining on_error tasks are not
              # started in this case.
              LOG.error("A required endpoint could not be found")
              LOG.exception(ex)

      # NOTE: only set the whole execution to 'ERROR' if nothing's
      # running and no on_error=True tasks are pending.
      # Otherwise, the lifecycle of the rest of the execution will
      # be governed in `task_completed` when any of the
      # running/pending tasks finish:
      if not has_running_tasks and not has_error_tasks:
          self._set_tasks_execution_status(
              ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
  @staticmethod
  def _set_tasks_execution_status(ctxt, execution_id, execution_status):
      """Persist the final status of a tasks execution.

      Also deletes the Keystone trust of `ctxt` when the context is
      flagged with `delete_trust_id`.
      """
      log_fields = {"id": execution_id, "status": execution_status}
      LOG.info(
          "Tasks execution %(id)s completed with status: %(status)s",
          log_fields)

      db_api.set_execution_status(ctxt, execution_id, execution_status)

      if not ctxt.delete_trust_id:
          return
      keystone.delete_trust(ctxt)
  @task_synchronized
  def set_task_host(self, ctxt, task_id, host, process_id):
      """Record the host/process of a task and mark the task as running."""
      db_api.set_task_host(ctxt, task_id, host, process_id)
      running_status = constants.TASK_STATUS_RUNNING
      db_api.set_task_status(ctxt, task_id, running_status)
  def _start_pending_tasks(self, ctxt, execution, parent_task, task_info):
      """Start the pending tasks unblocked by the completion of a task.

      A pending task is started when it lists `parent_task` among its
      dependencies and every other dependency is reported as completed by
      the database.

      :param ctxt: request context.
      :param execution: the tasks execution whose tasks are inspected.
      :param parent_task: the task which has just finished.
      :param task_info: task info handed to every task being started.
      """
      origin = self._get_task_origin(ctxt, execution.action)
      destination = self._get_task_destination(ctxt, execution.action)

      for candidate in execution.tasks:
          if candidate.status != constants.TASK_STATUS_PENDING:
              continue
          if not candidate.depends_on or (
                  parent_task.id not in candidate.depends_on):
              continue

          # Lazily look up the remaining dependencies; `all` stops at the
          # first one which is not completed.
          other_dependency_ids = (
              dep_id for dep_id in candidate.depends_on
              if dep_id != parent_task.id)
          dependencies_done = all(
              db_api.get_task(ctxt, dep_id).status ==
              constants.TASK_STATUS_COMPLETED
              for dep_id in other_dependency_ids)
          if not dependencies_done:
              continue

          self._rpc_worker_client.begin_task(
              ctxt, server=None,
              task_id=candidate.id,
              task_type=candidate.task_type,
              origin=origin,
              destination=destination,
              instance=candidate.instance,
              task_info=task_info)
  def _update_replica_volumes_info(self, ctxt, replica_id, instance,
                                   updated_task_info):
      """Store `updated_task_info` in the Replica's info for `instance`.

      WARN: the lock for the Replica must be pre-acquired by the caller.
      """
      db_api.set_transfer_action_info(
          ctxt, replica_id, instance, updated_task_info)
  def _update_volumes_info_for_migration_parent_replica(
          self, ctxt, migration_id, instance, updated_task_info):
      """Update the info of the Replica a migration was created from.

      The Replica is identified through the migration's `replica_id` and
      is locked for the duration of the update.
      """
      parent_replica_id = db_api.get_migration(
          ctxt, migration_id).replica_id

      with lockutils.lock(parent_replica_id):
          LOG.debug(
              "Updating volume_info in replica due to snapshot "
              "restore during migration. replica id: %s",
              parent_replica_id)
          self._update_replica_volumes_info(
              ctxt, parent_replica_id, instance, updated_task_info)
  def _handle_post_task_actions(self, ctxt, task, execution, task_info):
      """Perform the conductor-side follow-up for a completed task.

      Depending on the task's type this updates the parent Replica's
      `volumes_info`, saves the transfer result of the action, or applies
      a Replica update. Task types not listed below need no follow-up.

      :param ctxt: request context.
      :param task: the task which has completed.
      :param execution: the tasks execution the task belongs to.
      :param task_info: the (already merged) task info for the task's
          instance.
      """
      task_type = task.task_type

      if task_type == constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
          # When restoring a snapshot in some import providers (OpenStack),
          # a new volume_id is generated. This needs to be updated in the
          # Replica instance as well.
          volumes_info = task_info.get("volumes_info")
          if volumes_info:
              self._update_volumes_info_for_migration_parent_replica(
                  ctxt, execution.action_id, task.instance,
                  {"volumes_info": volumes_info})

      elif task_type == constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS:
          if not task_info.get("clone_disks"):
              # The migration completed. If the replica is executed again,
              # new volumes need to be deployed in place of the migrated
              # ones.
              self._update_volumes_info_for_migration_parent_replica(
                  ctxt, execution.action_id, task.instance,
                  {"volumes_info": None})

      elif task_type in (
              constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT,
              constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT):
          # set 'transfer_result' in the 'base_transfer_action'
          # table if the task returned a result.
          if "transfer_result" in task_info:
              transfer_result = task_info["transfer_result"]
              try:
                  schemas.validate_value(
                      transfer_result,
                      schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
                  LOG.debug(
                      "Setting result for transfer action '%s': %s",
                      execution.action_id, transfer_result)
                  db_api.set_transfer_action_result(
                      ctxt, execution.action_id, task.instance,
                      transfer_result)
              except exception.SchemaValidationException:
                  # Best-effort: an invalid result is reported but does not
                  # fail the task completion. `warning` is used as `warn`
                  # is a deprecated alias.
                  LOG.warning(
                      "Could not validate transfer result '%s' against the "
                      "VM export info schema. NOT saving value in Database. "
                      "Exception details: %s",
                      transfer_result, utils.get_exception_details())
          else:
              LOG.debug(
                  "No 'transfer_result' was returned for task type '%s' "
                  "for transfer action '%s'", task_type, execution.action_id)

      elif task_type in (
              constants.TASK_TYPE_UPDATE_SOURCE_REPLICA,
              constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA):
          # NOTE: perform the actual db update on the Replica's properties:
          db_api.update_replica(ctxt, execution.action_id, task_info)
          # NOTE: remember to update the `volumes_info`:
          # NOTE: considering this method is only called with a lock on the
          # `execution.action_id` (in a Replica update tasks' case that's the
          # ID of the Replica itself) we can safely call
          # `_update_replica_volumes_info` below:
          self._update_replica_volumes_info(
              ctxt, execution.action_id, task.instance,
              {"volumes_info": task_info.get("volumes_info")})
  @task_synchronized
  def task_completed(self, ctxt, task_id, task_info):
      """Handle the successful completion of a task.

      Marks the task as completed, merges `task_info` into the parent
      action's info, runs the post-task actions, starts any tasks which
      were waiting on this one and, once no task is running or pending,
      sets the final status of the execution.
      """
      LOG.info("Task completed: %s", task_id)

      db_api.set_task_status(
          ctxt, task_id, constants.TASK_STATUS_COMPLETED)

      task = db_api.get_task(ctxt, task_id)
      execution = db_api.get_tasks_execution(ctxt, task.execution_id)

      failed_statuses = [
          constants.TASK_STATUS_ERROR,
          constants.TASK_STATUS_CANCELED,
          constants.TASK_STATUS_CANCELED_FOR_DEBUGGING]
      unfinished_statuses = [
          constants.TASK_STATUS_RUNNING,
          constants.TASK_STATUS_PENDING]

      action_id = execution.action_id
      with lockutils.lock(action_id):
          LOG.info("Setting instance %(instance)s "
                   "action info: %(task_info)s",
                   {"instance": task.instance, "task_info": task_info})
          merged_info = db_api.set_transfer_action_info(
              ctxt, action_id, task.instance, task_info)

          self._handle_post_task_actions(
              ctxt, task, execution, merged_info)

          if execution.status == constants.EXECUTION_STATUS_RUNNING:
              self._start_pending_tasks(
                  ctxt, execution, task, merged_info)

          still_in_progress = any(
              t.status in unfinished_statuses for t in execution.tasks)
          if not still_in_progress:
              # The execution is in error status if there's one or more
              # tasks in error or canceled status
              any_failed = any(
                  t.status in failed_statuses for t in execution.tasks)
              final_status = (
                  constants.EXECUTION_STATUS_ERROR if any_failed
                  else constants.EXECUTION_STATUS_COMPLETED)

              self._set_tasks_execution_status(
                  ctxt, execution.id, final_status)
  def _cancel_execution_for_osmorphing_debugging(self, ctxt, execution):
      """Cancel every not-yet-run task so OSMorphing can be debugged.

      All tasks except the OSMorphing one which are pending or "on error
      only" are marked as canceled for debugging in the database.

      :param ctxt: request context.
      :param execution: the tasks execution whose tasks get cancelled.
      :raises exception.CoriolisException: if any task other than the
          OSMorphing one is found to still be running.
      """
      # go through all scheduled tasks and cancel them:
      for subtask in execution.tasks:
          if subtask.task_type == constants.TASK_TYPE_OS_MORPHING:
              continue

          if subtask.status == constants.TASK_STATUS_RUNNING:
              # NOTE: the exception class lives in the `exception` module
              # (not on the `execution` argument) and the message must be
              # formatted here as exceptions do not interpolate arguments.
              raise exception.CoriolisException(
                  "Task %s is still running although it should not!" %
                  subtask.id)

          if subtask.status in [
                  constants.TASK_STATUS_PENDING,
                  constants.TASK_STATUS_ON_ERROR_ONLY]:
              db_api.set_task_status(
                  ctxt, subtask.id,
                  constants.TASK_STATUS_CANCELED_FOR_DEBUGGING)
  @task_synchronized
  def set_task_error(self, ctxt, task_id, exception_details):
      """Handle the failure of a task.

      Marks the task as errored and cancels the rest of its execution.
      When the failed task is the OSMorphing one and
      `CONF.conductor.debug_os_morphing_errors` is set, the remaining tasks
      are cancelled for debugging instead (no cleanup is run) as long as
      nothing else is running. For migrations, the associated licensing
      reservation is released afterwards.

      :param ctxt: request context.
      :param task_id: ID of the failed task.
      :param exception_details: error details saved with the task status.
      """
      LOG.error("Task error: %(task_id)s - %(ex)s",
                {"task_id": task_id, "ex": exception_details})

      db_api.set_task_status(
          ctxt, task_id, constants.TASK_STATUS_ERROR, exception_details)

      task = db_api.get_task(ctxt, task_id)
      execution = db_api.get_tasks_execution(ctxt, task.execution_id)

      action_id = execution.action_id
      action = db_api.get_action(ctxt, action_id)
      with lockutils.lock(action_id):
          if task.task_type == constants.TASK_TYPE_OS_MORPHING and (
                  CONF.conductor.debug_os_morphing_errors):
              LOG.debug("Attempting to cancel execution '%s' for OSMorphing "
                        "debugging.", execution.id)
              # NOTE: the OSMorphing task always runs by itself so no
              # further tasks should be running, but we double-check here:
              running = [
                  t for t in execution.tasks
                  if t.status == constants.TASK_STATUS_RUNNING
                  and t.task_type != constants.TASK_TYPE_OS_MORPHING]
              if not running:
                  self._cancel_execution_for_osmorphing_debugging(
                      ctxt, execution)
                  # `warning` is used as `warn` is a deprecated alias.
                  LOG.warning(
                      "All subtasks for Migration '%s' have been cancelled "
                      "to allow for OSMorphing debugging. The connection "
                      "info for the worker VM is: %s",
                      action_id, action.info.get(task.instance, {}).get(
                          'osmorphing_connection_info', {}))
                  self._set_tasks_execution_status(
                      ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
              else:
                  LOG.warning(
                      "Some tasks are running in parallel with the "
                      "OSMorphing task, a debug setup cannot be safely "
                      "achieved. Proceeding with cleanup tasks as usual.")
                  self._cancel_tasks_execution(ctxt, execution)
          else:
              self._cancel_tasks_execution(ctxt, execution)

          # NOTE: if this is a migration, make sure to delete
          # its associated reservation.
          if action.type == "migration":
              self._check_delete_reservation_for_transfer(action)
  @task_synchronized
  def task_event(self, ctxt, task_id, level, message):
      """Log and store an event reported for a task."""
      LOG.info("Task event: %s", task_id)
      db_api.add_task_event(
          ctxt, task_id, level, message)
  @task_synchronized
  def task_progress_update(
          self, ctxt, task_id, current_step, total_steps, message):
      """Log and store a progress update reported for a task."""
      LOG.info("Task progress update: %s", task_id)
      db_api.add_task_progress_update(
          ctxt, task_id, current_step, total_steps, message)
  def _get_replica_schedule(
          self, ctxt, replica_id, schedule_id, expired=True):
      """Fetch a Replica schedule from the database.

      :param expired: forwarded to `db_api.get_replica_schedule`.
      :raises exception.NotFound: if the database returns nothing.
      """
      found = db_api.get_replica_schedule(
          ctxt, replica_id, schedule_id, expired=expired)
      if found:
          return found
      raise exception.NotFound("Schedule not found")
  def create_replica_schedule(self, ctxt, replica_id,
                              schedule, enabled, exp_date,
                              shutdown_instance):
      """Create a schedule for a Replica and register it with the cron.

      A Keystone trust is created for `ctxt` first and its ID is saved on
      the schedule.

      :returns: the newly created schedule, as returned by
          `self.get_replica_schedule`.
      """
      keystone.create_trust(ctxt)
      replica = self._get_replica(ctxt, replica_id)

      new_schedule = models.ReplicaSchedule()
      new_schedule.id = str(uuid.uuid4())
      new_schedule.replica = replica
      new_schedule.replica_id = replica_id
      new_schedule.schedule = schedule
      new_schedule.expiration_date = exp_date
      new_schedule.enabled = enabled
      new_schedule.shutdown_instance = shutdown_instance
      new_schedule.trust_id = ctxt.trust_id

      def _register(ctxt, sched):
          # Callback handed to the DB API for registering the schedule.
          return self._replica_cron_client.register(ctxt, sched)

      db_api.add_replica_schedule(ctxt, new_schedule, _register)

      return self.get_replica_schedule(
          ctxt, replica_id, new_schedule.id)
  @schedule_synchronized
  def update_replica_schedule(self, ctxt, replica_id, schedule_id,
                              updated_values):
      """Update a Replica schedule and re-register it with the cron.

      :returns: the schedule as read back from the database.
      """
      def _register(ctxt, sched):
          # Callback handed to the DB API for registering the schedule.
          return self._replica_cron_client.register(ctxt, sched)

      db_api.update_replica_schedule(
          ctxt, replica_id, schedule_id, updated_values, None, _register)

      return self._get_replica_schedule(ctxt, replica_id, schedule_id)
  def _cleanup_schedule_resources(self, ctxt, schedule):
      """Unregister a schedule from the cron and delete its trust, if any."""
      self._replica_cron_client.unregister(ctxt, schedule)

      if not schedule.trust_id:
          return

      # The trust is deleted through an admin context bound to its ID.
      trust_ctxt = context.get_admin_context(trust_id=schedule.trust_id)
      keystone.delete_trust(trust_ctxt)
  @schedule_synchronized
  def delete_replica_schedule(self, ctxt, replica_id, schedule_id):
      """Delete a Replica schedule and clean up its resources."""
      def _cleanup(ctxt, sched):
          # Callback handed to the DB API for releasing the schedule's
          # cron registration and trust.
          return self._cleanup_schedule_resources(ctxt, sched)

      db_api.delete_replica_schedule(
          ctxt, replica_id, schedule_id, None, _cleanup)
  @replica_synchronized
  def get_replica_schedules(self, ctxt, replica_id=None, expired=True):
      """Return the Replica schedules reported by the database.

      Both `replica_id` and `expired` are forwarded to
      `db_api.get_replica_schedules` unchanged.
      """
      schedules = db_api.get_replica_schedules(
          ctxt, replica_id=replica_id, expired=expired)
      return schedules
  @schedule_synchronized
  def get_replica_schedule(self, ctxt, replica_id,
                           schedule_id, expired=True):
      """Return a single Replica schedule.

      :param ctxt: request context.
      :param replica_id: ID of the Replica owning the schedule.
      :param schedule_id: ID of the schedule.
      :param expired: forwarded to the schedule lookup (it used to be
          ignored in favour of a hard-coded `True`).
      :raises exception.NotFound: if the schedule does not exist (raised
          by `_get_replica_schedule`).
      """
      # `_get_replica_schedule` already raises NotFound for a missing
      # schedule, so no further check is needed on its result.
      return self._get_replica_schedule(
          ctxt, replica_id, schedule_id, expired=expired)
  @replica_synchronized
  def update_replica(
          self, ctxt, replica_id, properties):
      """Create and start a Replica update execution.

      `properties` are merged into the info of every instance of the
      Replica, and three tasks are created per instance: instance info
      retrieval, source Replica update, and destination Replica update
      (which depends on the former two).

      :returns: the new execution, as returned by
          `self.get_replica_tasks_execution`.
      """
      replica = self._get_replica(ctxt, replica_id)
      self._check_replica_running_executions(ctxt, replica)
      self._check_valid_replica_tasks_execution(replica, force=True)

      execution = models.TasksExecution()
      execution.id = str(uuid.uuid4())
      execution.status = constants.EXECUTION_STATUS_RUNNING
      execution.action = replica
      execution.type = constants.EXECUTION_TYPE_REPLICA_UPDATE

      LOG.debug(
          "Replica '%s' info pre-replica-update: %s",
          replica_id, replica.info)

      for instance in execution.action.instances:
          # NOTE: "circular assignment" would lead to a `None` value
          # so we must operate on a copy:
          merged_info = copy.deepcopy(replica.info[instance])
          merged_info.update(properties)
          replica.info[instance] = merged_info

          info_task = self._create_task(
              instance, constants.TASK_TYPE_GET_INSTANCE_INFO, execution)
          source_update_task = self._create_task(
              instance, constants.TASK_TYPE_UPDATE_SOURCE_REPLICA,
              execution)
          destination_dependencies = [
              info_task.id, source_update_task.id]
          self._create_task(
              instance, constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA,
              execution, depends_on=destination_dependencies)

      LOG.debug(
          "Replica '%s' info post-replica-update: %s",
          replica_id, replica.info)

      db_api.add_replica_tasks_execution(ctxt, execution)
      LOG.debug("Execution for Replica update tasks created: %s",
                execution.id)

      self._begin_tasks(ctxt, execution, replica.info)
      return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
  def get_diagnostics(self, ctxt):
      """Return the diagnostics info provided by `utils`.

      `ctxt` is accepted but not used.
      """
      diagnostics = utils.get_diagnostics_info()
      return diagnostics