api.py 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089
  1. # Copyright 2016 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. from oslo_config import cfg
  4. from oslo_db import api as db_api
  5. from oslo_db import options as db_options
  6. from oslo_db.sqlalchemy import enginefacade
  7. from oslo_log import log as logging
  8. from oslo_utils import timeutils
  9. from sqlalchemy import func
  10. from sqlalchemy import or_
  11. from sqlalchemy import orm
  12. from sqlalchemy.sql import null
  13. from coriolis.db.sqlalchemy import models
  14. from coriolis import exception
  15. from coriolis import utils
  16. CONF = cfg.CONF
  17. db_options.set_defaults(CONF)
  18. LOG = logging.getLogger(__name__)
  19. _BACKEND_MAPPING = {'sqlalchemy': 'coriolis.db.sqlalchemy.api'}
  20. IMPL = db_api.DBAPI.from_config(CONF, backend_mapping=_BACKEND_MAPPING)
  21. def get_engine():
  22. return IMPL.get_engine()
  23. def get_session():
  24. return IMPL.get_session()
  25. def db_sync(engine, version=None):
  26. """Migrate the database to `version` or the most recent version."""
  27. return IMPL.db_sync(engine, version=version)
  28. def db_version(engine):
  29. """Display the current database version."""
  30. return IMPL.db_version(engine)
  31. def _session(context):
  32. return (context and context.session) or get_session()
  33. def is_user_context(context):
  34. """Indicates if the request context is a normal user."""
  35. if not context:
  36. return False
  37. if not context.user_id or not context.project_id:
  38. return False
  39. if context.is_admin:
  40. return False
  41. return True
  42. def _model_query(context, *args):
  43. session = _session(context)
  44. return session.query(*args)
  45. def _update_sqlalchemy_object_fields(obj, updateable_fields, values_to_update):
  46. """ Updates the given 'values_to_update' on the provided sqlalchemy object
  47. as long as they are included as 'updateable_fields'.
  48. :param obj: object: sqlalchemy object
  49. :param updateable_fields: list(str): list of fields which are updateable
  50. :param values_to_update: dict: dict with the key/vals to update
  51. """
  52. if not isinstance(values_to_update, dict):
  53. raise exception.InvalidInput(
  54. "Properties to update for DB object of type '%s' must be a dict, "
  55. "got the following (type %s): %s" % (
  56. type(obj), type(values_to_update), values_to_update))
  57. non_updateable_fields = set(
  58. values_to_update.keys()).difference(
  59. set(updateable_fields))
  60. if non_updateable_fields:
  61. raise exception.Conflict(
  62. "Fields %s for '%s' database cannot be updated. "
  63. "Only updateable fields are: %s" % (
  64. non_updateable_fields, type(obj), updateable_fields))
  65. for field_name, field_val in values_to_update.items():
  66. if not hasattr(obj, field_name):
  67. raise exception.InvalidInput(
  68. "No region field named '%s' to update." % field_name)
  69. setattr(obj, field_name, field_val)
  70. LOG.debug(
  71. "Successfully updated the following fields on DB object "
  72. "of type '%s': %s" % (type(obj), values_to_update.keys()))
  73. def _get_replica_schedules_filter(context, replica_id=None,
  74. schedule_id=None, expired=True):
  75. now = timeutils.utcnow()
  76. q = _soft_delete_aware_query(context, models.ReplicaSchedule)
  77. q = q.join(models.Replica)
  78. sched_filter = q.filter()
  79. if is_user_context(context):
  80. sched_filter = sched_filter.filter(
  81. models.Replica.project_id == context.tenant)
  82. if replica_id:
  83. sched_filter = sched_filter.filter(
  84. models.Replica.id == replica_id)
  85. if schedule_id:
  86. sched_filter = sched_filter.filter(
  87. models.ReplicaSchedule.id == schedule_id)
  88. if not expired:
  89. sched_filter = sched_filter.filter(
  90. or_(models.ReplicaSchedule.expiration_date == null(),
  91. models.ReplicaSchedule.expiration_date > now))
  92. return sched_filter
  93. def _soft_delete_aware_query(context, *args, **kwargs):
  94. """Query helper that accounts for context's `show_deleted` field.
  95. :param show_deleted: if True, overrides context's show_deleted field.
  96. """
  97. query = _model_query(context, *args)
  98. show_deleted = kwargs.get('show_deleted')
  99. if context and context.show_deleted:
  100. show_deleted = True
  101. if not show_deleted:
  102. query = query.filter_by(deleted_at=None)
  103. return query
  104. @enginefacade.reader
  105. def get_endpoints(context):
  106. q = _soft_delete_aware_query(context, models.Endpoint).options(
  107. orm.joinedload('mapped_regions'))
  108. if is_user_context(context):
  109. q = q.filter(
  110. models.Endpoint.project_id == context.tenant)
  111. return q.filter().all()
  112. @enginefacade.reader
  113. def get_endpoint(context, endpoint_id):
  114. q = _soft_delete_aware_query(context, models.Endpoint).options(
  115. orm.joinedload('mapped_regions'))
  116. if is_user_context(context):
  117. q = q.filter(
  118. models.Endpoint.project_id == context.tenant)
  119. return q.filter(
  120. models.Endpoint.id == endpoint_id).first()
  121. @enginefacade.writer
  122. def add_endpoint(context, endpoint):
  123. endpoint.user_id = context.user
  124. endpoint.project_id = context.tenant
  125. _session(context).add(endpoint)
  126. @enginefacade.writer
  127. def update_endpoint(context, endpoint_id, updated_values):
  128. endpoint = get_endpoint(context, endpoint_id)
  129. if not endpoint:
  130. raise exception.NotFound("Endpoint with ID '%s' found" % endpoint_id)
  131. if not isinstance(updated_values, dict):
  132. raise exception.InvalidInput(
  133. "Update payload for endpoints must be a dict. Got the following "
  134. "(type: %s): %s" % (type(updated_values), updated_values))
  135. def _try_unmap_regions(region_ids):
  136. for region_to_unmap in region_ids:
  137. try:
  138. LOG.debug(
  139. "Attempting to unmap region '%s' from endpoint '%s'",
  140. region_to_unmap, endpoint_id)
  141. delete_endpoint_region_mapping(
  142. context, endpoint_id, region_to_unmap)
  143. except Exception as ex:
  144. LOG.warn(
  145. "Exception occurred while attempting to unmap region '%s' "
  146. "from endpoint '%s'. Ignoring. Error was: %s",
  147. region_to_unmap, endpoint_id,
  148. utils.get_exception_details())
  149. newly_mapped_regions = []
  150. regions_to_unmap = []
  151. # NOTE: `.pop()` required for `_update_sqlalchemy_object_fields` call:
  152. desired_region_mappings = updated_values.pop('mapped_regions', None)
  153. if desired_region_mappings is not None:
  154. # ensure all requested regions exist:
  155. for region_id in desired_region_mappings:
  156. region = get_region(context, region_id)
  157. if not region:
  158. raise exception.NotFound(
  159. "Could not find region with ID '%s' for associating "
  160. "with endpoint '%s' during update process." % (
  161. region_id, endpoint_id))
  162. # get all existing mappings:
  163. existing_region_mappings = [
  164. mapping.region_id
  165. for mapping in get_region_mappings_for_endpoint(
  166. context, endpoint_id)]
  167. # check and add new mappings:
  168. to_map = set(
  169. desired_region_mappings).difference(set(existing_region_mappings))
  170. regions_to_unmap = set(
  171. existing_region_mappings).difference(set(desired_region_mappings))
  172. LOG.debug(
  173. "Remapping regions for endpoint '%s' from %s to %s",
  174. endpoint_id, existing_region_mappings, desired_region_mappings)
  175. region_id = None
  176. try:
  177. for region_id in to_map:
  178. mapping = models.EndpointRegionMapping()
  179. mapping.region_id = region_id
  180. mapping.endpoint_id = endpoint_id
  181. add_endpoint_region_mapping(context, mapping)
  182. newly_mapped_regions.append(region_id)
  183. except Exception as ex:
  184. LOG.warn(
  185. "Exception occurred while adding region mapping for '%s' to "
  186. "endpoint '%s'. Cleaning up created mappings (%s). Error was: "
  187. "%s", region_id, endpoint_id, newly_mapped_regions,
  188. utils.get_exception_details())
  189. _try_unmap_regions(newly_mapped_regions)
  190. raise
  191. updateable_fields = ["name", "description", "connection_info"]
  192. try:
  193. _update_sqlalchemy_object_fields(
  194. endpoint, updateable_fields, updated_values)
  195. except Exception as ex:
  196. LOG.warn(
  197. "Exception occurred while updating fields of endpoint '%s'. "
  198. "Cleaning ""up created mappings (%s). Error was: %s",
  199. endpoint_id, newly_mapped_regions, utils.get_exception_details())
  200. _try_unmap_regions(newly_mapped_regions)
  201. raise
  202. # remove all of the old region mappings:
  203. LOG.debug(
  204. "Unmapping the following regions during update of endpoint '%s': %s",
  205. endpoint_id, regions_to_unmap)
  206. _try_unmap_regions(regions_to_unmap)
  207. @enginefacade.writer
  208. def delete_endpoint(context, endpoint_id):
  209. endpoint = get_endpoint(context, endpoint_id)
  210. args = {"id": endpoint_id}
  211. if is_user_context(context):
  212. args["project_id"] = context.tenant
  213. count = _soft_delete_aware_query(context, models.Endpoint).filter_by(
  214. **args).soft_delete()
  215. if count == 0:
  216. raise exception.NotFound("0 Endpoint entries were soft deleted")
  217. # NOTE(aznashwan): many-to-many tables with soft deletion on either end of
  218. # the association are not handled properly so we must manually delete each
  219. # association ourselves:
  220. for reg in endpoint.mapped_regions:
  221. delete_endpoint_region_mapping(context, endpoint_id, reg.id)
  222. @enginefacade.reader
  223. def get_replica_tasks_executions(context, replica_id, include_tasks=False):
  224. q = _soft_delete_aware_query(context, models.TasksExecution)
  225. q = q.join(models.Replica)
  226. if include_tasks:
  227. q = _get_tasks_with_details_options(q)
  228. if is_user_context(context):
  229. q = q.filter(models.Replica.project_id == context.tenant)
  230. return q.filter(
  231. models.Replica.id == replica_id).all()
  232. @enginefacade.reader
  233. def get_replica_tasks_execution(context, replica_id, execution_id):
  234. q = _soft_delete_aware_query(context, models.TasksExecution).join(
  235. models.Replica)
  236. q = _get_tasks_with_details_options(q)
  237. if is_user_context(context):
  238. q = q.filter(models.Replica.project_id == context.tenant)
  239. return q.filter(
  240. models.Replica.id == replica_id,
  241. models.TasksExecution.id == execution_id).first()
  242. @enginefacade.writer
  243. def add_replica_tasks_execution(context, execution):
  244. if is_user_context(context):
  245. if execution.action.project_id != context.tenant:
  246. raise exception.NotAuthorized()
  247. # include deleted records
  248. max_number = _model_query(
  249. context, func.max(models.TasksExecution.number)).filter_by(
  250. action_id=execution.action.id).first()[0] or 0
  251. execution.number = max_number + 1
  252. _session(context).add(execution)
  253. @enginefacade.writer
  254. def delete_replica_tasks_execution(context, execution_id):
  255. q = _soft_delete_aware_query(context, models.TasksExecution).filter(
  256. models.TasksExecution.id == execution_id)
  257. if is_user_context(context):
  258. if not q.join(models.Replica).filter(
  259. models.Replica.project_id == context.tenant).first():
  260. raise exception.NotAuthorized()
  261. count = q.soft_delete()
  262. if count == 0:
  263. raise exception.NotFound("0 entries were soft deleted")
  264. @enginefacade.reader
  265. def get_replica_schedules(context, replica_id=None, expired=True):
  266. sched_filter = _get_replica_schedules_filter(
  267. context, replica_id=replica_id, expired=expired)
  268. return sched_filter.all()
  269. @enginefacade.reader
  270. def get_replica_schedule(context, replica_id, schedule_id, expired=True):
  271. sched_filter = _get_replica_schedules_filter(
  272. context, replica_id=replica_id, schedule_id=schedule_id,
  273. expired=expired)
  274. return sched_filter.first()
  275. @enginefacade.writer
  276. def update_replica_schedule(context, replica_id, schedule_id,
  277. updated_values, pre_update_callable=None,
  278. post_update_callable=None):
  279. # NOTE(gsamfira): we need to refactor the DB layer a bit to allow
  280. # two-phase transactions or at least allow running these functions
  281. # inside a single transaction block.
  282. schedule = get_replica_schedule(context, replica_id, schedule_id)
  283. if pre_update_callable:
  284. pre_update_callable(schedule=schedule)
  285. for val in ["schedule", "expiration_date", "enabled", "shutdown_instance"]:
  286. if val in updated_values:
  287. setattr(schedule, val, updated_values[val])
  288. if post_update_callable:
  289. # at this point nothing has really been sent to the DB,
  290. # but we may need to act upon the new changes elsewhere
  291. # before we actually commit to the database
  292. post_update_callable(context, schedule)
  293. @enginefacade.writer
  294. def delete_replica_schedule(context, replica_id,
  295. schedule_id, pre_delete_callable=None,
  296. post_delete_callable=None):
  297. # NOTE(gsamfira): we need to refactor the DB layer a bit to allow
  298. # two-phase transactions or at least allow running these functions
  299. # inside a single transaction block.
  300. q = _soft_delete_aware_query(context, models.ReplicaSchedule).filter(
  301. models.ReplicaSchedule.id == schedule_id,
  302. models.ReplicaSchedule.replica_id == replica_id)
  303. schedule = q.first()
  304. if not schedule:
  305. raise exception.NotFound(
  306. "No such schedule")
  307. if is_user_context(context):
  308. if not q.join(models.Replica).filter(
  309. models.Replica.project_id == context.tenant).first():
  310. raise exception.NotAuthorized()
  311. if pre_delete_callable:
  312. pre_delete_callable(context, schedule)
  313. count = q.soft_delete()
  314. if post_delete_callable:
  315. post_delete_callable(context, schedule)
  316. if count == 0:
  317. raise exception.NotFound("0 entries were soft deleted")
  318. @enginefacade.writer
  319. def add_replica_schedule(context, schedule, post_create_callable=None):
  320. # NOTE(gsamfira): we need to refactor the DB layer a bit to allow
  321. # two-phase transactions or at least allow running these functions
  322. # inside a single transaction block.
  323. if schedule.replica.project_id != context.tenant:
  324. raise exception.NotAuthorized()
  325. _session(context).add(schedule)
  326. if post_create_callable:
  327. post_create_callable(context, schedule)
  328. def _get_replica_with_tasks_executions_options(q):
  329. return q.options(orm.joinedload(models.Replica.executions))
  330. @enginefacade.reader
  331. def get_replicas(context,
  332. include_tasks_executions=False,
  333. include_info=False,
  334. to_dict=True):
  335. q = _soft_delete_aware_query(context, models.Replica)
  336. if include_tasks_executions:
  337. q = _get_replica_with_tasks_executions_options(q)
  338. if include_info is False:
  339. q = q.options(orm.defer('info'))
  340. q = q.filter()
  341. if is_user_context(context):
  342. q = q.filter(
  343. models.Replica.project_id == context.tenant)
  344. db_result = q.all()
  345. if to_dict:
  346. return [
  347. i.to_dict(
  348. include_info=include_info,
  349. include_executions=include_tasks_executions)
  350. for i in db_result]
  351. return db_result
  352. @enginefacade.reader
  353. def get_replica(context, replica_id):
  354. q = _soft_delete_aware_query(context, models.Replica)
  355. q = _get_replica_with_tasks_executions_options(q)
  356. if is_user_context(context):
  357. q = q.filter(
  358. models.Replica.project_id == context.tenant)
  359. return q.filter(
  360. models.Replica.id == replica_id).first()
  361. @enginefacade.reader
  362. def get_endpoint_replicas_count(context, endpoint_id):
  363. origin_args = {'origin_endpoint_id': endpoint_id}
  364. q_origin_count = _soft_delete_aware_query(
  365. context, models.Replica).filter_by(**origin_args).count()
  366. destination_args = {'destination_endpoint_id': endpoint_id}
  367. q_destination_count = _soft_delete_aware_query(
  368. context, models.Replica).filter_by(**destination_args).count()
  369. return q_origin_count + q_destination_count
  370. @enginefacade.writer
  371. def add_replica(context, replica):
  372. replica.user_id = context.user
  373. replica.project_id = context.tenant
  374. _session(context).add(replica)
  375. @enginefacade.writer
  376. def _delete_transfer_action(context, cls, id):
  377. args = {"base_id": id}
  378. if is_user_context(context):
  379. args["project_id"] = context.tenant
  380. count = _soft_delete_aware_query(context, cls).filter_by(
  381. **args).soft_delete()
  382. if count == 0:
  383. raise exception.NotFound("0 entries were soft deleted")
  384. _soft_delete_aware_query(context, models.TasksExecution).filter_by(
  385. action_id=id).soft_delete()
  386. @enginefacade.writer
  387. def delete_replica(context, replica_id):
  388. _delete_transfer_action(context, models.Replica, replica_id)
  389. @enginefacade.reader
  390. def get_replica_migrations(context, replica_id):
  391. q = _soft_delete_aware_query(context, models.Migration)
  392. q = q.join("replica")
  393. q = q.options(orm.joinedload("executions"))
  394. if is_user_context(context):
  395. q = q.filter(
  396. models.Migration.project_id == context.tenant)
  397. return q.filter(
  398. models.Replica.id == replica_id).all()
  399. @enginefacade.reader
  400. def get_migrations(context, include_tasks=False,
  401. include_info=False, to_dict=True):
  402. q = _soft_delete_aware_query(context, models.Migration)
  403. if include_tasks:
  404. q = _get_migration_task_query_options(q)
  405. else:
  406. q = q.options(orm.joinedload("executions"))
  407. if include_info is False:
  408. q = q.options(orm.defer('info'))
  409. args = {}
  410. if is_user_context(context):
  411. args["project_id"] = context.tenant
  412. result = q.filter_by(**args).all()
  413. if to_dict:
  414. return [i.to_dict(
  415. include_info=include_info,
  416. include_tasks=include_tasks) for i in result]
  417. return result
  418. def _get_tasks_with_details_options(query):
  419. return query.options(
  420. orm.joinedload("action")).options(
  421. orm.joinedload("tasks").
  422. joinedload("progress_updates")).options(
  423. orm.joinedload("tasks").
  424. joinedload("events"))
  425. def _get_migration_task_query_options(query):
  426. return query.options(
  427. orm.joinedload("executions").
  428. joinedload("tasks").
  429. joinedload("progress_updates")).options(
  430. orm.joinedload("executions").
  431. joinedload("tasks").
  432. joinedload("events")).options(
  433. orm.joinedload("executions").
  434. joinedload("action"))
  435. @enginefacade.reader
  436. def get_migration(context, migration_id):
  437. q = _soft_delete_aware_query(context, models.Migration)
  438. q = _get_migration_task_query_options(q)
  439. args = {"id": migration_id}
  440. if is_user_context(context):
  441. args["project_id"] = context.tenant
  442. return q.filter_by(**args).first()
  443. @enginefacade.writer
  444. def add_migration(context, migration):
  445. migration.user_id = context.user
  446. migration.project_id = context.tenant
  447. _session(context).add(migration)
  448. @enginefacade.writer
  449. def delete_migration(context, migration_id):
  450. _delete_transfer_action(context, models.Migration, migration_id)
  451. @enginefacade.writer
  452. def set_execution_status(
  453. context, execution_id, status, update_action_status=True):
  454. execution = _soft_delete_aware_query(
  455. context, models.TasksExecution).join(
  456. models.TasksExecution.action)
  457. if is_user_context(context):
  458. execution = execution.filter(
  459. models.BaseTransferAction.project_id == context.tenant)
  460. execution = execution.filter(
  461. models.TasksExecution.id == execution_id).first()
  462. if not execution:
  463. raise exception.NotFound(
  464. "Tasks execution not found: %s" % execution_id)
  465. execution.status = status
  466. if update_action_status:
  467. set_action_last_execution_status(
  468. context, execution.action_id, status)
  469. @enginefacade.reader
  470. def get_action(context, action_id):
  471. action = _soft_delete_aware_query(
  472. context, models.BaseTransferAction)
  473. if is_user_context(context):
  474. action = action.filter(
  475. models.BaseTransferAction.project_id == context.tenant)
  476. action = action.filter(
  477. models.BaseTransferAction.base_id == action_id).first()
  478. if not action:
  479. raise exception.NotFound(
  480. "Transfer action not found: %s" % action_id)
  481. return action
  482. @enginefacade.writer
  483. def set_action_last_execution_status(
  484. context, action_id, last_execution_status):
  485. action = get_action(context, action_id)
  486. action.last_execution_status = last_execution_status
  487. @enginefacade.writer
  488. def update_transfer_action_info_for_instance(
  489. context, action_id, instance, new_instance_info):
  490. """ Updates the info for the given action with the provided dict.
  491. Returns the updated value.
  492. Sub-fields of the dict already in the info will get overwritten entirely!
  493. """
  494. action = get_action(context, action_id)
  495. if not new_instance_info:
  496. LOG.debug(
  497. "No new info provided for action '%s' and instance '%s'. "
  498. "Nothing to update in the DB.",
  499. action_id, instance)
  500. return action.info.get(instance, {})
  501. # Copy is needed, otherwise sqlalchemy won't save the changes
  502. action_info = action.info.copy()
  503. if instance in action_info:
  504. instance_info_old = action_info[instance]
  505. old_keys = set(instance_info_old.keys())
  506. new_keys = set(new_instance_info.keys())
  507. overwritten_keys = old_keys.intersection(new_keys)
  508. if overwritten_keys:
  509. LOG.debug(
  510. "Overwriting the values of the following keys for info of "
  511. "instance '%s' of action with ID '%s': %s",
  512. instance, action_id, overwritten_keys)
  513. newly_added_keys = new_keys.difference(old_keys)
  514. if newly_added_keys:
  515. LOG.debug(
  516. "The following new keys will be added for info of instance "
  517. "'%s' in action with ID '%s': %s",
  518. instance, action_id, newly_added_keys)
  519. instance_info_old_copy = instance_info_old.copy()
  520. instance_info_old_copy.update(new_instance_info)
  521. action_info[instance] = instance_info_old_copy
  522. action.info = action_info
  523. return action_info[instance]
  524. @enginefacade.writer
  525. def set_transfer_action_result(context, action_id, instance, result):
  526. """ Adds the result for the given 'instance' in the 'transfer_result'
  527. JSON in the 'base_transfer_action' table.
  528. """
  529. action = get_action(context, action_id)
  530. transfer_result = {}
  531. if action.transfer_result:
  532. transfer_result = action.transfer_result.copy()
  533. transfer_result[instance] = result
  534. action.transfer_result = transfer_result
  535. return transfer_result[instance]
  536. @enginefacade.reader
  537. def get_tasks_execution(context, execution_id):
  538. q = _soft_delete_aware_query(context, models.TasksExecution)
  539. q = q.join(models.BaseTransferAction)
  540. q = q.options(orm.joinedload("action"))
  541. q = q.options(orm.joinedload("tasks"))
  542. if is_user_context(context):
  543. q = q.filter(
  544. models.BaseTransferAction.project_id == context.tenant)
  545. execution = q.filter(
  546. models.TasksExecution.id == execution_id).first()
  547. if not execution:
  548. raise exception.NotFound(
  549. "Tasks execution not found: %s" % execution_id)
  550. return execution
  551. def _get_task(context, task_id):
  552. task = _soft_delete_aware_query(context, models.Task).filter_by(
  553. id=task_id).first()
  554. if not task:
  555. raise exception.NotFound("Task not found: %s" % task_id)
  556. return task
  557. @enginefacade.writer
  558. def set_task_status(context, task_id, status, exception_details=None):
  559. task = _get_task(context, task_id)
  560. task.status = status
  561. task.exception_details = exception_details
  562. @enginefacade.writer
  563. def set_task_host_properties(context, task_id, host=None, process_id=None):
  564. task = _get_task(context, task_id)
  565. if host:
  566. task.host = host
  567. if process_id:
  568. task.process_id = process_id
  569. @enginefacade.reader
  570. def get_task(context, task_id):
  571. q = _soft_delete_aware_query(context, models.Task)
  572. return q.filter_by(id=task_id).first()
  573. @enginefacade.writer
  574. def add_task_event(context, task_id, level, message):
  575. task_event = models.TaskEvent()
  576. task_event.task_id = task_id
  577. task_event.level = level
  578. task_event.message = message
  579. _session(context).add(task_event)
  580. def _get_progress_update(context, task_id, current_step):
  581. q = _soft_delete_aware_query(context, models.TaskProgressUpdate)
  582. return q.filter(
  583. models.TaskProgressUpdate.task_id == task_id,
  584. models.TaskProgressUpdate.current_step == current_step).first()
  585. @enginefacade.writer
  586. def add_task_progress_update(context, task_id, current_step, total_steps,
  587. message):
  588. task_progress_update = _get_progress_update(context, task_id, current_step)
  589. if not task_progress_update:
  590. task_progress_update = models.TaskProgressUpdate()
  591. _session(context).add(task_progress_update)
  592. task_progress_update.task_id = task_id
  593. task_progress_update.current_step = current_step
  594. task_progress_update.total_steps = total_steps
  595. task_progress_update.message = message
  596. @enginefacade.writer
  597. def update_replica(context, replica_id, updated_values):
  598. replica = get_replica(context, replica_id)
  599. if not replica:
  600. raise exception.NotFound("Replica not found")
  601. mapped_info_fields = {
  602. 'destination_environment': 'target_environment'}
  603. updateable_fields = [
  604. "source_environment", "destination_environment", "notes",
  605. "network_map", "storage_mappings"]
  606. for field in updateable_fields:
  607. if mapped_info_fields.get(field, field) in updated_values:
  608. LOG.debug(
  609. "Updating the '%s' field of Replica '%s' to: '%s'",
  610. field, replica_id, updated_values[
  611. mapped_info_fields.get(field, field)])
  612. setattr(
  613. replica, field,
  614. updated_values[mapped_info_fields.get(field, field)])
  615. non_updateable_fields = set(
  616. updated_values.keys()).difference({
  617. mapped_info_fields.get(field, field)
  618. for field in updateable_fields})
  619. if non_updateable_fields:
  620. LOG.warn(
  621. "The following Replica fields can NOT be updated: %s",
  622. non_updateable_fields)
  623. # the oslo_db library uses this method for both the `created_at` and
  624. # `updated_at` fields
  625. setattr(replica, 'updated_at', timeutils.utcnow())
  626. @enginefacade.writer
  627. def add_region(context, region):
  628. _session(context).add(region)
  629. @enginefacade.reader
  630. def get_regions(context):
  631. q = _soft_delete_aware_query(context, models.Region)
  632. q = q.options(orm.joinedload('mapped_endpoints'))
  633. q = q.options(orm.joinedload('mapped_services'))
  634. return q.all()
  635. @enginefacade.reader
  636. def get_region(context, region_id):
  637. q = _soft_delete_aware_query(context, models.Region)
  638. q = q.options(orm.joinedload('mapped_endpoints'))
  639. q = q.options(orm.joinedload('mapped_services'))
  640. return q.filter(
  641. models.Region.id == region_id).first()
  642. @enginefacade.writer
  643. def update_region(context, region_id, updated_values):
  644. if not region_id:
  645. raise exception.InvalidInput(
  646. "No region ID specified for updating.")
  647. region = get_region(context, region_id)
  648. if not region:
  649. raise exception.NotFound(
  650. "Region with ID '%s' does not exist." % region_id)
  651. updateable_fields = ["name", "description", "enabled"]
  652. _update_sqlalchemy_object_fields(
  653. region, updateable_fields, updated_values)
  654. @enginefacade.writer
  655. def delete_region(context, region_id):
  656. region = get_region(context, region_id)
  657. count = _soft_delete_aware_query(context, models.Region).filter_by(
  658. id=region_id).soft_delete()
  659. if count == 0:
  660. raise exception.NotFound("0 region entries were soft deleted")
  661. # NOTE(aznashwan): many-to-many tables with soft deletion on either end of
  662. # the association are not handled properly so we must manually delete each
  663. # association ourselves:
  664. for endp in region.mapped_endpoints:
  665. delete_endpoint_region_mapping(context, endp.id, region_id)
  666. for svc in region.mapped_services:
  667. delete_service_region_mapping(context, svc.id, region_id)
  668. @enginefacade.writer
  669. def add_endpoint_region_mapping(context, endpoint_region_mapping):
  670. region_id = endpoint_region_mapping.region_id
  671. endpoint_id = endpoint_region_mapping.endpoint_id
  672. if None in [region_id, endpoint_id]:
  673. raise exception.InvalidInput(
  674. "Provided endpoint region mapping params for the region ID "
  675. "('%s') and the endpoint ID ('%s') must both be non-null." % (
  676. region_id, endpoint_id))
  677. _session(context).add(endpoint_region_mapping)
  678. @enginefacade.reader
  679. def get_endpoint_region_mapping(context, endpoint_id, region_id):
  680. q = _soft_delete_aware_query(context, models.EndpointRegionMapping)
  681. q = q.filter(
  682. models.EndpointRegionMapping.region == region_id)
  683. q = q.filter(
  684. models.EndpointRegionMapping.endpoint_id == endpoint_id)
  685. return q.all()
  686. @enginefacade.writer
  687. def delete_endpoint_region_mapping(context, endpoint_id, region_id):
  688. args = {"endpoint_id": endpoint_id, "region_id": region_id}
  689. # TODO(aznashwan): many-to-many realtionships have no sane way of
  690. # supporting soft deletion from the sqlalchemy layer wihout
  691. # writing join condictions, so we hard-`delete()` instead of
  692. # `soft_delete()` util we find a better option:
  693. count = _soft_delete_aware_query(
  694. context, models.EndpointRegionMapping).filter_by(
  695. **args).delete()
  696. if count == 0:
  697. raise exception.NotFound(
  698. "There is no mapping between endpoint '%s' and region '%s'." % (
  699. endpoint_id, region_id))
  700. LOG.debug(
  701. "Deleted mapping between endpoint '%s' and region '%s' from DB",
  702. endpoint_id, region_id)
  703. @enginefacade.reader
  704. def get_region_mappings_for_endpoint(
  705. context, endpoint_id, enabled_regions_only=False):
  706. q = _soft_delete_aware_query(context, models.EndpointRegionMapping)
  707. q = q.join(models.Region)
  708. q = q.filter(
  709. models.EndpointRegionMapping.endpoint_id == endpoint_id)
  710. if enabled_regions_only:
  711. q = q.filter(
  712. models.Region.enabled == True)
  713. return q.all()
  714. @enginefacade.reader
  715. def get_mapped_endpoints_for_region(context, region_id):
  716. q = _soft_delete_aware_query(context, models.Endpoint)
  717. q = q.join(models.EndpointRegionMapping)
  718. q = q.filter(
  719. models.EndpointRegionMapping.endpoint_id == region_id)
  720. return q.all()
  721. @enginefacade.writer
  722. def add_service(context, service):
  723. _session(context).add(service)
  724. @enginefacade.reader
  725. def get_services(context):
  726. q = _soft_delete_aware_query(context, models.Service).options(
  727. orm.joinedload('mapped_regions'))
  728. return q.all()
  729. @enginefacade.reader
  730. def get_service(context, service_id):
  731. q = _soft_delete_aware_query(context, models.Service).options(
  732. orm.joinedload('mapped_regions'))
  733. return q.filter(
  734. models.Service.id == service_id).first()
  735. @enginefacade.reader
  736. def find_service(context, host, binary, topic=None):
  737. args = {"host": host, "binary": binary}
  738. if topic:
  739. args["topic"] = topic
  740. q = _soft_delete_aware_query(context, models.Service).options(
  741. orm.joinedload('mapped_regions')).filter_by(**args)
  742. return q.first()
  743. @enginefacade.writer
  744. def update_service(context, service_id, updated_values):
  745. if not service_id:
  746. raise exception.InvalidInput(
  747. "No service ID specified for updating.")
  748. service = get_service(context, service_id)
  749. if not service:
  750. raise exception.NotFound(
  751. "Service with ID '%s' does not exist." % service_id)
  752. if not isinstance(updated_values, dict):
  753. raise exception.InvalidInput(
  754. "Update payload for services must be a dict. Got the following "
  755. "(type: %s): %s" % (type(updated_values), updated_values))
  756. def _try_unmap_regions(region_ids):
  757. for region_to_unmap in region_ids:
  758. try:
  759. LOG.debug(
  760. "Attempting to unmap region '%s' from service '%s'",
  761. region_to_unmap, service_id)
  762. delete_service_region_mapping(
  763. context, service_id, region_to_unmap)
  764. except Exception as ex:
  765. LOG.warn(
  766. "Exception occurred while attempting to unmap region '%s' "
  767. "from service '%s'. Ignoring. Error was: %s",
  768. region_to_unmap, service_id,
  769. utils.get_exception_details())
  770. newly_mapped_regions = []
  771. regions_to_unmap = []
  772. # NOTE: `.pop()` required for `_update_sqlalchemy_object_fields` call:
  773. desired_region_mappings = updated_values.pop('mapped_regions', None)
  774. if desired_region_mappings is not None:
  775. # ensure all requested regions exist:
  776. for region_id in desired_region_mappings:
  777. region = get_region(context, region_id)
  778. if not region:
  779. raise exception.NotFound(
  780. "Could not find region with ID '%s' for associating "
  781. "with serce '%s' during update process." % (
  782. region_id, service_id))
  783. # get all existing mappings:
  784. existing_region_mappings = [
  785. mapping.region_id
  786. for mapping in get_region_mappings_for_service(
  787. context, service_id)]
  788. # check and add new mappings:
  789. to_map = set(
  790. desired_region_mappings).difference(set(existing_region_mappings))
  791. regions_to_unmap = set(
  792. existing_region_mappings).difference(set(desired_region_mappings))
  793. LOG.debug(
  794. "Remapping regions for service '%s' from %s to %s",
  795. service_id, existing_region_mappings, desired_region_mappings)
  796. region_id = None
  797. try:
  798. for region_id in to_map:
  799. mapping = models.ServiceRegionMapping()
  800. mapping.region_id = region_id
  801. mapping.service_id = service_id
  802. add_service_region_mapping(context, mapping)
  803. newly_mapped_regions.append(region_id)
  804. except Exception as ex:
  805. LOG.warn(
  806. "Exception occurred while adding region mapping for '%s' to "
  807. "service '%s'. Cleaning up created mappings (%s). Error was: "
  808. "%s", region_id, service_id, newly_mapped_regions,
  809. utils.get_exception_details())
  810. _try_unmap_regions(newly_mapped_regions)
  811. raise
  812. updateable_fields = ["enabled", "status", "providers", "specs"]
  813. try:
  814. _update_sqlalchemy_object_fields(
  815. service, updateable_fields, updated_values)
  816. except Exception as ex:
  817. LOG.warn(
  818. "Exception occurred while updating fields of service '%s'. "
  819. "Cleaning ""up created mappings (%s). Error was: %s",
  820. service_id, newly_mapped_regions, utils.get_exception_details())
  821. _try_unmap_regions(newly_mapped_regions)
  822. raise
  823. # remove all of the old region mappings:
  824. LOG.debug(
  825. "Unmapping the following regions during update of service '%s': %s",
  826. service_id, regions_to_unmap)
  827. _try_unmap_regions(regions_to_unmap)
  828. @enginefacade.writer
  829. def delete_service(context, service_id):
  830. service = get_service(context, service_id)
  831. count = _soft_delete_aware_query(context, models.Service).filter_by(
  832. id=service_id).soft_delete()
  833. if count == 0:
  834. raise exception.NotFound("0 service entries were soft deleted")
  835. # NOTE(aznashwan): many-to-many tables with soft deletion on either end of
  836. # the association are not handled properly so we must manually delete each
  837. # association ourselves:
  838. for reg in service.mapped_regions:
  839. delete_service_region_mapping(context, service_id, reg.id)
  840. @enginefacade.writer
  841. def add_service_region_mapping(context, service_region_mapping):
  842. region_id = service_region_mapping.region_id
  843. service_id = service_region_mapping.service_id
  844. if None in [region_id, service_id]:
  845. raise exception.InvalidInput(
  846. "Provided service region mapping params for the region ID "
  847. "('%s') and the service ID ('%s') must both be non-null." % (
  848. region_id, service_id))
  849. _session(context).add(service_region_mapping)
  850. @enginefacade.reader
  851. def get_service_region_mapping(context, service_id, region_id):
  852. q = _soft_delete_aware_query(context, models.ServiceRegionMapping)
  853. q = q.filter(
  854. models.ServiceRegionMapping.region == region_id)
  855. q = q.filter(
  856. models.ServiceRegionMapping.service_id == service_id)
  857. return q.all()
  858. @enginefacade.writer
  859. def delete_service_region_mapping(context, service_id, region_id):
  860. args = {"service_id": service_id, "region_id": region_id}
  861. # TODO(aznashwan): many-to-many realtionships have no sane way of
  862. # supporting soft deletion from the sqlalchemy layer wihout
  863. # writing join condictions, so we hard-`delete()` instead of
  864. # `soft_delete()` util we find a better option:
  865. count = _soft_delete_aware_query(
  866. context, models.ServiceRegionMapping).filter_by(
  867. **args).delete()
  868. if count == 0:
  869. raise exception.NotFound(
  870. "There is no mapping between service '%s' and region '%s'." % (
  871. service_id, region_id))
  872. @enginefacade.reader
  873. def get_region_mappings_for_service(
  874. context, service_id, enabled_regions_only=False):
  875. q = _soft_delete_aware_query(context, models.ServiceRegionMapping)
  876. q = q.join(models.Region)
  877. q = q.filter(
  878. models.ServiceRegionMapping.service_id == service_id)
  879. if enabled_regions_only:
  880. q = q.filter(
  881. models.Region.enabled == True)
  882. return q.all()
  883. @enginefacade.reader
  884. def get_mapped_services_for_region(context, region_id):
  885. q = _soft_delete_aware_query(context, models.Service)
  886. q = q.join(models.ServiceRegionMapping)
  887. q = q.filter(
  888. models.ServiceRegionMapping.service_id == region_id)
  889. return q.all()