server.py 209 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483
  1. # Copyright 2016 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. import contextlib
  4. import copy
  5. import functools
  6. import itertools
  7. import random
  8. import time
  9. import uuid
  10. from oslo_concurrency import lockutils
  11. from oslo_config import cfg
  12. from oslo_log import log as logging
  13. from coriolis import constants
  14. from coriolis import context
  15. from coriolis.db import api as db_api
  16. from coriolis.db.sqlalchemy import models
  17. from coriolis import exception
  18. from coriolis import keystone
  19. from coriolis.licensing import client as licensing_client
  20. from coriolis.replica_cron.rpc import client as rpc_cron_client
  21. from coriolis.scheduler.rpc import client as rpc_scheduler_client
  22. from coriolis import schemas
  23. from coriolis.tasks import factory as tasks_factory
  24. from coriolis import utils
  25. from coriolis.worker.rpc import client as rpc_worker_client
  26. VERSION = "1.0"
  27. LOG = logging.getLogger(__name__)
  28. CONDUCTOR_OPTS = [
  29. cfg.BoolOpt("debug_os_morphing_errors",
  30. default=False,
  31. help="If set, any OSMorphing task which errors out will have "
  32. "all of its following tasks unscheduled so as to allow "
  33. "for live debugging of the OSMorphing setup.")
  34. ]
  35. CONF = cfg.CONF
  36. CONF.register_opts(CONDUCTOR_OPTS, 'conductor')
  37. TASK_DEADLOCK_ERROR_MESSAGE = (
  38. "A fatal deadlock has occurred. Further debugging is required. "
  39. "Please review the Conductor logs and contact support for assistance.")
  40. RPC_TOPIC_TO_CLIENT_CLASS_MAP = {
  41. constants.WORKER_MAIN_MESSAGING_TOPIC: rpc_worker_client.WorkerClient,
  42. constants.SCHEDULER_MAIN_MESSAGING_TOPIC: (
  43. rpc_scheduler_client.SchedulerClient),
  44. constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC: (
  45. rpc_cron_client.ReplicaCronClient)
  46. }
  47. def endpoint_synchronized(func):
  48. @functools.wraps(func)
  49. def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
  50. @lockutils.synchronized(
  51. constants.ENDPOINT_LOCK_NAME_FORMAT % endpoint_id,
  52. external=True)
  53. def inner():
  54. return func(self, ctxt, endpoint_id, *args, **kwargs)
  55. return inner()
  56. return wrapper
  57. def replica_synchronized(func):
  58. @functools.wraps(func)
  59. def wrapper(self, ctxt, replica_id, *args, **kwargs):
  60. @lockutils.synchronized(
  61. constants.REPLICA_LOCK_NAME_FORMAT % replica_id,
  62. external=True)
  63. def inner():
  64. return func(self, ctxt, replica_id, *args, **kwargs)
  65. return inner()
  66. return wrapper
  67. def schedule_synchronized(func):
  68. @functools.wraps(func)
  69. def wrapper(self, ctxt, replica_id, schedule_id, *args, **kwargs):
  70. @lockutils.synchronized(
  71. constants.SCHEDULE_LOCK_NAME_FORMAT % schedule_id,
  72. external=True)
  73. def inner():
  74. return func(self, ctxt, replica_id, schedule_id, *args, **kwargs)
  75. return inner()
  76. return wrapper
  77. def task_synchronized(func):
  78. @functools.wraps(func)
  79. def wrapper(self, ctxt, task_id, *args, **kwargs):
  80. @lockutils.synchronized(
  81. constants.TASK_LOCK_NAME_FORMAT % task_id,
  82. external=True)
  83. def inner():
  84. return func(self, ctxt, task_id, *args, **kwargs)
  85. return inner()
  86. return wrapper
  87. def parent_tasks_execution_synchronized(func):
  88. @functools.wraps(func)
  89. def wrapper(self, ctxt, task_id, *args, **kwargs):
  90. task = db_api.get_task(ctxt, task_id)
  91. @lockutils.synchronized(
  92. constants.EXECUTION_LOCK_NAME_FORMAT % task.execution_id,
  93. external=True)
  94. @lockutils.synchronized(
  95. constants.TASK_LOCK_NAME_FORMAT % task_id,
  96. external=True)
  97. def inner():
  98. return func(self, ctxt, task_id, *args, **kwargs)
  99. return inner()
  100. return wrapper
  101. def migration_synchronized(func):
  102. @functools.wraps(func)
  103. def wrapper(self, ctxt, migration_id, *args, **kwargs):
  104. @lockutils.synchronized(
  105. constants.MIGRATION_LOCK_NAME_FORMAT % migration_id,
  106. external=True)
  107. def inner():
  108. return func(self, ctxt, migration_id, *args, **kwargs)
  109. return inner()
  110. return wrapper
  111. def tasks_execution_synchronized(func):
  112. @functools.wraps(func)
  113. def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs):
  114. @lockutils.synchronized(
  115. constants.EXECUTION_LOCK_NAME_FORMAT % execution_id,
  116. external=True)
  117. def inner():
  118. return func(self, ctxt, replica_id, execution_id, *args, **kwargs)
  119. return inner()
  120. return wrapper
  121. def minion_pool_tasks_execution_synchronized(func):
  122. @functools.wraps(func)
  123. def wrapper(self, ctxt, minion_pool_id, execution_id, *args, **kwargs):
  124. @lockutils.synchronized(
  125. constants.EXECUTION_LOCK_NAME_FORMAT % execution_id,
  126. external=True)
  127. def inner():
  128. return func(self, ctxt, minion_pool_id, execution_id, *args, **kwargs)
  129. return inner()
  130. return wrapper
  131. def region_synchronized(func):
  132. @functools.wraps(func)
  133. def wrapper(self, ctxt, region_id, *args, **kwargs):
  134. @lockutils.synchronized(
  135. constants.REGION_LOCK_NAME_FORMAT % region_id,
  136. external=True)
  137. def inner():
  138. return func(self, ctxt, region_id, *args, **kwargs)
  139. return inner()
  140. return wrapper
  141. def service_synchronized(func):
  142. @functools.wraps(func)
  143. def wrapper(self, ctxt, service_id, *args, **kwargs):
  144. @lockutils.synchronized(
  145. constants.SERVICE_LOCK_NAME_FORMAT % service_id,
  146. external=True)
  147. def inner():
  148. return func(self, ctxt, service_id, *args, **kwargs)
  149. return inner()
  150. return wrapper
  151. def minion_pool_synchronized(func):
  152. @functools.wraps(func)
  153. def wrapper(self, ctxt, minion_pool_id, *args, **kwargs):
  154. @lockutils.synchronized(
  155. constants.MINION_POOL_LOCK_NAME_FORMAT % minion_pool_id,
  156. external=True)
  157. def inner():
  158. return func(self, ctxt, minion_pool_id, *args, **kwargs)
  159. return inner()
  160. return wrapper
  161. class ConductorServerEndpoint(object):
  162. def __init__(self):
  163. self._licensing_client = licensing_client.LicensingClient.from_env()
  164. self._worker_client_instance = None
  165. self._scheduler_client_instance = None
  166. self._replica_cron_client_instance = None
  167. # NOTE(aznashwan): it is unsafe to fork processes with pre-instantiated
  168. # oslo_messaging clients as the underlying eventlet thread queues will
  169. # be invalidated. Considering this class both serves from a "main
  170. # process" as well as forking child processes, it is safest to
  171. # instantiate the clients only when needed:
  172. @property
  173. def _worker_client(self):
  174. if not self._worker_client_instance:
  175. self._worker_client_instance = (
  176. rpc_worker_client.WorkerClient())
  177. return self._worker_client_instance
  178. @property
  179. def _scheduler_client(self):
  180. if not self._scheduler_client_instance:
  181. self._scheduler_client_instance = (
  182. rpc_scheduler_client.SchedulerClient())
  183. return self._scheduler_client_instance
  184. @property
  185. def _replica_cron_client(self):
  186. if not self._replica_cron_client_instance:
  187. self._replica_cron_client_instance = (
  188. rpc_cron_client.ReplicaCronClient())
  189. return self._replica_cron_client_instance
  190. def get_all_diagnostics(self, ctxt):
  191. diagnostics = [
  192. self.get_diagnostics(ctxt),
  193. self._replica_cron_client.get_diagnostics(ctxt),
  194. self._scheduler_client.get_diagnostics(ctxt)]
  195. worker_diagnostics = []
  196. for worker_service in self._scheduler_client.get_workers_for_specs(
  197. ctxt):
  198. worker_rpc = self._get_worker_rpc_for_host(worker_service['host'])
  199. diagnostics.append(worker_rpc.get_diagnostics(ctxt))
  200. return diagnostics
  201. def _get_any_worker_service(self, ctxt, random_choice=False, raw_dict=False):
  202. services = self._scheduler_client.get_workers_for_specs(ctxt)
  203. if not services:
  204. raise exception.NoWorkerServiceError()
  205. service = services[0]
  206. if random_choice:
  207. service = random.choice(services)
  208. if raw_dict:
  209. return service
  210. return db_api.get_service(ctxt, service['id'])
  211. def _get_worker_rpc_for_host(self, worker_host, **client_kwargs):
  212. return rpc_worker_client.WorkerClient(host=worker_host, **client_kwargs)
  213. def _get_worker_service_rpc_for_specs(
  214. self, ctxt, provider_requirements=None, region_sets=None,
  215. enabled=True, random_choice=False, raise_on_no_matches=True):
  216. requirements_str = (
  217. "enabled=%s; region_sets=%s; provider_requirements=%s" % (
  218. enabled, region_sets, provider_requirements))
  219. LOG.info(
  220. "Requesting Worker Service from scheduler with the following "
  221. "specifications: %s", requirements_str)
  222. services = self._scheduler_client.get_workers_for_specs(
  223. ctxt, provider_requirements=provider_requirements,
  224. region_sets=region_sets, enabled=enabled)
  225. if not services:
  226. if raise_on_no_matches:
  227. raise exception.NoSuitableWorkerServiceError()
  228. return None
  229. LOG.debug(
  230. "Was offered Worker Services with the following IDs for "
  231. "requirements '%s': %s",
  232. requirements_str, [s["id"] for s in services])
  233. selected_service = services[0]
  234. if random_choice:
  235. selected_service = random.choice(services)
  236. LOG.info(
  237. "Was offered Worker Service with ID '%s' for requirements: %s",
  238. selected_service['id'], requirements_str)
  239. return self._get_worker_rpc_for_host(selected_service['host'])
  240. def _check_delete_reservation_for_transfer(self, transfer_action):
  241. action_id = transfer_action.base_id
  242. if not self._licensing_client:
  243. LOG.warn(
  244. "Licensing client not instantiated. Skipping deletion of "
  245. "reservation for transfer action '%s'", action_id)
  246. return
  247. reservation_id = transfer_action.reservation_id
  248. if reservation_id:
  249. try:
  250. self._licensing_client.delete_reservation(reservation_id)
  251. except (Exception, KeyboardInterrupt):
  252. LOG.warn(
  253. "Failed to delete reservation with ID '%s' for transfer "
  254. "action with ID '%s'. Skipping. Exception\n%s",
  255. reservation_id, action_id, utils.get_exception_details())
  256. def _check_create_reservation_for_transfer(
  257. self, transfer_action, transfer_type):
  258. action_id = transfer_action.base_id
  259. if not self._licensing_client:
  260. LOG.warn(
  261. "Licensing client not instantiated. Skipping creation of "
  262. "reservation for transfer action '%s'", action_id)
  263. return
  264. ninstances = len(transfer_action.instances)
  265. LOG.debug(
  266. "Attempting to create '%s' reservation for %d instances for "
  267. "transfer action with ID '%s'.",
  268. transfer_type, ninstances, action_id)
  269. reservation = self._licensing_client.add_reservation(
  270. transfer_type, ninstances)
  271. transfer_action.reservation_id = reservation['id']
  272. def _check_reservation_for_transfer(
  273. self, transfer_action, reservation_type):
  274. action_id = transfer_action.base_id
  275. if not self._licensing_client:
  276. LOG.warn(
  277. "Licensing client not instantiated. Skipping checking of "
  278. "reservation for transfer action '%s'", action_id)
  279. return
  280. reservation_id = transfer_action.reservation_id
  281. if reservation_id:
  282. LOG.debug(
  283. "Attempting to check reservation with ID '%s' for transfer "
  284. "action '%s'", reservation_id, action_id)
  285. try:
  286. transfer_action.reservation_id = (
  287. self._licensing_client.check_refresh_reservation(
  288. reservation_id)['id'])
  289. except Exception as ex:
  290. exc_code = getattr(ex, 'code', None)
  291. if exc_code in [404, 409]:
  292. if exc_code == 409:
  293. LOG.debug(
  294. "Server-side exception occurred while trying to "
  295. "check the existing reservation '%s' for action "
  296. "'%s'. Attempting to create a new reservation. "
  297. "Trace was: %s",
  298. reservation_id, action_id,
  299. utils.get_exception_details())
  300. elif exc_code == 404:
  301. LOG.debug(
  302. "Could not find previous reservation with ID '%s' "
  303. "for action '%s'. Attempting to create a new "
  304. "reservation. Trace was: %s",
  305. reservation_id, action_id,
  306. utils.get_exception_details())
  307. self._check_create_reservation_for_transfer(
  308. transfer_action, reservation_type)
  309. else:
  310. raise ex
  311. else:
  312. LOG.debug(
  313. "Transfer action '%s' has no reservation ID set.", action_id)
  314. self._check_create_reservation_for_transfer(
  315. transfer_action, reservation_type)
  316. def create_endpoint(self, ctxt, name, endpoint_type, description,
  317. connection_info, mapped_regions=None):
  318. endpoint = models.Endpoint()
  319. endpoint.id = str(uuid.uuid4())
  320. endpoint.name = name
  321. endpoint.type = endpoint_type
  322. endpoint.description = description
  323. endpoint.connection_info = connection_info
  324. db_api.add_endpoint(ctxt, endpoint)
  325. LOG.info("Endpoint created: %s", endpoint.id)
  326. # add region associations:
  327. if mapped_regions:
  328. try:
  329. db_api.update_endpoint(
  330. ctxt, endpoint.id, {
  331. "mapped_regions": mapped_regions})
  332. except Exception as ex:
  333. LOG.warn(
  334. "Error adding region mappings during new endpoint creation "
  335. "(name: %s), cleaning up endpoint and all created "
  336. "mappings for regions: %s", endpoint.name, mapped_regions)
  337. db_api.delete_endpoint(ctxt, endpoint.id)
  338. raise
  339. return self.get_endpoint(ctxt, endpoint.id)
  340. @endpoint_synchronized
  341. def update_endpoint(self, ctxt, endpoint_id, updated_values):
  342. LOG.info(
  343. "Attempting to update endpoint '%s' with payload: %s",
  344. endpoint_id, updated_values)
  345. db_api.update_endpoint(ctxt, endpoint_id, updated_values)
  346. LOG.info("Endpoint updated: %s", endpoint_id)
  347. return db_api.get_endpoint(ctxt, endpoint_id)
  348. def get_endpoints(self, ctxt):
  349. return db_api.get_endpoints(ctxt)
  350. @endpoint_synchronized
  351. def get_endpoint(self, ctxt, endpoint_id):
  352. endpoint = db_api.get_endpoint(ctxt, endpoint_id)
  353. if not endpoint:
  354. raise exception.NotFound(
  355. "Endpoint with ID '%s' not found." % endpoint_id)
  356. return endpoint
  357. @endpoint_synchronized
  358. def delete_endpoint(self, ctxt, endpoint_id):
  359. q_replicas_count = db_api.get_endpoint_replicas_count(
  360. ctxt, endpoint_id)
  361. if q_replicas_count is not 0:
  362. raise exception.NotAuthorized("%s replicas would be orphaned!" %
  363. q_replicas_count)
  364. db_api.delete_endpoint(ctxt, endpoint_id)
  365. def get_endpoint_instances(self, ctxt, endpoint_id, source_environment,
  366. marker, limit, instance_name_pattern):
  367. endpoint = self.get_endpoint(ctxt, endpoint_id)
  368. worker_rpc = self._get_worker_service_rpc_for_specs(
  369. ctxt, enabled=True,
  370. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  371. provider_requirements={
  372. endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_INSTANCES]})
  373. return worker_rpc.get_endpoint_instances(
  374. ctxt, endpoint.type, endpoint.connection_info,
  375. source_environment, marker, limit, instance_name_pattern)
  376. def get_endpoint_instance(
  377. self, ctxt, endpoint_id, source_environment, instance_name):
  378. endpoint = self.get_endpoint(ctxt, endpoint_id)
  379. worker_rpc = self._get_worker_service_rpc_for_specs(
  380. ctxt, enabled=True,
  381. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  382. provider_requirements={
  383. endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_INSTANCES]})
  384. return worker_rpc.get_endpoint_instance(
  385. ctxt, endpoint.type, endpoint.connection_info,
  386. source_environment, instance_name)
  387. def get_endpoint_source_options(
  388. self, ctxt, endpoint_id, env, option_names):
  389. endpoint = self.get_endpoint(ctxt, endpoint_id)
  390. worker_rpc = self._get_worker_service_rpc_for_specs(
  391. ctxt, enabled=True,
  392. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  393. provider_requirements={
  394. endpoint.type: [
  395. constants.PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS]})
  396. return worker_rpc.get_endpoint_source_options(
  397. ctxt, endpoint.type, endpoint.connection_info, env, option_names)
  398. def get_endpoint_destination_options(
  399. self, ctxt, endpoint_id, env, option_names):
  400. endpoint = self.get_endpoint(ctxt, endpoint_id)
  401. worker_rpc = self._get_worker_service_rpc_for_specs(
  402. ctxt, enabled=True,
  403. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  404. provider_requirements={
  405. endpoint.type: [
  406. constants.PROVIDER_TYPE_DESTINATION_ENDPOINT_OPTIONS]})
  407. return worker_rpc.get_endpoint_destination_options(
  408. ctxt, endpoint.type, endpoint.connection_info, env, option_names)
  409. def get_endpoint_source_minion_pool_options(
  410. self, ctxt, endpoint_id, env, option_names):
  411. endpoint = self.get_endpoint(ctxt, endpoint_id)
  412. worker_rpc = self._get_worker_service_rpc_for_specs(
  413. ctxt, enabled=True,
  414. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  415. provider_requirements={
  416. endpoint.type: [
  417. constants.PROVIDER_TYPE_SOURCE_MINION_POOL]})
  418. return worker_rpc.get_endpoint_source_minion_pool_options(
  419. ctxt, endpoint.type, endpoint.connection_info, env, option_names)
  420. def get_endpoint_destination_minion_pool_options(
  421. self, ctxt, endpoint_id, env, option_names):
  422. endpoint = self.get_endpoint(ctxt, endpoint_id)
  423. worker_rpc = self._get_worker_service_rpc_for_specs(
  424. ctxt, enabled=True,
  425. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  426. provider_requirements={
  427. endpoint.type: [
  428. constants.PROVIDER_TYPE_DESTINATION_MINION_POOL]})
  429. return worker_rpc.get_endpoint_destination_minion_pool_options(
  430. ctxt, endpoint.type, endpoint.connection_info, env, option_names)
  431. def get_endpoint_networks(self, ctxt, endpoint_id, env):
  432. endpoint = self.get_endpoint(ctxt, endpoint_id)
  433. worker_rpc = self._get_worker_service_rpc_for_specs(
  434. ctxt, enabled=True,
  435. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  436. provider_requirements={
  437. endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_NETWORKS]})
  438. return worker_rpc.get_endpoint_networks(
  439. ctxt, endpoint.type, endpoint.connection_info, env)
  440. def get_endpoint_storage(self, ctxt, endpoint_id, env):
  441. endpoint = self.get_endpoint(ctxt, endpoint_id)
  442. worker_rpc = self._get_worker_service_rpc_for_specs(
  443. ctxt, enabled=True,
  444. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  445. provider_requirements={
  446. endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_STORAGE]})
  447. return worker_rpc.get_endpoint_storage(
  448. ctxt, endpoint.type, endpoint.connection_info, env)
  449. def validate_endpoint_connection(self, ctxt, endpoint_id):
  450. endpoint = self.get_endpoint(ctxt, endpoint_id)
  451. worker_rpc = self._get_worker_service_rpc_for_specs(
  452. ctxt, enabled=True,
  453. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  454. provider_requirements={
  455. endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT]})
  456. return worker_rpc.validate_endpoint_connection(
  457. ctxt, endpoint.type, endpoint.connection_info)
  458. def validate_endpoint_target_environment(
  459. self, ctxt, endpoint_id, target_env):
  460. endpoint = self.get_endpoint(ctxt, endpoint_id)
  461. worker_rpc = self._get_worker_service_rpc_for_specs(
  462. ctxt, enabled=True,
  463. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  464. provider_requirements={
  465. endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT]})
  466. return worker_rpc.validate_endpoint_target_environment(
  467. ctxt, endpoint.type, target_env)
  468. def validate_endpoint_source_environment(
  469. self, ctxt, endpoint_id, source_env):
  470. endpoint = self.get_endpoint(ctxt, endpoint_id)
  471. worker_rpc = self._get_worker_service_rpc_for_specs(
  472. ctxt, enabled=True,
  473. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  474. provider_requirements={
  475. endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT]})
  476. return worker_rpc.validate_endpoint_source_environment(
  477. ctxt, endpoint.type, source_env)
  478. def validate_endpoint_source_minion_pool_options(
  479. self, ctxt, endpoint_id, pool_environment):
  480. endpoint = self.get_endpoint(ctxt, endpoint_id)
  481. worker_rpc = self._get_worker_service_rpc_for_specs(
  482. ctxt, enabled=True,
  483. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  484. provider_requirements={
  485. endpoint.type: [
  486. constants.PROVIDER_TYPE_SOURCE_MINION_POOL]})
  487. return worker_rpc.validate_endpoint_source_minion_pool_options(
  488. ctxt, endpoint.type, pool_environment)
  489. def validate_endpoint_destination_minion_pool_options(
  490. self, ctxt, endpoint_id, pool_environment):
  491. endpoint = self.get_endpoint(ctxt, endpoint_id)
  492. worker_rpc = self._get_worker_service_rpc_for_specs(
  493. ctxt, enabled=True,
  494. region_sets=[[reg.id for reg in endpoint.mapped_regions]],
  495. provider_requirements={
  496. endpoint.type: [
  497. constants.PROVIDER_TYPE_DESTINATION_MINION_POOL]})
  498. return worker_rpc.validate_endpoint_destination_minion_pool_options(
  499. ctxt, endpoint.type, pool_environment)
  500. def get_available_providers(self, ctxt):
  501. # TODO(aznashwan): merge list of all providers from all
  502. # worker services:
  503. worker_rpc = self._get_worker_rpc_for_host(
  504. self._get_any_worker_service(ctxt)['host'])
  505. return worker_rpc.get_available_providers(ctxt)
  506. def get_provider_schemas(self, ctxt, platform_name, provider_type):
  507. # TODO(aznashwan): merge or version/namespace schemas for each worker?
  508. worker_rpc = self._get_worker_rpc_for_host(
  509. self._get_any_worker_service(ctxt)['host'])
  510. return worker_rpc.get_provider_schemas(
  511. ctxt, platform_name, provider_type)
  512. @staticmethod
  513. def _create_task(instance, task_type, execution, depends_on=None,
  514. on_error=False, on_error_only=False):
  515. """ Creates a task with the given properties.
  516. NOTE: for on_error and on_error_only tasks, the parent dependencies who
  517. are the ones which require cleanup should also be included!
  518. Ex: for the DELETE_OS_MORPHING_RESOURCES task, include both the
  519. OSMorphing task, as well as the DEPLOY_OS_MORPHIN_RESOURCES one!
  520. """
  521. task = models.Task()
  522. task.id = str(uuid.uuid4())
  523. task.instance = instance
  524. task.execution = execution
  525. task.task_type = task_type
  526. task.depends_on = depends_on
  527. task.on_error = on_error
  528. task.index = len(task.execution.tasks) + 1
  529. # non-error tasks are automatically set to scheduled:
  530. if not on_error and not on_error_only:
  531. task.status = constants.TASK_STATUS_SCHEDULED
  532. else:
  533. task.status = constants.TASK_STATUS_ON_ERROR_ONLY
  534. # on-error-only tasks remain marked as such
  535. # regardless of dependencies:
  536. if on_error_only:
  537. task.on_error = True
  538. # plain on-error but depend on already-defined
  539. # scheduled tasks count as scheduled:
  540. elif depends_on:
  541. for task_id in depends_on:
  542. if [t for t in task.execution.tasks if t.id == task_id and
  543. t.status != constants.TASK_STATUS_ON_ERROR_ONLY]:
  544. task.status = constants.TASK_STATUS_SCHEDULED
  545. break
  546. # on_error tasks with no deps are automatically scheduled:
  547. else:
  548. task.status = constants.TASK_STATUS_SCHEDULED
  549. return task
  550. def _get_task_origin(self, ctxt, action):
  551. endpoint = self.get_endpoint(ctxt, action.origin_endpoint_id)
  552. return {
  553. "connection_info": endpoint.connection_info,
  554. "type": endpoint.type,
  555. "source_environment": action.source_environment
  556. }
  557. def _get_task_destination(self, ctxt, action):
  558. endpoint = self.get_endpoint(ctxt, action.destination_endpoint_id)
  559. return {
  560. "connection_info": endpoint.connection_info,
  561. "type": endpoint.type,
  562. "target_environment": action.destination_environment
  563. }
  564. def _get_worker_service_rpc_for_task(
  565. self, ctxt, task, origin_endpoint, destination_endpoint,
  566. retry_count=5, retry_period=2):
  567. LOG.debug(
  568. "Compiling required Worker Service specs for task with "
  569. "ID '%s' (type '%s') from endpoints '%s' to '%s'",
  570. task.id, task.task_type, origin_endpoint.id,
  571. destination_endpoint.id)
  572. task_cls = tasks_factory.get_task_runner_class(task.task_type)
  573. # determine required Coriolis regions based on the endpoints:
  574. required_region_sets = []
  575. origin_endpoint_region_ids = [
  576. r.id for r in origin_endpoint.mapped_regions]
  577. destination_endpoint_region_ids = [
  578. r.id for r in destination_endpoint.mapped_regions]
  579. required_platform = task_cls.get_required_platform()
  580. if required_platform in (
  581. constants.TASK_PLATFORM_SOURCE,
  582. constants.TASK_PLATFORM_BILATERAL):
  583. required_region_sets.append(origin_endpoint_region_ids)
  584. if required_platform in (
  585. constants.TASK_PLATFORM_DESTINATION,
  586. constants.TASK_PLATFORM_BILATERAL):
  587. required_region_sets.append(destination_endpoint_region_ids)
  588. # determine provider requirements:
  589. provider_requirements = {}
  590. required_provider_types = task_cls.get_required_provider_types()
  591. if constants.PROVIDER_PLATFORM_SOURCE in required_provider_types:
  592. provider_requirements[origin_endpoint.type] = (
  593. required_provider_types[
  594. constants.PROVIDER_PLATFORM_SOURCE])
  595. if constants.PROVIDER_PLATFORM_DESTINATION in required_provider_types:
  596. provider_requirements[destination_endpoint.type] = (
  597. required_provider_types[
  598. constants.PROVIDER_PLATFORM_DESTINATION])
  599. worker_rpc = None
  600. for i in range(retry_count):
  601. try:
  602. LOG.debug(
  603. "Requesting Worker Service for task with ID '%s' (type "
  604. "'%s') from endpoints '%s' to '%s'", task.id,
  605. task.task_type, origin_endpoint.id,
  606. destination_endpoint.id)
  607. worker_rpc = self._get_worker_service_rpc_for_specs(
  608. ctxt, provider_requirements=provider_requirements,
  609. region_sets=required_region_sets, enabled=True)
  610. LOG.debug(
  611. "Scheduler has granted Worker Service for task with ID "
  612. "'%s' (type '%s') from endpoints '%s' to '%s'",
  613. task.id, task.task_type, origin_endpoint.id,
  614. destination_endpoint.id)
  615. return worker_rpc
  616. except Exception as ex:
  617. LOG.warn(
  618. "Failed to schedule task with ID '%s' (attempt %d/%d). "
  619. "Waiting %d seconds and then retrying. Error was: %s",
  620. task.id, i+1, retry_count, retry_period,
  621. utils.get_exception_details())
  622. time.sleep(retry_period)
  623. message = (
  624. "Failed to schedule task %s after %d tries. This may indicate that"
  625. " there are no Coriolis Worker services able to perform the task "
  626. "on the platforms and in the Coriolis Regions required by the "
  627. "selected source/destination Coriolis Endpoints. Please review"
  628. " the Conductor and Scheduler logs for more exact details." % (
  629. task.id, retry_count))
  630. db_api.set_task_status(
  631. ctxt, task.id, constants.TASK_STATUS_FAILED_TO_SCHEDULE,
  632. exception_details=message)
  633. raise exception.NoSuitableWorkerServiceError(message)
  634. def _begin_tasks(
  635. self, ctxt, execution, task_info={},
  636. scheduling_retry_count=5, scheduling_retry_period=2):
  637. """ Starts all non-error-only tasks which have no depencies. """
  638. if not ctxt.trust_id:
  639. keystone.create_trust(ctxt)
  640. ctxt.delete_trust_id = True
  641. origin = self._get_task_origin(ctxt, execution.action)
  642. destination = self._get_task_destination(ctxt, execution.action)
  643. origin_endpoint = db_api.get_endpoint(
  644. ctxt, execution.action.origin_endpoint_id)
  645. destination_endpoint = db_api.get_endpoint(
  646. ctxt, execution.action.destination_endpoint_id)
  647. newly_started_tasks = []
  648. for task in execution.tasks:
  649. if (not task.depends_on and
  650. task.status == constants.TASK_STATUS_SCHEDULED):
  651. LOG.info(
  652. "Starting dependency-less task '%s' for execution '%s'",
  653. task.id, execution.id)
  654. db_api.set_task_status(
  655. ctxt, task.id, constants.TASK_STATUS_PENDING)
  656. try:
  657. worker_rpc = self._get_worker_service_rpc_for_task(
  658. ctxt, task, origin_endpoint, destination_endpoint,
  659. retry_count=scheduling_retry_count,
  660. retry_period=scheduling_retry_period)
  661. worker_rpc.begin_task(
  662. ctxt,
  663. task_id=task.id,
  664. task_type=task.task_type,
  665. origin=origin,
  666. destination=destination,
  667. instance=task.instance,
  668. task_info=task_info.get(task.instance, {}))
  669. except Exception as ex:
  670. LOG.warn(
  671. "Error occured while starting new task '%s'. "
  672. "Cancelling execution '%s'. Error was: %s",
  673. task.id, execution.id, utils.get_exception_details())
  674. self._cancel_tasks_execution(
  675. ctxt, execution, requery=True)
  676. raise
  677. newly_started_tasks.append(task.id)
  678. if newly_started_tasks:
  679. LOG.info(
  680. "Started the following tasks for Execution '%s': %s",
  681. execution.id, newly_started_tasks)
  682. self._set_tasks_execution_status(
  683. ctxt, execution.id, constants.TASK_STATUS_RUNNING)
  684. else:
  685. # NOTE: this should never happen if _check_execution_tasks_sanity
  686. # was called before this method:
  687. raise exception.InvalidActionTasksExecutionState(
  688. "No tasks were started at the beginning of execution '%s'" % (
  689. execution.id))
  690. def _check_execution_tasks_sanity(
  691. self, execution, initial_task_info):
  692. """ Checks whether the given execution's tasks are:
  693. - properly odered and not set to deadlock off the bat
  694. - properly manipulate the task_info in the right order
  695. """
  696. all_instances_in_tasks = {
  697. t.instance for t in execution.tasks}
  698. instances_tasks_mapping = {
  699. instance: [
  700. t for t in execution.tasks if t.instance == instance]
  701. for instance in all_instances_in_tasks}
  702. def _check_task_cls_param_requirements(task, instance_task_info_keys):
  703. task_cls = tasks_factory.get_task_runner_class(task.task_type)
  704. missing_params = [
  705. p for p in task_cls.get_required_task_info_properties()
  706. if p not in instance_task_info_keys]
  707. if missing_params:
  708. raise exception.CoriolisException(
  709. "The following task parameters for instance '%s' "
  710. "are missing from the task_info for task '%s' of "
  711. "type '%s': %s" % (
  712. task.instance, task.id, task.task_type,
  713. missing_params))
  714. return task_cls.get_returned_task_info_properties()
  715. for instance, instance_tasks in instances_tasks_mapping.items():
  716. task_info_keys = set(initial_task_info.get(
  717. instance, {}).keys())
  718. # mapping between the ID and associated object of processed tasks:
  719. processed_tasks = {}
  720. tasks_to_process = {
  721. t.id: t for t in instance_tasks}
  722. while tasks_to_process:
  723. queued_tasks = []
  724. # gather all tasks which will be queued to run in parallel:
  725. for task in tasks_to_process.values():
  726. if task.status in (
  727. constants.TASK_STATUS_SCHEDULED,
  728. constants.TASK_STATUS_ON_ERROR_ONLY):
  729. if not task.depends_on:
  730. queued_tasks.append(task)
  731. else:
  732. missing_deps = [
  733. dep_id
  734. for dep_id in task.depends_on
  735. if dep_id not in tasks_to_process and (
  736. dep_id not in processed_tasks)]
  737. if missing_deps:
  738. raise exception.CoriolisException(
  739. "Task '%s' (type '%s') for instance '%s' "
  740. "has non-existent tasks referenced as "
  741. "dependencies: %s" % (
  742. task.id, task.task_type,
  743. instance, missing_deps))
  744. if all(
  745. [dep_id in processed_tasks
  746. for dep_id in task.depends_on]):
  747. queued_tasks.append(task)
  748. else:
  749. raise exception.CoriolisException(
  750. "Invalid initial state '%s' for task '%s' "
  751. "of type '%s'."% (
  752. task.status, task.id, task.task_type))
  753. # check if nothing was left queued:
  754. if not queued_tasks:
  755. remaining_tasks_deps_map = {
  756. (tid, t.task_type): t.depends_on
  757. for tid, t in tasks_to_process.items()}
  758. processed_tasks_type_map = {
  759. tid: t.task_type
  760. for tid, t in processed_tasks.items()}
  761. raise exception.CoriolisException(
  762. "Execution '%s' (type '%s') is bound to be deadlocked:"
  763. " there are leftover tasks for instance '%s' which "
  764. "will never get queued. Already processed tasks are: "
  765. "%s. Tasks left: %s" % (
  766. execution.id, execution.type, instance,
  767. processed_tasks_type_map, remaining_tasks_deps_map))
  768. # mapping for task_info fields modified by each task:
  769. modified_fields_by_queued_tasks = {}
  770. # check that each task has what it needs and
  771. # register what they return/modify:
  772. for task in queued_tasks:
  773. for new_field in _check_task_cls_param_requirements(
  774. task, task_info_keys):
  775. if new_field not in modified_fields_by_queued_tasks:
  776. modified_fields_by_queued_tasks[new_field] = [task]
  777. else:
  778. modified_fields_by_queued_tasks[new_field].append(
  779. task)
  780. # check if any queued tasks would manipulate the same fields:
  781. conflicting_fields = {
  782. new_field: [t.task_type for t in tasks]
  783. for new_field, tasks in (
  784. modified_fields_by_queued_tasks.items())
  785. if len(tasks) > 1}
  786. if conflicting_fields:
  787. raise exception.CoriolisException(
  788. "There are fields which will encounter a state "
  789. "conflict following the parallelized execution of "
  790. "tasks for execution '%s' (type '%s') for instance "
  791. "'%s'. Conflicting fields and tasks will be: : %s" % (
  792. execution.id, execution.type, instance,
  793. conflicting_fields))
  794. # register queued tasks as processed before continuing:
  795. for task in queued_tasks:
  796. processed_tasks[task.id] = task
  797. tasks_to_process.pop(task.id)
  798. # update current state fields at this point:
  799. task_info_keys = task_info_keys.union(set(
  800. modified_fields_by_queued_tasks.keys()))
  801. LOG.debug(
  802. "Successfully processed following tasks for instance '%s' "
  803. "for execution %s (type '%s') for any state conflict "
  804. "checks: %s",
  805. instance, execution.id, execution.type, [
  806. (t.id, t.task_type) for t in queued_tasks])
  807. LOG.debug(
  808. "Successfully checked all tasks for instance '%s' as part of "
  809. "execution '%s' (type '%s') for any state conflicts: %s",
  810. instance, execution.id, execution.type,
  811. [(t.id, t.task_type) for t in instance_tasks])
  812. LOG.debug(
  813. "Successfully checked all tasks for execution '%s' (type '%s') "
  814. "for ordering or state conflicts.",
  815. execution.id, execution.type)
  816. @replica_synchronized
  817. def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances):
  818. replica = self._get_replica(ctxt, replica_id)
  819. self._check_reservation_for_transfer(
  820. replica, licensing_client.RESERVATION_TYPE_REPLICA)
  821. self._check_replica_running_executions(ctxt, replica)
  822. self._check_minion_pools_for_action(ctxt, replica)
  823. execution = models.TasksExecution()
  824. execution.id = str(uuid.uuid4())
  825. execution.action = replica
  826. execution.status = constants.EXECUTION_STATUS_UNEXECUTED
  827. execution.type = constants.EXECUTION_TYPE_REPLICA_EXECUTION
  828. # TODO(aznashwan): have these passed separately to the relevant
  829. # provider methods. They're currently passed directly inside
  830. # dest-env by the API service when accepting the call, but we
  831. # re-overwrite them here in case of Replica updates.
  832. dest_env = copy.deepcopy(replica.destination_environment)
  833. dest_env['network_map'] = replica.network_map
  834. dest_env['storage_mappings'] = replica.storage_mappings
  835. minion_pool_allocations = self._allocate_minion_machines_for_action(
  836. ctxt, replica, include_transfer_minions=True,
  837. include_osmorphing_minions=False)
  838. for instance in execution.action.instances:
  839. instance_minion_machines = minion_pool_allocations.get(
  840. instance, {})
  841. instance_source_minion = instance_minion_machines.get(
  842. 'source_minion')
  843. instance_target_minion = instance_minion_machines.get(
  844. 'target_minion')
  845. # NOTE: we default/convert the volumes info to an empty list
  846. # to preserve backwards-compatibility with older versions
  847. # of Coriolis dating before the scheduling overhaul (PR##114)
  848. if instance not in replica.info:
  849. replica.info[instance] = {'volumes_info': []}
  850. elif replica.info[instance].get('volumes_info') is None:
  851. replica.info[instance]['volumes_info'] = []
  852. # NOTE: we update all of the param values before triggering an
  853. # execution to ensure that the latest parameters are used:
  854. replica.info[instance].update({
  855. "source_environment": replica.source_environment,
  856. "target_environment": dest_env})
  857. # TODO(aznashwan): have these passed separately to the relevant
  858. # provider methods (they're currently passed directly inside
  859. # dest-env by the API service when accepting the call)
  860. # "network_map": network_map,
  861. # "storage_mappings": storage_mappings,
  862. validate_replica_source_inputs_task = self._create_task(
  863. instance,
  864. constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS,
  865. execution)
  866. get_instance_info_task = self._create_task(
  867. instance,
  868. constants.TASK_TYPE_GET_INSTANCE_INFO,
  869. execution)
  870. validate_replica_destination_inputs_task = self._create_task(
  871. instance,
  872. constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS,
  873. execution,
  874. depends_on=[get_instance_info_task.id])
  875. disk_deployment_depends_on = []
  876. validate_source_minion_task = None
  877. if instance_source_minion:
  878. replica.info[instance].update({
  879. "source_minion_machine_id": instance_source_minion.id,
  880. "source_minion_provider_properties": (
  881. instance_source_minion.provider_properties),
  882. "source_minion_connection_info": (
  883. instance_source_minion.connection_info)})
  884. validate_source_minion_task = self._create_task(
  885. instance,
  886. constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY,
  887. execution,
  888. depends_on=[
  889. get_instance_info_task.id,
  890. validate_replica_source_inputs_task.id])
  891. disk_deployment_depends_on.append(
  892. validate_source_minion_task.id)
  893. else:
  894. disk_deployment_depends_on.append(
  895. validate_replica_source_inputs_task.id)
  896. validate_target_minion_task = None
  897. if instance_target_minion:
  898. replica.info[instance].update({
  899. "target_minion_machine_id": instance_target_minion.id,
  900. "target_minion_provider_properties": (
  901. instance_target_minion.provider_properties),
  902. "target_minion_connection_info": (
  903. instance_target_minion.connection_info),
  904. "target_minion_backup_writer_connection_info": (
  905. instance_target_minion.backup_writer_connection_info)})
  906. validate_target_minion_task = self._create_task(
  907. instance,
  908. constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY,
  909. execution,
  910. depends_on=[
  911. validate_replica_destination_inputs_task.id])
  912. disk_deployment_depends_on.append(
  913. validate_target_minion_task.id)
  914. else:
  915. disk_deployment_depends_on.append(
  916. validate_replica_destination_inputs_task.id)
  917. deploy_replica_disks_task = self._create_task(
  918. instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS,
  919. execution, depends_on=disk_deployment_depends_on)
  920. shutdown_deps = []
  921. deploy_replica_source_resources_task = None
  922. if not instance_source_minion:
  923. deploy_replica_source_resources_task = self._create_task(
  924. instance,
  925. constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES,
  926. execution, depends_on=[
  927. deploy_replica_disks_task.id])
  928. shutdown_deps.append(deploy_replica_source_resources_task)
  929. attach_target_minion_disks_task = None
  930. deploy_replica_target_resources_task = None
  931. if instance_target_minion:
  932. ttyp = constants.TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION
  933. attach_target_minion_disks_task = self._create_task(
  934. instance, ttyp, execution, depends_on=[
  935. deploy_replica_disks_task.id])
  936. shutdown_deps.append(attach_target_minion_disks_task)
  937. else:
  938. deploy_replica_target_resources_task = self._create_task(
  939. instance,
  940. constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES,
  941. execution, depends_on=[
  942. deploy_replica_disks_task.id])
  943. shutdown_deps.append(deploy_replica_target_resources_task)
  944. depends_on = [t.id for t in shutdown_deps]
  945. if shutdown_instances:
  946. shutdown_instance_task = self._create_task(
  947. instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
  948. execution, depends_on=depends_on)
  949. depends_on = [shutdown_instance_task.id]
  950. replicate_disks_task = self._create_task(
  951. instance, constants.TASK_TYPE_REPLICATE_DISKS,
  952. execution, depends_on=depends_on)
  953. if instance_source_minion:
  954. self._create_task(
  955. instance,
  956. constants.TASK_TYPE_RELEASE_SOURCE_MINION,
  957. execution,
  958. depends_on=[
  959. validate_source_minion_task.id,
  960. replicate_disks_task.id],
  961. on_error=True)
  962. else:
  963. self._create_task(
  964. instance,
  965. constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
  966. execution,
  967. depends_on=[
  968. deploy_replica_source_resources_task.id,
  969. replicate_disks_task.id],
  970. on_error=True)
  971. if instance_target_minion:
  972. detach_volumes_from_minion_task = self._create_task(
  973. instance,
  974. constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION,
  975. execution,
  976. depends_on=[
  977. attach_target_minion_disks_task.id,
  978. replicate_disks_task.id],
  979. on_error=True)
  980. self._create_task(
  981. instance,
  982. constants.TASK_TYPE_RELEASE_DESTINATION_MINION,
  983. execution,
  984. depends_on=[
  985. validate_target_minion_task.id,
  986. detach_volumes_from_minion_task.id],
  987. on_error=True)
  988. else:
  989. self._create_task(
  990. instance,
  991. constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
  992. execution, depends_on=[
  993. deploy_replica_target_resources_task.id,
  994. replicate_disks_task.id],
  995. on_error=True)
  996. try:
  997. self._check_execution_tasks_sanity(execution, replica.info)
  998. # update the action info for all of the Replicas:
  999. for instance in execution.action.instances:
  1000. db_api.update_transfer_action_info_for_instance(
  1001. ctxt, replica.id, instance, replica.info[instance])
  1002. # add new execution to DB:
  1003. db_api.add_replica_tasks_execution(ctxt, execution)
  1004. LOG.info("Replica tasks execution created: %s", execution.id)
  1005. self._begin_tasks(ctxt, execution, task_info=replica.info)
  1006. except Exception:
  1007. if minion_pool_allocations:
  1008. LOG.warn(
  1009. "Exception occured while verifying/registering Replica "
  1010. "tasks execution for Replica '%s'. Cleaning up all minion "
  1011. "allocations from DB: %s" % (
  1012. replica.id, minion_pool_allocations))
  1013. self._deallocate_minion_machines_for_action(ctxt, replica)
  1014. raise
  1015. return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
  1016. @replica_synchronized
  1017. def get_replica_tasks_executions(self, ctxt, replica_id,
  1018. include_tasks=False):
  1019. return db_api.get_replica_tasks_executions(
  1020. ctxt, replica_id, include_tasks)
  1021. @tasks_execution_synchronized
  1022. def get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
  1023. return self._get_replica_tasks_execution(
  1024. ctxt, replica_id, execution_id)
  1025. @tasks_execution_synchronized
  1026. def delete_replica_tasks_execution(self, ctxt, replica_id, execution_id):
  1027. execution = self._get_replica_tasks_execution(
  1028. ctxt, replica_id, execution_id)
  1029. if execution.status in constants.ACTIVE_EXECUTION_STATUSES:
  1030. raise exception.InvalidMigrationState(
  1031. "Cannot delete execution '%s' for Replica '%s' as it is "
  1032. "currently in '%s' state." % (
  1033. execution_id, replica_id, execution.status))
  1034. db_api.delete_replica_tasks_execution(ctxt, execution_id)
  1035. @tasks_execution_synchronized
  1036. def cancel_replica_tasks_execution(self, ctxt, replica_id, execution_id,
  1037. force):
  1038. execution = self._get_replica_tasks_execution(
  1039. ctxt, replica_id, execution_id)
  1040. if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
  1041. raise exception.InvalidReplicaState(
  1042. "Replica '%s' has no running execution to cancel." % (
  1043. replica_id))
  1044. if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
  1045. not force):
  1046. raise exception.InvalidReplicaState(
  1047. "Replica '%s' is already being cancelled. Please use the "
  1048. "force option if you'd like to force-cancel it." % (
  1049. replica_id))
  1050. self._cancel_tasks_execution(ctxt, execution, force=force)
  1051. def _get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
  1052. execution = db_api.get_replica_tasks_execution(
  1053. ctxt, replica_id, execution_id)
  1054. if not execution:
  1055. raise exception.NotFound(
  1056. "Execution with ID '%s' for Replica '%s' not found." % (
  1057. execution_id, replica_id))
  1058. return execution
  1059. def get_replicas(self, ctxt, include_tasks_executions=False):
  1060. return db_api.get_replicas(ctxt, include_tasks_executions)
  1061. @replica_synchronized
  1062. def get_replica(self, ctxt, replica_id):
  1063. return self._get_replica(ctxt, replica_id)
  @replica_synchronized
  def delete_replica(self, ctxt, replica_id):
      """Delete a Replica which has no active executions or migrations.

      The reservation deletion check
      (``_check_delete_reservation_for_transfer``) runs before the DB
      record is removed.
      """
      target = self._get_replica(ctxt, replica_id)
      # refuse deletion while the Replica (or a Migration of it) runs:
      self._check_replica_running_executions(ctxt, target)
      self._check_delete_reservation_for_transfer(target)
      db_api.delete_replica(ctxt, replica_id)
  @replica_synchronized
  def delete_replica_disks(self, ctxt, replica_id):
      """Create and start a tasks execution deleting a Replica's disks.

      For every instance of the Replica with ``volumes_info`` recorded in
      its info, a source disk snapshot deletion task is scheduled, followed
      by a dependent replica disks deletion task.

      :raises exception.InvalidReplicaState: if no instance of the Replica
          has volumes information.
      :returns: the new execution, as returned by
          ``get_replica_tasks_execution``.
      """
      replica = self._get_replica(ctxt, replica_id)
      self._check_replica_running_executions(ctxt, replica)

      execution = models.TasksExecution()
      execution.id = str(uuid.uuid4())
      execution.status = constants.EXECUTION_STATUS_UNEXECUTED
      execution.action = replica
      execution.type = constants.EXECUTION_TYPE_REPLICA_DISKS_DELETE

      instances_with_volumes = [
          instance for instance in replica.instances
          if instance in replica.info and
          replica.info[instance].get('volumes_info')]
      if not instances_with_volumes:
          raise exception.InvalidReplicaState(
              "Replica '%s' does not have volumes information for any "
              "instances. Ensure that the replica has been executed "
              "successfully priorly" % replica_id)

      for instance in instances_with_volumes:
          snapshot_deletion_task = self._create_task(
              instance,
              constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS,
              execution)
          self._create_task(
              instance, constants.TASK_TYPE_DELETE_REPLICA_DISKS,
              execution, depends_on=[snapshot_deletion_task.id])

      # hand the Replica's current target-env options (including the
      # network/storage mappings) to every instance, in case the Replica
      # was updated since its last execution:
      target_env = copy.deepcopy(replica.destination_environment)
      target_env['network_map'] = replica.network_map
      target_env['storage_mappings'] = replica.storage_mappings
      for instance in replica.instances:
          replica.info[instance].update({"target_environment": target_env})

      self._check_execution_tasks_sanity(execution, replica.info)

      # persist the refreshed per-instance action info:
      for instance in replica.instances:
          db_api.update_transfer_action_info_for_instance(
              ctxt, replica.id, instance, replica.info[instance])

      db_api.add_replica_tasks_execution(ctxt, execution)
      LOG.info("Replica tasks execution created: %s", execution.id)
      self._begin_tasks(ctxt, execution, task_info=replica.info)
      return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
  @staticmethod
  def _check_endpoints(ctxt, origin_endpoint, destination_endpoint):
      """Reject transfers whose origin and destination endpoints coincide.

      :raises exception.SameDestination: if both endpoints have the same
          ID, or if their connection info compares equal.
      """
      same_endpoint = origin_endpoint.id == destination_endpoint.id
      if same_endpoint:
          raise exception.SameDestination(
              "The origin and destination endpoints cannot be the same. "
              "If you need to perform operations across two areas of "
              "the same platform (ex: migrating across public cloud regions)"
              ", please create two separate endpoints.")

      # TODO(alexpilotti): check Barbican secrets content as well
      same_connection = (
          origin_endpoint.connection_info ==
          destination_endpoint.connection_info)
      if same_connection:
          raise exception.SameDestination()
  def _check_minion_pools_for_action(self, ctxt, action):
      """Validate the minion pool selections of a transfer action.

      The action's origin pool, destination pool and every non-None
      per-instance OSMorphing pool mapping are checked against the minion
      pools returned by the DB API:

      * origin pool: must belong to the action's origin endpoint, be a
        source-platform pool and be of the Linux OS type;
      * destination pool: must belong to the action's destination
        endpoint, be a destination-platform pool and be of the Linux OS
        type;
      * OSMorphing pools: must belong to the action's destination endpoint
        and be destination-platform pools.

      :raises exception.NotFound: if a referenced pool does not exist.
      :raises exception.InvalidMinionPoolSelection: if a pool fails one of
          the checks above.
      """
      minion_pools = {
          pool.id: pool
          for pool in db_api.get_minion_pool_lifecycles(
              ctxt, include_tasks_executions=False,
              include_info=False, include_machines=False,
              to_dict=False)}

      def _get_pool(pool_id):
          pool = minion_pools.get(pool_id)
          if not pool:
              raise exception.NotFound(
                  "Could not find minion pool with ID '%s'." % pool_id)
          return pool

      if action.origin_minion_pool_id:
          origin_pool = _get_pool(action.origin_minion_pool_id)
          if origin_pool.origin_endpoint_id != action.origin_endpoint_id:
              raise exception.InvalidMinionPoolSelection(
                  "The selected origin minion pool ('%s') belongs to a "
                  "different Coriolis endpoint ('%s') than the requested "
                  "origin endpoint ('%s')" % (
                      action.origin_minion_pool_id,
                      origin_pool.origin_endpoint_id,
                      action.origin_endpoint_id))
          if origin_pool.pool_platform != constants.PROVIDER_PLATFORM_SOURCE:
              raise exception.InvalidMinionPoolSelection(
                  "The selected origin minion pool ('%s') is configured as a"
                  " '%s' pool. The pool must be of type %s to be used for "
                  "data exports." % (
                      action.origin_minion_pool_id,
                      origin_pool.pool_platform,
                      constants.PROVIDER_PLATFORM_SOURCE))
          if origin_pool.pool_os_type != constants.OS_TYPE_LINUX:
              raise exception.InvalidMinionPoolSelection(
                  "The selected origin minion pool ('%s') is of OS type '%s'"
                  " instead of the Linux OS type required for a source "
                  "transfer minion pool." % (
                      action.origin_minion_pool_id,
                      origin_pool.pool_os_type))
          LOG.debug(
              "Successfully validated compatibility of origin minion pool "
              "'%s' for use with action '%s'.",
              action.origin_minion_pool_id, action.id)

      if action.destination_minion_pool_id:
          destination_pool = _get_pool(action.destination_minion_pool_id)
          # NOTE: a pool's own endpoint is stored as `origin_endpoint_id`
          # regardless of which side of the transfer it serves:
          if destination_pool.origin_endpoint_id != (
                  action.destination_endpoint_id):
              raise exception.InvalidMinionPoolSelection(
                  "The selected destination minion pool ('%s') belongs to a "
                  "different Coriolis endpoint ('%s') than the requested "
                  "destination endpoint ('%s')" % (
                      action.destination_minion_pool_id,
                      destination_pool.origin_endpoint_id,
                      action.destination_endpoint_id))
          if destination_pool.pool_platform != (
                  constants.PROVIDER_PLATFORM_DESTINATION):
              raise exception.InvalidMinionPoolSelection(
                  "The selected destination minion pool ('%s') is configured"
                  " as a '%s'. The pool must be of type %s to be used for "
                  "data imports." % (
                      action.destination_minion_pool_id,
                      destination_pool.pool_platform,
                      constants.PROVIDER_PLATFORM_DESTINATION))
          if destination_pool.pool_os_type != constants.OS_TYPE_LINUX:
              raise exception.InvalidMinionPoolSelection(
                  "The selected destination minion pool ('%s') is of OS type"
                  " '%s' instead of the Linux OS type required for a "
                  "destination transfer minion pool." % (
                      action.destination_minion_pool_id,
                      destination_pool.pool_os_type))
          LOG.debug(
              "Successfully validated compatibility of destination minion "
              "pool '%s' for use with action '%s'.",
              action.destination_minion_pool_id, action.id)

      if action.instance_osmorphing_minion_pool_mappings:
          # instances explicitly mapped to no pool (None) are skipped:
          osmorphing_pool_mappings = {
              instance_id: pool_id
              for (instance_id, pool_id) in (
                  action.instance_osmorphing_minion_pool_mappings.items())
              if pool_id}
          for (instance, pool_id) in osmorphing_pool_mappings.items():
              osmorphing_pool = _get_pool(pool_id)
              if osmorphing_pool.origin_endpoint_id != (
                      action.destination_endpoint_id):
                  raise exception.InvalidMinionPoolSelection(
                      "The selected OSMorphing minion pool for instance '%s'"
                      " ('%s') belongs to a different Coriolis endpoint "
                      "('%s') than the destination endpoint ('%s')" % (
                          instance, pool_id,
                          osmorphing_pool.origin_endpoint_id,
                          action.destination_endpoint_id))
              if osmorphing_pool.pool_platform != (
                      constants.PROVIDER_PLATFORM_DESTINATION):
                  raise exception.InvalidMinionPoolSelection(
                      "The selected OSMorphing minion pool for instance '%s'"
                      " ('%s') is configured as a '%s' pool. The pool must "
                      "be of type %s to be used for OSMorphing." % (
                          instance, pool_id,
                          osmorphing_pool.pool_platform,
                          constants.PROVIDER_PLATFORM_DESTINATION))
              LOG.debug(
                  "Successfully validated compatibility of destination "
                  "minion pool '%s' for use as OSMorphing minion for "
                  "instance '%s' during action '%s'.",
                  pool_id, instance, action.id)
  def create_instances_replica(self, ctxt, origin_endpoint_id,
                               destination_endpoint_id,
                               origin_minion_pool_id,
                               destination_minion_pool_id,
                               instance_osmorphing_minion_pool_mappings,
                               source_environment,
                               destination_environment, instances,
                               network_map, storage_mappings, notes=None):
      """Create and persist a new Replica for the given instances.

      The endpoints are validated via ``_check_endpoints`` and the minion
      pool selections via ``_check_minion_pools_for_action``. The
      reservation creation check then runs before the Replica is added to
      the DB. Every instance starts with an empty ``volumes_info`` list.

      :returns: the new Replica, as returned by ``get_replica``.
      """
      self._check_endpoints(
          ctxt,
          self.get_endpoint(ctxt, origin_endpoint_id),
          self.get_endpoint(ctxt, destination_endpoint_id))

      replica = models.Replica()
      new_id = str(uuid.uuid4())
      replica.id = new_id
      replica.base_id = new_id

      # endpoints and minion pools:
      replica.origin_endpoint_id = origin_endpoint_id
      replica.origin_minion_pool_id = origin_minion_pool_id
      replica.destination_endpoint_id = destination_endpoint_id
      replica.destination_minion_pool_id = destination_minion_pool_id
      replica.instance_osmorphing_minion_pool_mappings = (
          instance_osmorphing_minion_pool_mappings)

      # environments and mappings:
      replica.source_environment = source_environment
      replica.destination_environment = destination_environment
      replica.network_map = network_map
      replica.storage_mappings = storage_mappings

      # per-instance bookkeeping and execution state:
      replica.instances = instances
      replica.info = {
          instance: {'volumes_info': []} for instance in instances}
      replica.executions = []
      replica.last_execution_status = constants.EXECUTION_STATUS_UNEXECUTED
      replica.notes = notes

      self._check_minion_pools_for_action(ctxt, replica)
      self._check_create_reservation_for_transfer(
          replica, licensing_client.RESERVATION_TYPE_REPLICA)
      db_api.add_replica(ctxt, replica)
      LOG.info("Replica created: %s", replica.id)
      return self.get_replica(ctxt, replica.id)
  def _get_replica(self, ctxt, replica_id):
      """Fetch a Replica through the DB API.

      :raises exception.NotFound: if the DB API returns no Replica.
      """
      found = db_api.get_replica(ctxt, replica_id)
      if found:
          return found
      raise exception.NotFound(
          "Replica with ID '%s' not found." % replica_id)
  def get_migrations(self, ctxt, include_tasks,
                     include_info=False):
      """List Migrations through the DB API.

      :param include_tasks: forwarded positionally to
          ``db_api.get_migrations``.
      :param include_info: forwarded as the ``include_info`` keyword.
      """
      migrations = db_api.get_migrations(
          ctxt, include_tasks, include_info=include_info)
      return migrations
  @migration_synchronized
  def get_migration(self, ctxt, migration_id):
      """Return the Migration converted through ``utils.to_dict``."""
      migration = self._get_migration(ctxt, migration_id)
      # the default serialization mechanism enforces a max_depth of 3
      return utils.to_dict(migration)
  @staticmethod
  def _check_running_replica_migrations(ctxt, replica_id):
      """Raise if a Migration deployed from the Replica is still active.

      Only the first execution of each Migration is inspected (Migrations
      created by ``deploy_replica_instances`` get exactly one execution).
      A Migration without any executions is treated as inactive instead of
      failing with an IndexError.

      :raises exception.InvalidReplicaState: if any such Migration's
          execution is in an active state.
      """
      migrations = db_api.get_replica_migrations(ctxt, replica_id)
      if any(m.executions and
             m.executions[0].status in constants.ACTIVE_EXECUTION_STATUSES
             for m in migrations):
          raise exception.InvalidReplicaState(
              "Replica '%s' is currently being migrated" % replica_id)
  @staticmethod
  def _check_running_executions(action):
      """Raise if any tasks execution of the action is in an active state.

      :raises exception.InvalidActionTasksExecutionState: listing the IDs of
          the active executions.
      """
      active_ids = [
          execution.id for execution in action.executions
          if execution.status in constants.ACTIVE_EXECUTION_STATUSES]
      if not active_ids:
          return
      raise exception.InvalidActionTasksExecutionState(
          "Another tasks execution is in progress: %s" % (
              active_ids))
  def _check_replica_running_executions(self, ctxt, replica):
      """Raise if the Replica or any Migration deployed from it is active."""
      # the Replica's own tasks executions first...
      self._check_running_executions(replica)
      # ...then the executions of Migrations created from it:
      self._check_running_replica_migrations(ctxt, replica.id)
  @staticmethod
  def _check_valid_replica_tasks_execution(replica, force=False):
      """Ensure a Replica has been executed before it gets deployed.

      :param force: when true, skips the requirement that at least one
          Replica execution completed successfully. A Replica with no
          executions at all is rejected regardless.
      :raises exception.InvalidReplicaState: if the Replica has no
          executions, or (unless ``force``) none of its executions of type
          Replica execution has completed.
      """
      executions = replica.executions
      if not executions:
          raise exception.InvalidReplicaState(
              "The Replica has never been executed.")
      if force:
          return
      # NOTE: ordering is irrelevant to this check, so the executions are
      # scanned directly instead of being sorted first:
      has_completed_execution = any(
          e.type == constants.EXECUTION_TYPE_REPLICA_EXECUTION and
          e.status == constants.EXECUTION_STATUS_COMPLETED
          for e in executions)
      if not has_completed_execution:
          raise exception.InvalidReplicaState(
              "A replica must have been executed successfully at least "
              "once in order to be migrated")
  def _get_provider_types(self, ctxt, endpoint):
      """Return the ``types`` entry of the providers for the endpoint.

      :raises exception.NotFound: if no providers are registered for the
          endpoint's type.
      """
      available = self.get_available_providers(ctxt)
      endpoint_providers = available.get(endpoint.type)
      if endpoint_providers is not None:
          return endpoint_providers["types"]
      raise exception.NotFound(
          "No provider found for: %s" % endpoint.type)
  @replica_synchronized
  def deploy_replica_instances(self, ctxt, replica_id,
                               clone_disks, force,
                               instance_osmorphing_minion_pool_mappings=None,
                               skip_os_morphing=False,
                               user_scripts=None):
      """Create and start a Migration which deploys a Replica's instances.

      :param replica_id: ID of the Replica to deploy.
      :param clone_disks: stored in each instance's task info. It also
          decides whether the target disk snapshot deletion runs on error,
          and whether a snapshot restoration task is scheduled.
      :param force: passed to ``_check_valid_replica_tasks_execution``.
      :param instance_osmorphing_minion_pool_mappings: optional per-instance
          OSMorphing pool overrides, applied on top of a copy of the
          Replica's own mappings.
      :param skip_os_morphing: when true, no OSMorphing tasks or OSMorphing
          minions are scheduled.
      :param user_scripts: replaces the Replica's ``user_scripts`` when
          truthy.
      :raises exception.InvalidReplicaState: if any instance entry in the
          Replica's info lacks volumes information.
      :returns: the new Migration, as returned by ``get_migration``.
      """
      replica = self._get_replica(ctxt, replica_id)
      self._check_reservation_for_transfer(
          replica, licensing_client.RESERVATION_TYPE_REPLICA)
      self._check_replica_running_executions(ctxt, replica)
      self._check_valid_replica_tasks_execution(replica, force)
      user_scripts = user_scripts or replica.user_scripts

      destination_endpoint = self.get_endpoint(
          ctxt, replica.destination_endpoint_id)
      destination_provider_types = self._get_provider_types(
          ctxt, destination_endpoint)

      for instance, info in replica.info.items():
          if not info.get("volumes_info"):
              raise exception.InvalidReplicaState(
                  "The replica doesn't contain volumes information for "
                  "instance: %s. If replicated disks are deleted, the "
                  "replica needs to be executed anew before a migration can "
                  "occur" % instance)

      instances = replica.instances
      migration = models.Migration()
      migration.id = str(uuid.uuid4())
      migration.base_id = migration.id
      migration.origin_endpoint_id = replica.origin_endpoint_id
      migration.destination_endpoint_id = replica.destination_endpoint_id
      # TODO(aznashwan): have these passed separately to the relevant
      # provider methods instead of through the dest-env:
      dest_env = copy.deepcopy(replica.destination_environment)
      dest_env['network_map'] = replica.network_map
      dest_env['storage_mappings'] = replica.storage_mappings
      migration.destination_environment = dest_env
      migration.source_environment = replica.source_environment
      migration.network_map = replica.network_map
      migration.storage_mappings = replica.storage_mappings
      migration.instances = instances
      migration.replica = replica
      # NOTE(review): this shares the Replica's own info dict, so the
      # per-instance updates below are also visible on `replica.info`.
      migration.info = replica.info
      migration.user_scripts = user_scripts
      migration.origin_minion_pool_id = replica.origin_minion_pool_id
      migration.destination_minion_pool_id = (
          replica.destination_minion_pool_id)

      # NOTE: the per-deployment OSMorphing overrides are merged into a
      # copy, so they do not leak into the Replica's own mappings. This
      # also lets a Replica with no mappings (None) accept overrides.
      osmorphing_pool_mappings = (
          replica.instance_osmorphing_minion_pool_mappings)
      if instance_osmorphing_minion_pool_mappings:
          osmorphing_pool_mappings = dict(osmorphing_pool_mappings or {})
          osmorphing_pool_mappings.update(
              instance_osmorphing_minion_pool_mappings)
      migration.instance_osmorphing_minion_pool_mappings = (
          osmorphing_pool_mappings)

      minion_pool_allocations = self._allocate_minion_machines_for_action(
          ctxt, migration, include_transfer_minions=False,
          include_osmorphing_minions=not skip_os_morphing)

      # NOTE: everything after the minion allocation is covered by the
      # cleanup handler. A failure while building the task graph then
      # releases the allocated minion machines instead of leaking them.
      try:
          execution = models.TasksExecution()
          migration.executions = [execution]
          execution.status = constants.EXECUTION_STATUS_UNEXECUTED
          execution.number = 1
          execution.type = constants.EXECUTION_TYPE_REPLICA_DEPLOY
          for instance in instances:
              # NOTE: we default/convert the volumes info to an empty list
              # to preserve backwards-compatibility with older versions
              # of Coriolis dating before the scheduling overhaul (PR##114).
              # This has to happen before any per-instance key is written.
              if instance not in migration.info:
                  migration.info[instance] = {'volumes_info': []}
              instance_info = migration.info[instance]
              instance_info["clone_disks"] = clone_disks
              instance_info["user_scripts"] = self._get_instance_scripts(
                  user_scripts, instance)
              # NOTE: we update all of the param values before triggering
              # an execution to ensure that the params on the Replica are
              # used in case there was a failed Replica update (where the
              # new values could be in the `.info` field instead of the
              # old ones)
              instance_info.update({
                  "source_environment": migration.source_environment,
                  "target_environment": dest_env})
              # TODO(aznashwan): have these passed separately to the
              # relevant provider methods (they're currently passed
              # directly inside dest-env by the API service when accepting
              # the call)
              # "network_map": network_map,
              # "storage_mappings": storage_mappings,
              osmorphing_minion = minion_pool_allocations.get(
                  instance, {}).get('osmorphing_minion')
              if not skip_os_morphing and osmorphing_minion:
                  instance_info.update({
                      "osmorphing_minion_machine_id": osmorphing_minion.id,
                      "osmorphing_minion_provider_properties": (
                          osmorphing_minion.provider_properties),
                      "osmorphing_minion_connection_info": (
                          osmorphing_minion.connection_info)})
              self._add_replica_deploy_tasks_for_instance(
                  execution, instance, clone_disks, skip_os_morphing,
                  osmorphing_minion, destination_provider_types)

          self._check_execution_tasks_sanity(execution, migration.info)
          db_api.add_migration(ctxt, migration)
          LOG.info("Migration created: %s", migration.id)
          self._begin_tasks(ctxt, execution, task_info=migration.info)
      except Exception:
          if minion_pool_allocations:
              LOG.warning(
                  "Exception occured while verifying/registering tasks "
                  "execution for Migration '%s' from Replica '%s'. "
                  "Cleaning up all minion allocations from DB: %s",
                  migration.id, replica.id, minion_pool_allocations)
              self._deallocate_minion_machines_for_action(ctxt, migration)
          raise

      return self.get_migration(ctxt, migration.id)

  def _add_replica_deploy_tasks_for_instance(
          self, execution, instance, clone_disks, skip_os_morphing,
          osmorphing_minion, destination_provider_types):
      """Add the Replica deployment task graph of one instance to execution.

      :param osmorphing_minion: the minion machine allocated for the
          instance's OSMorphing, or None. When it is None (and OSMorphing
          is not skipped), temporary OSMorphing resources are deployed and
          deleted instead.
      :param destination_provider_types: provider types of the destination
          endpoint. An optimal-flavor task is only added when the instance
          flavor provider type is among them.
      """
      validate_inputs_task = self._create_task(
          instance,
          constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS,
          execution)
      validate_osmorphing_minion_task = None
      last_validation_task = validate_inputs_task
      if not skip_os_morphing and osmorphing_minion:
          validate_osmorphing_minion_task = self._create_task(
              instance,
              constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY,
              execution, depends_on=[validate_inputs_task.id])
          last_validation_task = validate_osmorphing_minion_task

      create_snapshot_task = self._create_task(
          instance, constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS,
          execution, depends_on=[last_validation_task.id])
      deploy_replica_task = self._create_task(
          instance,
          constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES,
          execution,
          depends_on=[create_snapshot_task.id])
      depends_on = [deploy_replica_task.id]

      if not skip_os_morphing:
          task_deploy_os_morphing_resources = None
          attach_osmorphing_minion_volumes_task = None
          if osmorphing_minion:
              attach_osmorphing_minion_volumes_task = self._create_task(
                  instance,
                  constants.TASK_TYPE_ATTACH_VOLUMES_TO_OSMORPHING_MINION,
                  execution,
                  depends_on=[validate_osmorphing_minion_task.id] + depends_on)
              collect_osmorphing_info_task = self._create_task(
                  instance,
                  constants.TASK_TYPE_COLLECT_OSMORPHING_INFO,
                  execution,
                  depends_on=[attach_osmorphing_minion_volumes_task.id])
              last_osmorphing_resources_deployment_task = (
                  collect_osmorphing_info_task)
          else:
              task_deploy_os_morphing_resources = self._create_task(
                  instance,
                  constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
                  execution, depends_on=depends_on)
              last_osmorphing_resources_deployment_task = (
                  task_deploy_os_morphing_resources)

          task_osmorphing = self._create_task(
              instance, constants.TASK_TYPE_OS_MORPHING,
              execution, depends_on=[
                  last_osmorphing_resources_deployment_task.id])
          depends_on = [task_osmorphing.id]

          # OSMorphing cleanup tasks run on error as well:
          if osmorphing_minion:
              detach_osmorphing_minion_volumes_task = self._create_task(
                  instance,
                  constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION,
                  execution, depends_on=[
                      attach_osmorphing_minion_volumes_task.id,
                      task_osmorphing.id],
                  on_error=True)
              release_osmorphing_minion_task = self._create_task(
                  instance,
                  constants.TASK_TYPE_RELEASE_OSMORPHING_MINION,
                  execution, depends_on=[
                      validate_osmorphing_minion_task.id,
                      detach_osmorphing_minion_volumes_task.id],
                  on_error=True)
              depends_on.append(release_osmorphing_minion_task.id)
          else:
              task_delete_os_morphing_resources = self._create_task(
                  instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
                  execution, depends_on=[
                      task_deploy_os_morphing_resources.id,
                      task_osmorphing.id],
                  on_error=True)
              depends_on.append(task_delete_os_morphing_resources.id)

      if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
              destination_provider_types):
          get_optimal_flavor_task = self._create_task(
              instance, constants.TASK_TYPE_GET_OPTIMAL_FLAVOR,
              execution, depends_on=depends_on)
          depends_on = [get_optimal_flavor_task.id]

      finalize_deployment_task = self._create_task(
          instance,
          constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT,
          execution,
          depends_on=depends_on)

      self._create_task(
          instance,
          constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS,
          execution, depends_on=[
              create_snapshot_task.id,
              finalize_deployment_task.id],
          on_error=clone_disks)

      cleanup_deployment_task = self._create_task(
          instance,
          constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT,
          execution,
          depends_on=[
              deploy_replica_task.id,
              finalize_deployment_task.id],
          on_error_only=True)

      if not clone_disks:
          self._create_task(
              instance,
              constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS,
              execution,
              depends_on=[cleanup_deployment_task.id],
              on_error=True)
  def _get_instance_scripts(self, user_scripts, instance):
      """Narrow the user scripts down to those relevant to one instance.

      :param user_scripts: dict which may hold a "global" entry and an
          "instances" mapping of instance name to script; None is treated
          as empty.
      :returns: dict with the (shared) "global" entry, defaulting to {},
          and an "instances" dict holding only this instance's script, if
          it has a truthy one.
      """
      scripts = user_scripts or {}
      global_scripts = scripts.get("global", {})
      per_instance = {}
      if scripts:
          own_script = scripts.get("instances", {}).get(instance)
          if own_script:
              per_instance[instance] = own_script
      return {"global": global_scripts, "instances": per_instance}
  def _allocate_minion_machines_for_action(
          self, ctxt, action, include_transfer_minions=True,
          include_osmorphing_minions=True):
      """Select minion machines for an action and mark them as allocated.

      Returns a dict of the form:
      {
          "instance_id": {
              "source_minion": <source minion properties>,
              "target_minion": <target minion properties>,
              "osmorphing_minion": <osmorphing minion properties>
          }
      }

      If the action references no minion pools, every instance is mapped
      to an empty dict. Otherwise, instances with no allocation at all are
      omitted from the result.

      :param include_transfer_minions: whether to select source/target
          (disk sync) minions from the action's origin/destination pools.
      :param include_osmorphing_minions: whether to select OSMorphing
          minions according to the per-instance pool mappings.
      :raises exception.InvalidMinionPoolSelection: if a referenced pool is
          missing, is not in the allocated state, or lacks available
          machines.
      """
      instance_machine_allocations = {
          instance: {} for instance in action.instances}

      # NOTE: the OSMorphing pool mappings may be None/empty (e.g. when
      # only transfer minion pools are used), so normalize them to a dict:
      osmorphing_pool_map = (
          action.instance_osmorphing_minion_pool_mappings or {})

      minion_pool_ids = set()
      if action.origin_minion_pool_id:
          minion_pool_ids.add(action.origin_minion_pool_id)
      if action.destination_minion_pool_id:
          minion_pool_ids.add(action.destination_minion_pool_id)
      minion_pool_ids.update(osmorphing_pool_map.values())
      minion_pool_ids.discard(None)
      if not minion_pool_ids:
          LOG.debug(
              "No minion pool settings found for action '%s'. "
              "Skipping minion machine allocations.", action.id)
          return instance_machine_allocations

      LOG.debug(
          "All minion pool selections for action '%s': %s",
          action.id, minion_pool_ids)

      def _select_machine(minion_pool, exclude=None):
          # returns the first available machine of the pool not in `exclude`
          if not minion_pool.minion_machines:
              raise exception.InvalidMinionPoolSelection(
                  "Minion pool with ID '%s' has no machines defined." % (
                      minion_pool.id))
          selected_machine = None
          for machine in minion_pool.minion_machines:
              if exclude and machine.id in exclude:
                  LOG.debug(
                      "Excluding minion machine '%s' from search.",
                      machine.id)
                  continue
              if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
                  LOG.debug(
                      "Minion machine with ID '%s' is in status '%s' "
                      "instead of '%s'. Skipping.", machine.id,
                      machine.status,
                      constants.MINION_MACHINE_STATUS_AVAILABLE)
                  continue
              selected_machine = machine
              break
          if not selected_machine:
              raise exception.InvalidMinionPoolSelection(
                  "There are no more available minion machines within minion"
                  " pool with ID '%s' (excluding the following ones already "
                  "planned for this transfer: %s). Please ensure that the "
                  "minion pool has enough minion machines allocated and "
                  "available (i.e. not being used for other operations) "
                  "to satisfy the number of VMs required by the Migration or"
                  " Replica." % (
                      minion_pool.id, exclude))
          return selected_machine

      with contextlib.ExitStack() as stack:
          # NOTE: the external pool locks are acquired in a deterministic
          # (sorted) order. Set iteration order may differ between
          # processes, and taking the same locks in differing orders risks
          # deadlocks between concurrent lock holders.
          for pool_id in sorted(minion_pool_ids):
              stack.enter_context(
                  lockutils.lock(
                      constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
                      external=True))

          minion_pools = db_api.get_minion_pool_lifecycles(
              ctxt, include_tasks_executions=False, include_info=False,
              include_machines=True, to_dict=False)
          minion_pool_id_mappings = {
              pool.id: pool for pool in minion_pools
              if pool.id in minion_pool_ids}

          missing_pools = [
              pool_id for pool_id in minion_pool_ids
              if pool_id not in minion_pool_id_mappings]
          if missing_pools:
              raise exception.InvalidMinionPoolSelection(
                  "The following minion pools could not be found: %s" % (
                      missing_pools))

          unallocated_pools = {
              pool_id: pool.pool_status
              for (pool_id, pool) in minion_pool_id_mappings.items()
              if pool.pool_status != constants.MINION_POOL_STATUS_ALLOCATED}
          if unallocated_pools:
              raise exception.InvalidMinionPoolSelection(
                  "The following minion pools have not had their machines "
                  "allocated and thus cannot be used: %s" % (
                      unallocated_pools))

          allocated_source_machine_ids = set()
          allocated_target_machine_ids = set()
          allocated_osmorphing_machine_ids = set()
          for instance in action.instances:

              if include_transfer_minions:
                  if action.origin_minion_pool_id:
                      origin_pool = minion_pool_id_mappings[
                          action.origin_minion_pool_id]
                      machine = _select_machine(
                          origin_pool, exclude=allocated_source_machine_ids)
                      allocated_source_machine_ids.add(machine.id)
                      instance_machine_allocations[
                          instance]['source_minion'] = machine
                      LOG.debug(
                          "Selected minion machine '%s' for source-side "
                          "syncing of instance '%s' as part of transfer "
                          "action '%s'.", machine.id, instance, action.id)

                  if action.destination_minion_pool_id:
                      dest_pool = minion_pool_id_mappings[
                          action.destination_minion_pool_id]
                      machine = _select_machine(
                          dest_pool, exclude=allocated_target_machine_ids)
                      allocated_target_machine_ids.add(machine.id)
                      instance_machine_allocations[
                          instance]['target_minion'] = machine
                      LOG.debug(
                          "Selected minion machine '%s' for target-side "
                          "syncing of instance '%s' as part of transfer "
                          "action '%s'.", machine.id, instance, action.id)

              if include_osmorphing_minions:
                  if instance not in osmorphing_pool_map:
                      LOG.debug(
                          "Instance '%s' is not listed in the OSMorphing "
                          "minion pool mappings for action '%s'.",
                          instance, action.id)
                  elif osmorphing_pool_map[instance] is None:
                      LOG.debug(
                          "OSMorphing pool ID for instance '%s' is "
                          "None in action '%s'. Ignoring.",
                          instance, action.id)
                  else:
                      osmorphing_pool_id = osmorphing_pool_map[instance]
                      # if the selected target and OSMorphing pools
                      # are the same, reuse the same worker:
                      ima = instance_machine_allocations[instance]
                      if osmorphing_pool_id == (
                              action.destination_minion_pool_id) and (
                                  'target_minion' in ima):
                          allocated_target_machine = ima[
                              'target_minion']
                          LOG.debug(
                              "Reusing disk sync minion '%s' for the "
                              "OSMorphing of instance '%s' as port of "
                              "transfer action '%s'",
                              allocated_target_machine.id, instance,
                              action.id)
                          instance_machine_allocations[
                              instance]['osmorphing_minion'] = (
                                  allocated_target_machine)
                      # else, allocate a new minion from the selected pool:
                      else:
                          osmorphing_pool = minion_pool_id_mappings[
                              osmorphing_pool_id]
                          machine = _select_machine(
                              osmorphing_pool,
                              exclude=allocated_osmorphing_machine_ids)
                          allocated_osmorphing_machine_ids.add(machine.id)
                          instance_machine_allocations[
                              instance]['osmorphing_minion'] = machine
                          LOG.debug(
                              "Selected minion machine '%s' for OSMorphing "
                              " of instance '%s' as part of transfer "
                              "action '%s'.",
                              machine.id, instance, action.id)

          # mark the selected machines as allocated:
          all_machine_ids = set(itertools.chain(
              allocated_source_machine_ids,
              allocated_target_machine_ids,
              allocated_osmorphing_machine_ids))
          db_api.set_minion_machines_allocation_statuses(
              ctxt, all_machine_ids, action.id,
              constants.MINION_MACHINE_STATUS_ALLOCATED)

      # filter out redundancies:
      instance_machine_allocations = {
          instance: allocations
          for (instance, allocations) in instance_machine_allocations.items()
          if allocations}

      LOG.debug(
          "Allocated the following minion machines for action '%s': %s",
          action.id, {
              instance: {
                  typ: machine.id
                  for (typ, machine) in allocation.items()}
              for (instance, allocation) in instance_machine_allocations.items()})
      return instance_machine_allocations
  def _deallocate_minion_machines_for_action(self, ctxt, action):
      """Release every minion machine allocated to the given action.

      While holding the external locks of all pools referenced by the
      action, the machines whose ``allocated_action_id`` is the action's
      ``base_id`` are set back to the available status (with no allocated
      action). Nothing is done if the action references no minion pools.
      """
      # NOTE: the OSMorphing pool mappings may be None/empty:
      osmorphing_pool_map = (
          action.instance_osmorphing_minion_pool_mappings or {})

      minion_pool_ids = set()
      if action.origin_minion_pool_id:
          minion_pool_ids.add(action.origin_minion_pool_id)
      if action.destination_minion_pool_id:
          minion_pool_ids.add(action.destination_minion_pool_id)
      minion_pool_ids.update(osmorphing_pool_map.values())
      minion_pool_ids.discard(None)

      if not minion_pool_ids:
          LOG.debug(
              "No minion pools seem to have been used for action with "
              "base_id '%s'. Skipping minion machine deallocation.",
              action.base_id)
          return

      LOG.debug(
          "Attempting to deallocate all minion pool machine selections "
          "for action '%s'. Afferent pools are: %s",
          action.base_id, minion_pool_ids)

      with contextlib.ExitStack() as stack:
          # NOTE: the external pool locks are acquired in a deterministic
          # (sorted) order. Set iteration order may differ between
          # processes, and taking the same locks in differing orders risks
          # deadlocks between concurrent lock holders.
          for pool_id in sorted(minion_pool_ids):
              stack.enter_context(
                  lockutils.lock(
                      constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
                      external=True))

          minion_machines = db_api.get_minion_machines(
              ctxt, allocated_action_id=action.base_id)
          machine_ids = [m.id for m in minion_machines]
          if machine_ids:
              LOG.info(
                  "Releasing the following minion machines for "
                  "action '%s': %s", action.base_id, machine_ids)
              db_api.set_minion_machines_allocation_statuses(
                  ctxt, machine_ids, None,
                  constants.MINION_MACHINE_STATUS_AVAILABLE)
          else:
              LOG.debug(
                  "No minion machines were found to be associated "
                  "with action with base_id '%s'.", action.base_id)
  1781. def migrate_instances(self, ctxt, origin_endpoint_id,
  1782. destination_endpoint_id, origin_minion_pool_id,
  1783. destination_minion_pool_id,
  1784. instance_osmorphing_minion_pool_mappings,
  1785. source_environment, destination_environment,
  1786. instances, network_map, storage_mappings,
  1787. replication_count, shutdown_instances=False,
  1788. notes=None, skip_os_morphing=False, user_scripts=None):
  1789. origin_endpoint = self.get_endpoint(ctxt, origin_endpoint_id)
  1790. destination_endpoint = self.get_endpoint(ctxt, destination_endpoint_id)
  1791. self._check_endpoints(ctxt, origin_endpoint, destination_endpoint)
  1792. destination_provider_types = self._get_provider_types(
  1793. ctxt, destination_endpoint)
  1794. migration = models.Migration()
  1795. migration.id = str(uuid.uuid4())
  1796. migration.base_id = migration.id
  1797. migration.origin_endpoint_id = origin_endpoint_id
  1798. migration.destination_endpoint_id = destination_endpoint_id
  1799. migration.destination_environment = destination_environment
  1800. migration.source_environment = source_environment
  1801. migration.network_map = network_map
  1802. migration.storage_mappings = storage_mappings
  1803. migration.last_execution_status = constants.EXECUTION_STATUS_UNEXECUTED
  1804. execution = models.TasksExecution()
  1805. execution.status = constants.EXECUTION_STATUS_UNEXECUTED
  1806. execution.number = 1
  1807. execution.type = constants.EXECUTION_TYPE_MIGRATION
  1808. migration.executions = [execution]
  1809. migration.instances = instances
  1810. migration.info = {}
  1811. migration.user_scripts = user_scripts
  1812. migration.notes = notes
  1813. migration.shutdown_instances = shutdown_instances
  1814. migration.replication_count = replication_count
  1815. migration.origin_minion_pool_id = origin_minion_pool_id
  1816. migration.destination_minion_pool_id = destination_minion_pool_id
  1817. migration.instance_osmorphing_minion_pool_mappings = (
  1818. instance_osmorphing_minion_pool_mappings)
  1819. self._check_create_reservation_for_transfer(
  1820. migration, licensing_client.RESERVATION_TYPE_MIGRATION)
  1821. self._check_minion_pools_for_action(ctxt, migration)
  1822. minion_pool_allocations = self._allocate_minion_machines_for_action(
  1823. ctxt, migration, include_transfer_minions=True,
  1824. include_osmorphing_minions=not skip_os_morphing)
  1825. for instance in instances:
  1826. migration.info[instance] = {
  1827. "volumes_info": [],
  1828. "source_environment": source_environment,
  1829. "target_environment": destination_environment,
  1830. "user_scripts": self._get_instance_scripts(
  1831. user_scripts, instance),
  1832. # NOTE: we must explicitly set this in each VM's info
  1833. # to prevent the Replica disks from being cloned:
  1834. "clone_disks": False}
  1835. # TODO(aznashwan): have these passed separately to the relevant
  1836. # provider methods (they're currently passed directly inside
  1837. # dest-env by the API service when accepting the call)
  1838. # "network_map": network_map,
  1839. # "storage_mappings": storage_mappings,
  1840. instance_minion_machines = minion_pool_allocations.get(
  1841. instance, {})
  1842. instance_source_minion = instance_minion_machines.get(
  1843. 'source_minion')
  1844. instance_target_minion = instance_minion_machines.get(
  1845. 'target_minion')
  1846. instance_osmorphing_minion = instance_minion_machines.get(
  1847. 'osmorphing_minion')
  1848. get_instance_info_task = self._create_task(
  1849. instance,
  1850. constants.TASK_TYPE_GET_INSTANCE_INFO,
  1851. execution)
  1852. validate_migration_source_inputs_task = self._create_task(
  1853. instance,
  1854. constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS,
  1855. execution)
  1856. validate_migration_destination_inputs_task = self._create_task(
  1857. instance,
  1858. constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS,
  1859. execution,
  1860. depends_on=[get_instance_info_task.id])
  1861. migration_resources_task_ids = []
  1862. validate_source_minion_task = None
  1863. deploy_migration_source_resources_task = None
  1864. migration_resources_task_deps = [
  1865. get_instance_info_task.id,
  1866. validate_migration_source_inputs_task.id]
  1867. if instance_source_minion:
  1868. migration.info[instance].update({
  1869. "source_minion_machine_id": instance_source_minion.id,
  1870. "source_minion_provider_properties": (
  1871. instance_source_minion.provider_properties),
  1872. "source_minion_connection_info": (
  1873. instance_source_minion.connection_info)})
  1874. validate_source_minion_task = self._create_task(
  1875. instance,
  1876. constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY,
  1877. execution,
  1878. depends_on=migration_resources_task_deps)
  1879. migration_resources_task_ids.append(
  1880. validate_source_minion_task.id)
  1881. else:
  1882. deploy_migration_source_resources_task = self._create_task(
  1883. instance,
  1884. constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES,
  1885. execution, depends_on=migration_resources_task_deps)
  1886. migration_resources_task_ids.append(
  1887. deploy_migration_source_resources_task.id)
  1888. create_instance_disks_task = self._create_task(
  1889. instance, constants.TASK_TYPE_CREATE_INSTANCE_DISKS,
  1890. execution, depends_on=[
  1891. validate_migration_source_inputs_task.id,
  1892. validate_migration_destination_inputs_task.id])
  1893. validate_target_minion_task = None
  1894. attach_target_minion_disks_task = None
  1895. deploy_migration_target_resources_task = None
  1896. if instance_target_minion:
  1897. migration.info[instance].update({
  1898. "target_minion_machine_id": instance_target_minion.id,
  1899. "target_minion_provider_properties": (
  1900. instance_target_minion.provider_properties),
  1901. "target_minion_connection_info": (
  1902. instance_target_minion.connection_info),
  1903. "target_minion_backup_writer_connection_info": (
  1904. instance_target_minion.backup_writer_connection_info)})
  1905. ttyp = (
  1906. constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY)
  1907. validate_target_minion_task = self._create_task(
  1908. instance, ttyp, execution, depends_on=[
  1909. validate_migration_destination_inputs_task.id])
  1910. attach_target_minion_disks_task = self._create_task(
  1911. instance,
  1912. constants.TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION,
  1913. execution, depends_on=[
  1914. validate_target_minion_task.id,
  1915. create_instance_disks_task.id])
  1916. migration_resources_task_ids.append(
  1917. attach_target_minion_disks_task.id)
  1918. else:
  1919. deploy_migration_target_resources_task = self._create_task(
  1920. instance,
  1921. constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES,
  1922. execution, depends_on=[create_instance_disks_task.id])
  1923. migration_resources_task_ids.append(
  1924. deploy_migration_target_resources_task.id)
  1925. validate_osmorphing_minion_task = None
  1926. if not skip_os_morphing and instance_osmorphing_minion:
  1927. migration.info[instance].update({
  1928. "osmorphing_minion_machine_id": instance_osmorphing_minion.id,
  1929. "osmorphing_minion_provider_properties": (
  1930. instance_osmorphing_minion.provider_properties),
  1931. "osmorphing_minion_connection_info": (
  1932. instance_osmorphing_minion.connection_info)})
  1933. validate_osmorphing_minion_task = self._create_task(
  1934. instance,
  1935. constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY,
  1936. execution, depends_on=[
  1937. validate_migration_destination_inputs_task.id])
  1938. migration_resources_task_ids.append(
  1939. validate_osmorphing_minion_task.id)
  1940. # NOTE(aznashwan): re-executing the REPLICATE_DISKS task only works
  1941. # if all the source disk snapshotting and worker setup steps are
  1942. # performed by the source plugin in REPLICATE_DISKS.
  1943. # This should no longer be a problem when worker pooling lands.
  1944. last_sync_task = None
  1945. first_sync_task = None
  1946. for i in range(migration.replication_count):
  1947. # insert SHUTDOWN_INSTANCES task before the last sync:
  1948. if i == (migration.replication_count - 1) and (
  1949. migration.shutdown_instances):
  1950. shutdown_deps = migration_resources_task_ids
  1951. if last_sync_task:
  1952. shutdown_deps = [last_sync_task.id]
  1953. last_sync_task = self._create_task(
  1954. instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
  1955. execution, depends_on=shutdown_deps)
  1956. replication_deps = migration_resources_task_ids
  1957. if last_sync_task:
  1958. replication_deps = [last_sync_task.id]
  1959. last_sync_task = self._create_task(
  1960. instance, constants.TASK_TYPE_REPLICATE_DISKS,
  1961. execution, depends_on=replication_deps)
  1962. if not first_sync_task:
  1963. first_sync_task = last_sync_task
  1964. release_source_minion_task = None
  1965. delete_source_resources_task = None
  1966. source_resource_cleanup_task = None
  1967. if instance_source_minion:
  1968. release_source_minion_task = self._create_task(
  1969. instance,
  1970. constants.TASK_TYPE_RELEASE_SOURCE_MINION,
  1971. execution,
  1972. depends_on=[
  1973. validate_source_minion_task.id,
  1974. last_sync_task.id],
  1975. on_error=True)
  1976. source_resource_cleanup_task = release_source_minion_task
  1977. else:
  1978. delete_source_resources_task = self._create_task(
  1979. instance,
  1980. constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES,
  1981. execution, depends_on=[
  1982. deploy_migration_source_resources_task.id,
  1983. last_sync_task.id],
  1984. on_error=True)
  1985. source_resource_cleanup_task = delete_source_resources_task
  1986. cleanup_source_storage_task = self._create_task(
  1987. instance, constants.TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE,
  1988. execution, depends_on=[
  1989. first_sync_task.id,
  1990. source_resource_cleanup_task.id],
  1991. on_error=True)
  1992. target_resources_cleanup_task = None
  1993. if instance_target_minion:
  1994. detach_volumes_from_target_minion_task = self._create_task(
  1995. instance,
  1996. constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION,
  1997. execution,
  1998. depends_on=[
  1999. attach_target_minion_disks_task.id,
  2000. last_sync_task.id],
  2001. on_error=True)
  2002. release_target_minion_task = self._create_task(
  2003. instance,
  2004. constants.TASK_TYPE_RELEASE_DESTINATION_MINION,
  2005. execution, depends_on=[
  2006. validate_target_minion_task.id,
  2007. detach_volumes_from_target_minion_task.id],
  2008. on_error=True)
  2009. target_resources_cleanup_task = release_target_minion_task
  2010. else:
  2011. delete_destination_resources_task = self._create_task(
  2012. instance,
  2013. constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES,
  2014. execution, depends_on=[
  2015. deploy_migration_target_resources_task.id,
  2016. last_sync_task.id],
  2017. on_error=True)
  2018. target_resources_cleanup_task = (
  2019. delete_destination_resources_task)
  2020. deploy_instance_task = self._create_task(
  2021. instance, constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES,
  2022. execution, depends_on=[
  2023. last_sync_task.id,
  2024. target_resources_cleanup_task.id])
  2025. depends_on = [deploy_instance_task.id]
  2026. osmorphing_resources_cleanup_task = None
  2027. if not skip_os_morphing:
  2028. task_deploy_os_morphing_resources = None
  2029. task_delete_os_morphing_resources = None
  2030. attach_osmorphing_minion_volumes_task = None
  2031. last_osmorphing_resources_deployment_task = None
  2032. if instance_osmorphing_minion:
  2033. osmorphing_vol_attachment_deps = [
  2034. validate_osmorphing_minion_task.id]
  2035. osmorphing_vol_attachment_deps.extend(depends_on)
  2036. attach_osmorphing_minion_volumes_task = self._create_task(
  2037. instance,
  2038. constants.TASK_TYPE_ATTACH_VOLUMES_TO_OSMORPHING_MINION,
  2039. execution, depends_on=osmorphing_vol_attachment_deps)
  2040. last_osmorphing_resources_deployment_task = (
  2041. attach_osmorphing_minion_volumes_task)
  2042. collect_osmorphing_info_task = self._create_task(
  2043. instance,
  2044. constants.TASK_TYPE_COLLECT_OSMORPHING_INFO,
  2045. execution,
  2046. depends_on=[attach_osmorphing_minion_volumes_task.id])
  2047. last_osmorphing_resources_deployment_task = (
  2048. collect_osmorphing_info_task)
  2049. else:
  2050. task_deploy_os_morphing_resources = self._create_task(
  2051. instance,
  2052. constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
  2053. execution, depends_on=depends_on)
  2054. last_osmorphing_resources_deployment_task = (
  2055. task_deploy_os_morphing_resources)
  2056. task_osmorphing = self._create_task(
  2057. instance, constants.TASK_TYPE_OS_MORPHING,
  2058. execution, depends_on=[
  2059. last_osmorphing_resources_deployment_task.id])
  2060. depends_on = [task_osmorphing.id]
  2061. if instance_osmorphing_minion:
  2062. detach_osmorphing_minion_volumes_task = self._create_task(
  2063. instance,
  2064. constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION,
  2065. execution, depends_on=[
  2066. attach_osmorphing_minion_volumes_task.id,
  2067. task_osmorphing.id],
  2068. on_error=True)
  2069. release_osmorphing_minion_task = self._create_task(
  2070. instance,
  2071. constants.TASK_TYPE_RELEASE_OSMORPHING_MINION,
  2072. execution, depends_on=[
  2073. validate_osmorphing_minion_task.id,
  2074. detach_osmorphing_minion_volumes_task.id],
  2075. on_error=True)
  2076. depends_on.append(release_osmorphing_minion_task.id)
  2077. osmorphing_resources_cleanup_task = (
  2078. release_osmorphing_minion_task)
  2079. else:
  2080. task_delete_os_morphing_resources = self._create_task(
  2081. instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
  2082. execution, depends_on=[
  2083. task_deploy_os_morphing_resources.id,
  2084. task_osmorphing.id],
  2085. on_error=True)
  2086. depends_on.append(task_delete_os_morphing_resources.id)
  2087. osmorphing_resources_cleanup_task = (
  2088. task_delete_os_morphing_resources)
  2089. if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
  2090. destination_provider_types):
  2091. get_optimal_flavor_task = self._create_task(
  2092. instance, constants.TASK_TYPE_GET_OPTIMAL_FLAVOR,
  2093. execution, depends_on=depends_on)
  2094. depends_on = [get_optimal_flavor_task.id]
  2095. finalize_deployment_task = self._create_task(
  2096. instance,
  2097. constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT,
  2098. execution, depends_on=depends_on)
  2099. cleanup_failed_deployment_task = self._create_task(
  2100. instance,
  2101. constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT,
  2102. execution, depends_on=[
  2103. deploy_instance_task.id,
  2104. finalize_deployment_task.id],
  2105. on_error_only=True)
  2106. cleanup_deps = [
  2107. create_instance_disks_task.id,
  2108. cleanup_source_storage_task.id,
  2109. target_resources_cleanup_task.id,
  2110. cleanup_failed_deployment_task.id]
  2111. if osmorphing_resources_cleanup_task:
  2112. cleanup_deps.append(osmorphing_resources_cleanup_task.id)
  2113. self._create_task(
  2114. instance, constants.TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE,
  2115. execution, depends_on=cleanup_deps,
  2116. on_error_only=True)
  2117. try:
  2118. self._check_execution_tasks_sanity(execution, migration.info)
  2119. db_api.add_migration(ctxt, migration)
  2120. LOG.info("Migration created: %s", migration.id)
  2121. self._begin_tasks(ctxt, execution, task_info=migration.info)
  2122. except Exception:
  2123. if minion_pool_allocations:
  2124. LOG.warn(
  2125. "Exception occured while verifying/registering tasks "
  2126. "execution for Migration '%s'. Cleaning up all minion "
  2127. "allocations from DB: %s" % (
  2128. migration.id, minion_pool_allocations))
  2129. self._deallocate_minion_machines_for_action(ctxt, migration)
  2130. raise
  2131. return self.get_migration(ctxt, migration.id)
  2132. def _get_migration(self, ctxt, migration_id):
  2133. migration = db_api.get_migration(ctxt, migration_id)
  2134. if not migration:
  2135. raise exception.NotFound(
  2136. "Migration with ID '%s' not found." % migration_id)
  2137. return migration
  2138. @migration_synchronized
  2139. def delete_migration(self, ctxt, migration_id):
  2140. migration = self._get_migration(ctxt, migration_id)
  2141. execution = migration.executions[0]
  2142. if execution.status in constants.ACTIVE_EXECUTION_STATUSES:
  2143. raise exception.InvalidMigrationState(
  2144. "Cannot delete Migration '%s' as it is currently in "
  2145. "'%s' state." % (migration_id, execution.status))
  2146. db_api.delete_migration(ctxt, migration_id)
  2147. @migration_synchronized
  2148. def cancel_migration(self, ctxt, migration_id, force):
  2149. migration = self._get_migration(ctxt, migration_id)
  2150. if len(migration.executions) != 1:
  2151. raise exception.InvalidMigrationState(
  2152. "Migration '%s' has in improper number of tasks "
  2153. "executions: %d" % (migration_id, len(migration.executions)))
  2154. execution = migration.executions[0]
  2155. if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
  2156. raise exception.InvalidMigrationState(
  2157. "Migration '%s' is not currently running" % migration_id)
  2158. if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
  2159. not force):
  2160. raise exception.InvalidMigrationState(
  2161. "Migration '%s' is already being cancelled. Please use the "
  2162. "force option if you'd like to force-cancel it.")
  2163. with lockutils.lock(
  2164. constants.EXECUTION_LOCK_NAME_FORMAT % execution.id,
  2165. external=True):
  2166. self._cancel_tasks_execution(ctxt, execution, force=force)
  2167. self._check_delete_reservation_for_transfer(migration)
  2168. def _cancel_tasks_execution(
  2169. self, ctxt, execution, requery=True, force=False):
  2170. """ Cancels a running Execution by:
  2171. - telling workers to kill any already running non-on-error tasks
  2172. - cancelling any non-on-error tasks which are pending
  2173. - making all on-error-only tasks as scheduled
  2174. NOTE: affects whole execution, only call this
  2175. with a lock on the Execution as a whole!
  2176. """
  2177. if requery:
  2178. execution = db_api.get_tasks_execution(ctxt, execution.id)
  2179. if execution.status == constants.EXECUTION_STATUS_RUNNING:
  2180. LOG.info(
  2181. "Cancelling tasks execution %s. Current status before "
  2182. "cancellation is '%s'", execution.id, execution.status)
  2183. # mark execution as cancelling:
  2184. self._set_tasks_execution_status(
  2185. ctxt, execution.id, constants.EXECUTION_STATUS_CANCELLING)
  2186. elif execution.status == constants.EXECUTION_STATUS_CANCELLING and (
  2187. not force):
  2188. LOG.info(
  2189. "Execution '%s' is already in CANCELLING status and no "
  2190. "force flag was provided, skipping re-cancellation.",
  2191. execution.id)
  2192. self._advance_execution_state(
  2193. ctxt, execution, requery=not requery)
  2194. return
  2195. elif execution.status in constants.FINALIZED_TASK_STATUSES:
  2196. LOG.info(
  2197. "Execution '%s' is in a finalized status '%s'. "
  2198. "Skipping re-cancellation.", execution.id, execution.status)
  2199. return
  2200. # iterate through and kill/cancel any non-error
  2201. # tasks which are running/pending:
  2202. for task in sorted(execution.tasks, key=lambda t: t.index):
  2203. # if force is provided, force-cancel tasks directly:
  2204. if force and task.status in itertools.chain(
  2205. constants.ACTIVE_TASK_STATUSES,
  2206. [constants.TASK_STATUS_FAILED_TO_CANCEL]):
  2207. LOG.warn(
  2208. "Task '%s' is in %s state, but forcibly setting to "
  2209. "'%s' as part of cancellation of execution '%s' "
  2210. "because the 'force' flag was provided.",
  2211. task.id, task.status,
  2212. constants.TASK_STATUS_FORCE_CANCELED,
  2213. execution.id)
  2214. db_api.set_task_status(
  2215. ctxt, task.id, constants.TASK_STATUS_FORCE_CANCELED,
  2216. exception_details=(
  2217. "This task was force-canceled at user request. "
  2218. "Its state prior to its cancellation was '%s'. "
  2219. "Its error details prior to its cancellation "
  2220. "were: %s" % (
  2221. task.status, task.exception_details)))
  2222. continue
  2223. if task.status in (
  2224. constants.TASK_STATUS_PENDING,
  2225. constants.TASK_STATUS_STARTING):
  2226. # any PENDING/STARTING tasks means that they did not have a
  2227. # host assigned to them yet, and presuming the host does not
  2228. # start executing the task until it marks itself as the runner,
  2229. # we can just mark the task as cancelled:
  2230. LOG.debug(
  2231. "Setting currently '%s' task '%s' to '%s' as part of the "
  2232. "cancellation of execution '%s'",
  2233. task.status, task.id,
  2234. constants.TASK_STATUS_UNSCHEDULED, execution.id)
  2235. db_api.set_task_status(
  2236. ctxt, task.id, constants.TASK_STATUS_UNSCHEDULED,
  2237. exception_details=(
  2238. "This task was already pending execution but was "
  2239. "unscheduled during the cancellation of the parent "
  2240. "tasks execution."))
  2241. elif task.status in (
  2242. constants.TASK_STATUS_RUNNING,
  2243. constants.TASK_STATUS_FAILED_TO_CANCEL):
  2244. # cancel any currently running non-error tasks:
  2245. if not task.on_error:
  2246. LOG.debug(
  2247. "Sending cancellation request for %s non-error task "
  2248. "'%s' as part of cancellation of execution '%s'",
  2249. task.status, task.id, execution.id)
  2250. db_api.set_task_status(
  2251. ctxt, task.id, constants.TASK_STATUS_CANCELLING)
  2252. try:
  2253. worker_rpc = self._get_worker_rpc_for_host(
  2254. # NOTE: we intetionally lowball the timeout for the
  2255. # cancellation call to prevent the conductor from
  2256. # hanging an excessive amount of time:
  2257. task.host, timeout=10)
  2258. worker_rpc.cancel_task(
  2259. ctxt, task.id, task.process_id, force)
  2260. except (Exception, KeyboardInterrupt):
  2261. msg = (
  2262. "Failed to send cancellation request for task '%s'"
  2263. "to worker host '%s' as part of cancellation of "
  2264. "execution '%s'. Marking task as '%s' for now and "
  2265. "awaiting any eventual worker replies later." % (
  2266. task.id, task.host, execution.id,
  2267. constants.TASK_STATUS_FAILED_TO_CANCEL))
  2268. LOG.error(
  2269. "%s. Exception was: %s", msg,
  2270. utils.get_exception_details())
  2271. db_api.set_task_status(
  2272. ctxt, task.id,
  2273. constants.TASK_STATUS_FAILED_TO_CANCEL,
  2274. exception_details=msg)
  2275. # let any on-error tasks run to completion but mark
  2276. # them as CANCELLING_AFTER_COMPLETION so they will
  2277. # be marked as cancelled once they are completed:
  2278. else:
  2279. LOG.debug(
  2280. "Marking %s on-error task %s as %s as part of "
  2281. "cancellation of execution %s",
  2282. task.status, task.id,
  2283. constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION,
  2284. execution.id)
  2285. db_api.set_task_status(
  2286. ctxt, task.id,
  2287. constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION,
  2288. exception_details=(
  2289. "Task will be marked as cancelled after completion"
  2290. " as it is a cleanup task."))
  2291. elif task.status == constants.TASK_STATUS_ON_ERROR_ONLY:
  2292. # mark all on-error-only tasks as scheduled:
  2293. LOG.debug(
  2294. "Marking on-error-only task '%s' as scheduled following "
  2295. "cancellation of execution '%s'",
  2296. task.id, execution.id)
  2297. db_api.set_task_status(
  2298. ctxt, task.id, constants.TASK_STATUS_SCHEDULED)
  2299. else:
  2300. LOG.debug(
  2301. "No action currently taken with respect to task '%s' "
  2302. "(status '%s', on_error=%s) during cancellation of "
  2303. "execution '%s'",
  2304. task.id, task.status, task.on_error, execution.id)
  2305. started_tasks = self._advance_execution_state(
  2306. ctxt, execution, requery=True)
  2307. if started_tasks:
  2308. LOG.info(
  2309. "The following tasks were started after state advancement "
  2310. "of execution '%s' after cancellation request: %s",
  2311. execution.id, started_tasks)
  2312. else:
  2313. LOG.debug(
  2314. "No new tasks were started for execution '%s' following "
  2315. "state advancement after cancellation.", execution.id)
  2316. def _set_tasks_execution_status(self, ctxt, execution_id, execution_status):
  2317. execution = db_api.set_execution_status(
  2318. ctxt, execution_id, execution_status)
  2319. LOG.info(
  2320. "Tasks execution %(id)s (action %(action)s) status updated "
  2321. "to: %(status)s",
  2322. {"id": execution_id, "status": execution_status,
  2323. "action": execution.action_id})
  2324. if execution.type in constants.MINION_POOL_EXECUTION_TYPES:
  2325. self._update_minion_pool_status_for_finished_execution(
  2326. ctxt, execution, execution_status)
  2327. else:
  2328. if execution_status in constants.FINALIZED_EXECUTION_STATUSES:
  2329. LOG.debug(
  2330. "Attempting to deallocate minion machines for finalized "
  2331. "Execution '%s' of type '%s' (action '%s') following "
  2332. "its transition into finalized status '%s'",
  2333. execution.id, execution.type, execution.action_id,
  2334. execution_status)
  2335. action = db_api.get_action(ctxt, execution.action_id)
  2336. self._deallocate_minion_machines_for_action(
  2337. ctxt, action)
  2338. if ctxt.delete_trust_id:
  2339. LOG.debug(
  2340. "Deleting Keystone trust following status change "
  2341. "for Execution '%s' (action '%s') to '%s'",
  2342. execution_id, execution.action_id, execution_status)
  2343. keystone.delete_trust(ctxt)
  2344. else:
  2345. LOG.debug(
  2346. "Not deallocating minion machines for Execution '%s' "
  2347. "of type '%s' (action '%s') yet as it is still in an "
  2348. "active Execution status (%s)",
  2349. execution.id, execution.type, execution.action_id,
  2350. execution_status)
  2351. @staticmethod
  2352. def _update_minion_pool_status_for_finished_execution(
  2353. ctxt, execution, new_execution_status):
  2354. # status map if execution is active:
  2355. stat_map = {
  2356. constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
  2357. constants.MINION_POOL_STATUS_ALLOCATING,
  2358. constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
  2359. constants.MINION_POOL_STATUS_DEALLOCATING,
  2360. constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
  2361. constants.MINION_POOL_STATUS_INITIALIZING,
  2362. constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
  2363. constants.MINION_POOL_STATUS_UNINITIALIZING}
  2364. if new_execution_status == constants.EXECUTION_STATUS_COMPLETED:
  2365. stat_map = {
  2366. constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
  2367. constants.MINION_POOL_STATUS_ALLOCATED,
  2368. constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
  2369. constants.MINION_POOL_STATUS_DEALLOCATED,
  2370. constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
  2371. constants.MINION_POOL_STATUS_DEALLOCATED,
  2372. constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
  2373. constants.MINION_POOL_STATUS_UNINITIALIZED}
  2374. elif new_execution_status in constants.FINALIZED_TASK_STATUSES:
  2375. stat_map = {
  2376. constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
  2377. constants.MINION_POOL_STATUS_DEALLOCATED,
  2378. constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
  2379. constants.MINION_POOL_STATUS_ALLOCATED,
  2380. constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
  2381. constants.MINION_POOL_STATUS_UNINITIALIZED,
  2382. constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
  2383. constants.MINION_POOL_STATUS_UNINITIALIZED}
  2384. final_pool_status = stat_map.get(execution.type)
  2385. if not final_pool_status:
  2386. LOG.error(
  2387. "Could not determine pool status following transition of "
  2388. "execution '%s' (type '%s') to status '%s'. Presuming error "
  2389. "has occured. Marking piil as error'd.",
  2390. execution.id, execution.type, new_execution_status)
  2391. final_pool_status = constants.MINION_POOL_STATUS_ERROR
  2392. LOG.info(
  2393. "Marking minion pool '%s' status as '%s' in the DB following the "
  2394. "transition of execution '%s' (type '%s') to status '%s'.",
  2395. execution.action_id, final_pool_status, execution.id,
  2396. execution.type, new_execution_status)
  2397. db_api.set_minion_pool_lifecycle_status(
  2398. ctxt, execution.action_id, final_pool_status)
  2399. @parent_tasks_execution_synchronized
  2400. def set_task_host(self, ctxt, task_id, host):
  2401. """ Saves the ID of the worker host which has accepted
  2402. the task to the DB and marks the task as STARTING. """
  2403. task = db_api.get_task(ctxt, task_id)
  2404. new_status = constants.TASK_STATUS_STARTING
  2405. exception_details = None
  2406. if task.status == constants.TASK_STATUS_CANCELLING:
  2407. raise exception.TaskIsCancelling(task_id=task_id)
  2408. elif task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
  2409. if not task.on_error:
  2410. LOG.warn(
  2411. "Non-error task '%s' was in '%s' status although it should"
  2412. " not have been. Setting a task host for it anyway.",
  2413. task.id, task.status)
  2414. LOG.debug(
  2415. "Task '%s' is in %s status, so it will be allowed to "
  2416. "have a host set for it and run to completion.",
  2417. task.id, task.status)
  2418. new_status = constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION
  2419. exception_details = (
  2420. "This is a cleanup task so it will be allowed to run to "
  2421. "completion despite user-cancellation.")
  2422. elif task.status != constants.TASK_STATUS_PENDING:
  2423. raise exception.InvalidTaskState(
  2424. "Task with ID '%s' is in '%s' status instead of the "
  2425. "expected '%s' required for it to have a task host set." % (
  2426. task_id, task.status, constants.TASK_STATUS_PENDING))
  2427. LOG.info(
  2428. "Setting host for task with ID '%s' to '%s'", task_id, host)
  2429. db_api.set_task_host_properties(ctxt, task_id, host=host)
  2430. db_api.set_task_status(
  2431. ctxt, task_id, new_status,
  2432. exception_details=exception_details)
  2433. LOG.info(
  2434. "Successfully set host for task with ID '%s' to '%s'",
  2435. task_id, host)
  @parent_tasks_execution_synchronized
  def set_task_process(self, ctxt, task_id, process_id):
      """Record the worker-side process ID of a task and mark it RUNNING.

      Preconditions checked against the DB record of the task:
          - it must already have a host set;
          - its status must be STARTING or CANCELLING_AFTER_COMPLETION.

      NOTE(review): a task in CANCELLING_AFTER_COMPLETION is also
      transitioned to RUNNING here, so that status is not preserved;
      confirm this is intended.

      :param ctxt: request context passed through to the DB API.
      :param task_id: ID of the task to update.
      :param process_id: ID of the worker-side process running the task.
      :raises exception.InvalidTaskState: if either precondition fails.
      """
      task = db_api.get_task(ctxt, task_id)
      current_status = task.status

      if not task.host:
          raise exception.InvalidTaskState(
              "Task with ID '%s' (current status '%s') has no host set "
              "for it. Cannot set host process." % (
                  task_id, current_status))

      # NOTE: kept as a list since it is rendered into the error message:
      allowed_statuses = [
          constants.TASK_STATUS_STARTING,
          constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION]
      if current_status not in allowed_statuses:
          raise exception.InvalidTaskState(
              "Task with ID '%s' is in '%s' status instead of the "
              "expected statuses (%s) required for it to have a task "
              "process set." % (
                  task_id, current_status, allowed_statuses))

      running_status = constants.TASK_STATUS_RUNNING
      LOG.info(
          "Setting process '%s' (host %s) for task '%s' and transitioning "
          "it from status '%s' to '%s'",
          process_id, task.host, task_id, current_status, running_status)
      db_api.set_task_host_properties(ctxt, task_id, process_id=process_id)
      db_api.set_task_status(ctxt, task_id, running_status)
      LOG.info(
          "Successfully set task process for task with ID '%s' to '%s'",
          task_id, process_id)
  def _check_clean_execution_deadlock(
          self, ctxt, execution, task_statuses=None, requery=True):
      """Check whether an execution is deadlocked and clean it up if so.

      An execution is considered deadlocked when it still has SCHEDULED
      tasks but none of its tasks is in any of the statuses listed in
      `constants.ACTIVE_TASK_STATUSES`, so nothing left running could
      ever unblock the scheduled ones.
      On deadlock, all SCHEDULED and ON_ERROR_ONLY tasks are marked as
      CANCELED_FROM_DEADLOCK and the execution is marked as DEADLOCKED.

      :param ctxt: request context passed through to the DB API.
      :param execution: the tasks execution to check.
      :param task_statuses: optional dict mapping task IDs to statuses to
          base the check on. If empty or None, it is built from the
          statuses of `execution.tasks`.
      :param requery: whether to re-fetch the execution from the DB first.
      :returns: the state of the execution when the check occurred
          (either EXECUTION_STATUS_RUNNING or EXECUTION_STATUS_DEADLOCKED).
      """
      if requery:
          execution = db_api.get_tasks_execution(ctxt, execution.id)
      if not task_statuses:
          task_statuses = {task.id: task.status for task in execution.tasks}

      status_vals = task_statuses.values()
      is_deadlocked = (
          constants.TASK_STATUS_SCHEDULED in status_vals and not any(
              stat in status_vals
              for stat in constants.ACTIVE_TASK_STATUSES))
      if not is_deadlocked:
          return constants.EXECUTION_STATUS_RUNNING

      LOG.warning(
          "Execution '%s' is deadlocked. Cleaning up now. "
          "Task statuses are: %s",
          execution.id, task_statuses)
      for task_id, stat in task_statuses.items():
          # only tasks which are still waiting to be run get cancelled:
          if stat in (
                  constants.TASK_STATUS_SCHEDULED,
                  constants.TASK_STATUS_ON_ERROR_ONLY):
              LOG.warning(
                  "Marking deadlocked task '%s' as that (current "
                  "state: %s)", task_id, stat)
              db_api.set_task_status(
                  ctxt, task_id,
                  constants.TASK_STATUS_CANCELED_FROM_DEADLOCK,
                  exception_details=TASK_DEADLOCK_ERROR_MESSAGE)

      LOG.warning(
          "Marking deadlocked execution '%s' as DEADLOCKED", execution.id)
      self._set_tasks_execution_status(
          ctxt, execution.id, constants.EXECUTION_STATUS_DEADLOCKED)
      LOG.error(
          "Execution '%s' is deadlocked. Cleanup has been performed. "
          "Task statuses at time of deadlock were: %s",
          execution.id, task_statuses)
      return constants.EXECUTION_STATUS_DEADLOCKED
  def _get_execution_status(self, ctxt, execution, requery=False):
      """Return the overall status of an execution based on its tasks.

      The first matching rule below determines the result:
          DEADLOCKED - some task is SCHEDULED and no task is active
          CANCELLING - some task is CANCELLING or
                       CANCELLING_AFTER_COMPLETION
          CANCELLING - some task is active while another was canceled
                       (i.e. cleanup tasks still running after a cancel)
          RUNNING    - some task is active
          ERROR      - some task is ERROR or FAILED_TO_SCHEDULE
          CANCELED   - some task is in a canceled status
          COMPLETED  - none of the above applied

      A task is "active" if its status is in
      `constants.ACTIVE_TASK_STATUSES` and "canceled" if its status is in
      `constants.CANCELED_TASK_STATUSES`.

      :param ctxt: request context passed through to the DB API.
      :param execution: the tasks execution to evaluate.
      :param requery: whether to re-fetch the execution from the DB first.
      :returns: one of the `constants.EXECUTION_STATUS_*` values above.
      """
      if requery:
          execution = db_api.get_tasks_execution(ctxt, execution.id)

      is_running = False
      is_canceled = False
      is_cancelling = False
      is_errord = False
      has_scheduled_tasks = False
      task_stat_map = {}
      for task in execution.tasks:
          task_stat_map[task.id] = task.status
          if task.status in constants.ACTIVE_TASK_STATUSES:
              is_running = True
          if task.status in constants.CANCELED_TASK_STATUSES:
              is_canceled = True
          if task.status in (
                  constants.TASK_STATUS_ERROR,
                  constants.TASK_STATUS_FAILED_TO_SCHEDULE):
              is_errord = True
          if task.status in (
                  constants.TASK_STATUS_CANCELLING,
                  constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION):
              is_cancelling = True
          if task.status == constants.TASK_STATUS_SCHEDULED:
              has_scheduled_tasks = True

      status = constants.EXECUTION_STATUS_COMPLETED
      if has_scheduled_tasks and not is_running:
          status = constants.EXECUTION_STATUS_DEADLOCKED
      elif is_cancelling:
          status = constants.EXECUTION_STATUS_CANCELLING
      elif is_running:
          if is_canceled:
              # means that a cancel was issued but some cleanup tasks are
              # currently being run, so the final status is CANCELLING:
              status = constants.EXECUTION_STATUS_CANCELLING
          else:
              status = constants.EXECUTION_STATUS_RUNNING
      elif is_errord:
          status = constants.EXECUTION_STATUS_ERROR
      # NOTE: user-canceled executions should never have ERROR'd tasks
      # (they should also be marked as CANCELED) so this comes last:
      elif is_canceled:
          status = constants.EXECUTION_STATUS_CANCELED

      LOG.debug(
          "Overall status for Execution '%s' determined to be '%s'. "
          "Task statuses at time of decision: %s",
          execution.id, status, task_stat_map)
      return status
  def _advance_execution_state(
          self, ctxt, execution, requery=True, instance=None):
      """Advance the execution by starting/unscheduling its child tasks.

      NOTE: should only be called with a lock on the Execution!

      Only the tasks of `instance` (or all tasks if no `instance` is
      given) are considered, in order of their `index`. Requirements for
      a SCHEDULED task to be started:
          - any SCHEDULED task with no deps will be instantly started
          - any task where all parent tasks got UNSCHEDULED will
            also be UNSCHEDULED
          - normal tasks (task.on_error==False & task.status==SCHEDULED):
              * started if all parent dependency tasks have been COMPLETED
              * instantly unscheduled if all parents finalized but some
                didn't complete successfully.
          - on-error tasks (task.on_error==True & task.status==SCHEDULED):
              * all parent tasks (including on-error parents) must have
                reached a terminal state
              * at least one non-error parent task must have been
                COMPLETED (otherwise the task gets unscheduled)

      If no task was started, the execution is checked for deadlock.
      Afterwards, the overall execution status is recomputed from the DB
      and saved if it has changed.

      :param ctxt: request context passed through to the DB/RPC calls.
      :param execution: the tasks execution to advance.
      :param requery: whether to re-fetch the execution from the DB first.
      :param instance: optional instance whose tasks should be processed;
          all tasks of the execution are processed if not given.
      :returns: list of the IDs of the tasks which were started (empty if
          the execution was not in an active status or got deadlocked).
      :raises exception.InvalidActionTasksExecutionState: if there are no
          tasks to process (e.g. none for the given `instance`).
      """
      if requery:
          execution = db_api.get_tasks_execution(ctxt, execution.id)
      if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
          LOG.warn(
              "Execution state advancement called on Execution '%s' which "
              "is not in an active status in the DB (it's currently '%s'). "
              "Double-checking for deadlock and returning early.",
              execution.id, execution.status)
          # only have the deadlock check re-fetch the execution if it was
          # not already re-fetched above:
          if self._check_clean_execution_deadlock(
                  ctxt, execution, task_statuses=None,
                  requery=not requery) == (
                      constants.EXECUTION_STATUS_DEADLOCKED):
              LOG.error(
                  "Execution '%s' deadlocked even before Replica state "
                  "advancement . Cleanup has been perfomed. Returning.",
                  execution.id)
          return []

      # restrict processing to the tasks of the given instance (if any):
      tasks_to_process = execution.tasks
      if instance:
          tasks_to_process = [
              task for task in execution.tasks
              if task.instance == instance]
      if not tasks_to_process:
          raise exception.InvalidActionTasksExecutionState(
              "State advancement requested for execution '%s' for "
              "instance '%s', which has no tasks defined for it." % (
                  execution.id, instance))

      LOG.debug(
          "State of execution '%s' before state advancement is: %s",
          execution.id, execution.status)

      # origin/destination details and endpoints which are passed to the
      # worker for every task started below:
      origin = self._get_task_origin(ctxt, execution.action)
      destination = self._get_task_destination(ctxt, execution.action)
      action = db_api.get_action(ctxt, execution.action_id)
      origin_endpoint = db_api.get_endpoint(
          ctxt, execution.action.origin_endpoint_id)
      destination_endpoint = db_api.get_endpoint(
          ctxt, execution.action.destination_endpoint_id)

      # IDs of the tasks started during this call (returned at the end):
      started_tasks = []

      def _start_task(task):
          # Marks the task as PENDING and asks a worker to begin it using
          # the action's info for the task's instance ({} if missing).
          # Returns the task's new status (PENDING). If dispatching to the
          # worker fails, the whole execution is cancelled and the error
          # is re-raised.
          task_info = None
          if task.instance not in action.info:
              LOG.error(
                  "No info present for instance '%s' in action '%s' for task"
                  " '%s' (type '%s') of execution '%s' (type '%s'). "
                  "Defaulting to empty dict." % (
                      task.instance, action.id, task.id, task.task_type,
                      execution.id, execution.type))
              task_info = {}
          else:
              task_info = action.info[task.instance]
          db_api.set_task_status(
              ctxt, task.id, constants.TASK_STATUS_PENDING)
          try:
              worker_rpc = self._get_worker_service_rpc_for_task(
                  ctxt, task, origin_endpoint, destination_endpoint)
              worker_rpc.begin_task(
                  ctxt,
                  task_id=task.id,
                  task_type=task.task_type,
                  origin=origin,
                  destination=destination,
                  instance=task.instance,
                  task_info=task_info)
              LOG.debug(
                  "Successfully started task with ID '%s' (type '%s') "
                  "for execution '%s'", task.id, task.task_type,
                  execution.id)
              started_tasks.append(task.id)
              return constants.TASK_STATUS_PENDING
          except Exception as ex:
              LOG.warn(
                  "Error occured while starting new task '%s'. "
                  "Cancelling execution '%s'. Error was: %s",
                  task.id, execution.id, utils.get_exception_details())
              self._cancel_tasks_execution(
                  ctxt, execution, requery=True)
              raise

      # aggregate all tasks and statuses:
      # NOTE: this covers ALL tasks of the execution (not only those of
      # `instance`) since parent tasks may belong to other instances.
      task_statuses = {}
      task_deps = {}
      on_error_tasks = []
      for task in execution.tasks:
          task_statuses[task.id] = task.status

          if task.depends_on:
              task_deps[task.id] = task.depends_on
          else:
              task_deps[task.id] = []

          if task.on_error:
              on_error_tasks.append(task.id)

      LOG.debug(
          "All task statuses before execution '%s' lifecycle iteration "
          "(for tasks of instance '%s'): %s",
          execution.id, instance, task_statuses)

      # NOTE: the tasks are saved in a random order in the DB, which
      # complicates the processing logic so we just pre-sort:
      # NOTE: `task_statuses` is updated in-place as decisions are taken,
      # so later tasks see the new statuses of the earlier ones.
      for task in sorted(tasks_to_process, key=lambda t: t.index):

          if task_statuses[task.id] == constants.TASK_STATUS_SCHEDULED:

              # immediately start depency-less tasks (on-error or otherwise)
              if not task_deps[task.id]:
                  LOG.info(
                      "Starting depency-less task '%s'", task.id)
                  task_statuses[task.id] = _start_task(task)
                  continue

              parent_task_statuses = {
                  dep_id: task_statuses[dep_id]
                  for dep_id in task_deps[task.id]}

              # immediately unschedule tasks (on-error or otherwise)
              # if all of their parent tasks got un-scheduled:
              # (`task_deps[task.id]` is always non-empty at this point
              # since dependency-less tasks were handled above)
              if task_deps[task.id] and all([
                      dep_stat == constants.TASK_STATUS_UNSCHEDULED
                      for dep_stat in parent_task_statuses.values()]):
                  LOG.info(
                      "Unscheduling task '%s' as all parent "
                      "tasks got unscheduled: %s",
                      task.id, parent_task_statuses)
                  db_api.set_task_status(
                      ctxt, task.id, constants.TASK_STATUS_UNSCHEDULED,
                      exception_details=(
                          "Unscheduled due to the unscheduling of all "
                          "parent tasks."))
                  task_statuses[task.id] = constants.TASK_STATUS_UNSCHEDULED
                  continue

              # check all parents have finalized:
              if all([
                      dep_stat in constants.FINALIZED_TASK_STATUSES
                      for dep_stat in parent_task_statuses.values()]):

                  # handle non-error tasks:
                  if task.id not in on_error_tasks:
                      # start non-error tasks whose parents have
                      # all completed successfully:
                      if all([
                              dep_stat == constants.TASK_STATUS_COMPLETED
                              for dep_stat in (
                                  parent_task_statuses.values())]):
                          LOG.info(
                              "Starting task '%s' as all dependencies have "
                              "completed successfully: %s",
                              task.id, parent_task_statuses)
                          task_statuses[task.id] = _start_task(task)
                      else:
                          # it means one/more parents error'd/unscheduled
                          # so we mark this task as unscheduled:
                          LOG.info(
                              "Unscheduling plain task '%s' as not all "
                              "parent tasks completed successfully: %s",
                              task.id, parent_task_statuses)
                          db_api.set_task_status(
                              ctxt, task.id,
                              constants.TASK_STATUS_UNSCHEDULED,
                              exception_details=(
                                  "Unscheduled due to some parent tasks not "
                                  "having completed successfully."))
                          task_statuses[task.id] = (
                              constants.TASK_STATUS_UNSCHEDULED)

                  # handle on-error tasks:
                  else:
                      non_error_parents = {
                          dep_id: task_statuses[dep_id]
                          for dep_id in parent_task_statuses.keys()
                          if dep_id not in on_error_tasks}

                      # start on-error tasks only if at least one non-error
                      # parent task has completed successfully:
                      if constants.TASK_STATUS_COMPLETED in (
                              non_error_parents.values()):
                          LOG.info(
                              "Starting on-error task '%s' as all parent "
                              "tasks have been finalized and at least one "
                              "non-error parent (%s) was completed: %s",
                              task.id, list(non_error_parents.keys()),
                              parent_task_statuses)
                          task_statuses[task.id] = _start_task(task)
                      else:
                          LOG.info(
                              "Unscheduling on-error task '%s' as none of "
                              "its parent non-error tasks (%s) have "
                              "completed successfully: %s",
                              task.id, list(non_error_parents.keys()),
                              parent_task_statuses)
                          db_api.set_task_status(
                              ctxt, task.id,
                              constants.TASK_STATUS_UNSCHEDULED,
                              exception_details=(
                                  "Unscheduled due to no non-error parent "
                                  "tasks having completed successfully."))
                          task_statuses[task.id] = (
                              constants.TASK_STATUS_UNSCHEDULED)
              else:
                  # some parents are still pending/running; the task will
                  # be reconsidered on a later state advancement:
                  LOG.debug(
                      "No lifecycle decision was taken with respect to task "
                      "%s of execution %s as not all parent tasks have "
                      "reached a terminal state: %s",
                      task.id, execution.id, parent_task_statuses)
          else:
              LOG.debug(
                  "No lifecycle decision to make for task '%s' of execution "
                  "'%s' as it is not in a position to be scheduled: %s",
                  task.id, execution.id, task_statuses[task.id])

      if started_tasks:
          LOG.debug(
              "Started the following tasks for execution '%s': %s",
              execution.id, started_tasks)
      else:
          # check for deadlock:
          # (only possible if nothing new was started in this iteration)
          if self._check_clean_execution_deadlock(
                  ctxt, execution, task_statuses=task_statuses) == (
                      constants.EXECUTION_STATUS_DEADLOCKED):
              LOG.error(
                  "Execution '%s' deadlocked after Replica state advancement"
                  ". Cleanup has been perfomed. Returning early.",
                  execution.id)
              return []
          LOG.debug(
              "No new tasks were started for execution '%s'", execution.id)

      # check if execution status has changed:
      # (re-fetched from the DB since task statuses were updated above)
      latest_execution_status = self._get_execution_status(
          ctxt, execution, requery=True)
      if latest_execution_status != execution.status:
          LOG.info(
              "Execution '%s' transitioned from status %s to %s "
              "following the updated task statuses: %s",
              execution.id, execution.status,
              latest_execution_status, task_statuses)
          self._set_tasks_execution_status(
              ctxt, execution.id, latest_execution_status)
      else:
          LOG.debug(
              "Execution '%s' has remained in status '%s' following "
              "state advancement. task statuses are: %s",
              execution.id, latest_execution_status, task_statuses)

      return started_tasks
  def _update_replica_volumes_info(
          self, ctxt, replica_id, instance, updated_task_info):
      """Write `updated_task_info` into the Replica's info for `instance`.

      Delegates to `db_api.update_transfer_action_info_for_instance`.

      WARN: the lock for the Replica must be pre-acquired by the caller.
      """
      db_api.update_transfer_action_info_for_instance(
          ctxt, replica_id, instance, updated_task_info)
  def _update_volumes_info_for_migration_parent_replica(
          self, ctxt, migration_id, instance, updated_task_info):
      """Propagate updated task info for `instance` to a Migration's Replica.

      Looks up the parent Replica of the given Migration and, while holding
      the external lock of that Replica, writes `updated_task_info` into
      the Replica's info for `instance`.
      """
      parent_replica_id = db_api.get_migration(
          ctxt, migration_id).replica_id
      replica_lock_name = (
          constants.REPLICA_LOCK_NAME_FORMAT % parent_replica_id)

      with lockutils.lock(replica_lock_name, external=True):
          LOG.debug(
              "Updating volume_info in replica due to snapshot "
              "restore during migration. replica id: %s", parent_replica_id)
          self._update_replica_volumes_info(
              ctxt, parent_replica_id, instance, updated_task_info)
  def _handle_post_task_actions(self, ctxt, task, execution, task_info):
      """Apply the DB-side effects of a successfully completed task.

      Depending on `task.task_type`, parts of `task_info` are propagated to
      the parent action, its Replica, the minion pool or its minion
      machines. Task types with no post-completion actions are only logged.

      NOTE: callers are expected to hold the lock on `execution.action_id`,
      which the Replica update branch relies on to safely call
      `_update_replica_volumes_info`.

      :param ctxt: request context passed through to the DB API.
      :param task: the task which has just completed.
      :param execution: the tasks execution the task belongs to.
      :param task_info: the updated info of the task's instance.
      """
      task_type = task.task_type

      def _check_other_tasks_running(execution, current_task):
          # Whether any task of the execution other than `current_task`
          # is still in an active status.
          return any(
              other_task.status in constants.ACTIVE_TASK_STATUSES
              for other_task in execution.tasks
              if other_task.id != current_task.id)

      # Maps the minion volume attachment/detachment task types to the
      # prefix of the `task_info` keys which hold the affected minion
      # machine's ID and its updated provider properties:
      minion_task_info_key_prefixes = {
          constants.TASK_TYPE_ATTACH_VOLUMES_TO_SOURCE_MINION: "source",
          constants.TASK_TYPE_DETACH_VOLUMES_FROM_SOURCE_MINION: "source",
          constants.TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION: (
              "target"),
          constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION: (
              "target"),
          constants.TASK_TYPE_ATTACH_VOLUMES_TO_OSMORPHING_MINION: (
              "osmorphing"),
          constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION: (
              "osmorphing")}

      if task_type == constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
          # When restoring a snapshot in some import providers (OpenStack),
          # a new volume_id is generated. This needs to be updated in the
          # Replica instance as well.
          volumes_info = task_info.get('volumes_info')
          if not volumes_info:
              LOG.warning(
                  "No volumes_info was provided by task '%s' (type '%s') "
                  "after completion. NOT updating parent action '%s'",
                  task.id, task_type, execution.action_id)
          else:
              LOG.debug(
                  "Updating volumes_info for instance '%s' in parent action "
                  "'%s' following completion of task '%s' (type '%s'): %s",
                  task.instance, execution.action_id, task.id, task_type,
                  utils.sanitize_task_info(
                      {'volumes_info': volumes_info}))
              self._update_volumes_info_for_migration_parent_replica(
                  ctxt, execution.action_id, task.instance,
                  {"volumes_info": volumes_info})

      elif task_type == (
              constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS):
          if not task_info.get("clone_disks"):
              # The migration completed. If the replica is executed again,
              # new volumes need to be deployed in place of the migrated
              # ones.
              LOG.info(
                  "Unsetting 'volumes_info' for instance '%s' in Replica "
                  "'%s' after completion of Replica task '%s' (type '%s') "
                  "with clone_disks=False.",
                  task.instance, execution.action_id, task.id,
                  task_type)
              self._update_volumes_info_for_migration_parent_replica(
                  ctxt, execution.action_id, task.instance,
                  {"volumes_info": []})

      elif task_type in (
              constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT,
              constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT):
          # set 'transfer_result' in the 'base_transfer_action'
          # table if the task returned a result.
          if "transfer_result" in task_info:
              transfer_result = task_info.get("transfer_result")
              try:
                  schemas.validate_value(
                      transfer_result,
                      schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
                  LOG.debug(
                      "Setting result for transfer action '%s': %s",
                      execution.action_id, transfer_result)
                  db_api.set_transfer_action_result(
                      ctxt, execution.action_id, task.instance,
                      transfer_result)
              except exception.SchemaValidationException:
                  LOG.warning(
                      "Could not validate transfer result '%s' against the "
                      "VM export info schema. NOT saving value in Database. "
                      "Exception details: %s",
                      transfer_result, utils.get_exception_details())
          else:
              LOG.debug(
                  "No 'transfer_result' was returned for task type '%s' "
                  "for transfer action '%s'", task_type, execution.action_id)

      elif task_type in (
              constants.TASK_TYPE_UPDATE_SOURCE_REPLICA,
              constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA):
          # NOTE: remember to update the `volumes_info`:
          # NOTE: considering this method is only called with a lock on the
          # `execution.action_id` (in a Replica update tasks' case that's the
          # ID of the Replica itself) we can safely call
          # `_update_replica_volumes_info` below:
          self._update_replica_volumes_info(
              ctxt, execution.action_id, task.instance,
              {"volumes_info": task_info.get("volumes_info", [])})

          if task_type == constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
              # check if this was the last task in the update execution:
              if not _check_other_tasks_running(execution, task):
                  # it means this was the last update task in the Execution
                  # and we may safely update the params of the Replica
                  # as they are in the DB:
                  LOG.info(
                      "All tasks of the '%s' Replica update procedure have "
                      "completed successfully. Setting the updated "
                      "parameter values on the parent Replica itself.",
                      execution.action_id)
                  # NOTE: considering all the instances of the Replica get
                  # the same params, it doesn't matter which instance's
                  # update task finishes last:
                  db_api.update_replica(
                      ctxt, execution.action_id, task_info)

      elif task_type in (
              constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES,
              constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES):
          # only the last set-up task to finish records the resources:
          if not _check_other_tasks_running(execution, task):
              LOG.info(
                  "Updating 'pool_shared_resources' for pool %s after "
                  "completion of task '%s' (type '%s').",
                  execution.action_id, task.id, task_type)
              db_api.update_minion_pool_lifecycle(
                  ctxt, execution.action_id, {
                      "pool_shared_resources": task_info.get(
                          "pool_shared_resources", {})})

      elif task_type in (
              constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES,
              constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES):
          # only the last tear-down task to finish clears the resources:
          if not _check_other_tasks_running(execution, task):
              LOG.info(
                  "Clearing 'pool_shared_resources' for pool %s following "
                  "completion of task '%s' (type %s)",
                  execution.action_id, task.id, task_type)
              db_api.update_minion_pool_lifecycle(
                  ctxt, execution.action_id, {
                      "pool_shared_resources": {}})

      elif task_type in (
              constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE,
              constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE):
          LOG.info(
              "Adding DB entry for Minion Machine '%s' of pool %s "
              "following completion of task '%s' (type %s).",
              task.instance, execution.action_id, task.id, task_type)
          minion_machine = models.MinionMachine()
          minion_machine.id = task.instance
          minion_machine.pool_id = execution.action_id
          minion_machine.status = (
              constants.MINION_MACHINE_STATUS_AVAILABLE)
          minion_machine.connection_info = task_info[
              "minion_connection_info"]
          minion_machine.provider_properties = task_info[
              "minion_provider_properties"]
          minion_machine.backup_writer_connection_info = task_info[
              "minion_backup_writer_connection_info"]
          db_api.add_minion_machine(ctxt, minion_machine)

      elif task_type in (
              constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE,
              constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE):
          LOG.info(
              "%s task for Minion Machine '%s' has completed successfully. "
              "Deleting minion machine from DB.",
              task_type, task.instance)
          db_api.delete_minion_machine(ctxt, task.instance)

      elif task_type in minion_task_info_key_prefixes:
          # volume attachment/detachment on a source/target/osmorphing
          # minion: persist the minion's updated provider properties.
          key_prefix = minion_task_info_key_prefixes[task_type]
          updated_values = {
              "provider_properties": task_info[
                  "%s_minion_provider_properties" % key_prefix]}
          minion_machine_id = task_info[
              "%s_minion_machine_id" % key_prefix]
          LOG.debug(
              "Updating minion provider properties of minion machine '%s' "
              "following the completion of task '%s' (type '%s') to %s",
              minion_machine_id, task.id, task_type, updated_values)
          db_api.update_minion_machine(
              ctxt, minion_machine_id, updated_values)

      else:
          LOG.debug(
              "No post-task actions required for task '%s' of type '%s'",
              task.id, task_type)
  @parent_tasks_execution_synchronized
  def task_completed(self, ctxt, task_id, task_result):
      """Handle a worker's confirmation that a task completed successfully.

      The final status of the task is chosen based on its current status:
          - CANCELLING_AFTER_COMPLETION, CANCELLING or FAILED_TO_CANCEL:
            marked as CANCELED_AFTER_COMPLETION, with an explanation in
            its exception details;
          - any other status in `constants.FINALIZED_TASK_STATUSES`:
            logged as an inconsistency and its result is NOT processed;
          - anything else (normally RUNNING): marked as COMPLETED.

      Unless the result is discarded, the following is then done while
      holding the lock of the execution's parent action:
          - `task_result` (if any) is stored into the action's info for
            the task's instance;
          - post-task actions are applied via `_handle_post_task_actions`;
          - the execution's state is advanced for the task's instance.

      :param ctxt: request context passed through to the DB API.
      :param task_id: ID of the task which completed.
      :param task_result: the result reported by the worker for the task;
          treated as "no result" when empty/None.
      """
      LOG.info("Task completed: %s", task_id)

      task = db_api.get_task(ctxt, task_id)
      if task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
          if not task.on_error:
              LOG.warning(
                  "Non-error task '%s' was marked as %s although it should "
                  "not have. It was run to completion anyway.",
                  task.id, task.status)
          LOG.info(
              "On-error task '%s' which was '%s' has just completed "
              "successfully. Marking it as '%s' as a final status, "
              "but processing its result as if it completed successfully.",
              task_id, task.status,
              constants.TASK_STATUS_CANCELED_AFTER_COMPLETION)
          db_api.set_task_status(
              ctxt, task_id, constants.TASK_STATUS_CANCELED_AFTER_COMPLETION,
              exception_details=(
                  "This is a cleanup task so it was allowed to run to "
                  "completion after user-cancellation."))
      elif task.status == constants.TASK_STATUS_CANCELLING:
          LOG.error(
              "Received confirmation that presumably cancelling task '%s' "
              "(status '%s') has just completed successfully. "
              "This should have never happened and indicates that its worker "
              "host ('%s') has either failed to cancel it properly, or it "
              "was completed before the cancellation request was received. "
              "Please check the worker logs for more details. "
              "Marking as %s and processing its result as if it completed "
              "successfully.",
              task.id, task.status, task.host,
              constants.TASK_STATUS_CANCELED_AFTER_COMPLETION)
          db_api.set_task_status(
              ctxt, task_id, constants.TASK_STATUS_CANCELED_AFTER_COMPLETION,
              exception_details=(
                  "The worker host for this task ('%s') has either failed "
                  "at cancelling it or the cancellation request arrived "
                  "after it was already completed so this task was run to "
                  "completion. Please review the worker logs for "
                  "more relevant details." % (
                      task.host)))
      elif task.status == constants.TASK_STATUS_FAILED_TO_CANCEL:
          LOG.error(
              "Received confirmation that presumably '%s' task '%s' has just "
              "completed successfully. Marking as '%s' and processing its "
              "result as if it had completed normally.",
              task.status, task.id,
              constants.TASK_STATUS_CANCELED_AFTER_COMPLETION)
          db_api.set_task_status(
              ctxt, task_id, constants.TASK_STATUS_CANCELED_AFTER_COMPLETION,
              exception_details=(
                  "The worker host for this task ('%s') had either not "
                  "accepted the task cancellation request when it was asked "
                  "to or had failed to receive the request, so this task "
                  "was run to completion. Please review the worker logs for "
                  "more relevant details." % (
                      task.host)))
      elif task.status in constants.FINALIZED_TASK_STATUSES:
          LOG.error(
              "Received confirmation that presumably finalized task '%s' "
              "(status '%s') has just completed successfully from worker "
              "host '%s'. This should have never happened and indicates "
              "that there is an inconsistency with the task scheduling. "
              "Check the rest of the logs for further details. "
              "The results of this task will NOT be processed.",
              task.id, task.status, task.host)
          return
      else:
          if task.status != constants.TASK_STATUS_RUNNING:
              LOG.warning(
                  "Just-completed task '%s' was in '%s' state instead of "
                  "the expected '%s' state. Marking as '%s' anyway.",
                  task_id, task.status, constants.TASK_STATUS_RUNNING,
                  constants.TASK_STATUS_COMPLETED)
          db_api.set_task_status(
              ctxt, task_id, constants.TASK_STATUS_COMPLETED)

      execution = db_api.get_tasks_execution(ctxt, task.execution_id)
      with lockutils.lock(
              constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
                  execution.type] % execution.action_id,
              external=True):
          action_id = execution.action_id
          if task_result:
              LOG.info(
                  "Setting task %(task_id)s (type %(task_type)s) result for "
                  "instance %(instance)s into action %(action_id)s info: "
                  "%(task_result)s", {
                      "task_id": task_id,
                      "instance": task.instance,
                      "task_type": task.task_type,
                      "action_id": action_id,
                      "task_result": utils.sanitize_task_info(
                          task_result)})
              updated_task_info = (
                  db_api.update_transfer_action_info_for_instance(
                      ctxt, action_id, task.instance, task_result))
          else:
              # no result to store, so use the instance's existing info:
              action = db_api.get_action(ctxt, action_id)
              updated_task_info = action.info[task.instance]
              LOG.info(
                  "Task '%s' for instance '%s' of transfer action '%s' "
                  "has completed successfully but has not returned "
                  "any result.", task.id, task.instance, action_id)

          # NOTE: refresh the execution just in case:
          execution = db_api.get_tasks_execution(ctxt, task.execution_id)
          self._handle_post_task_actions(
              ctxt, task, execution, updated_task_info)

          newly_started_tasks = self._advance_execution_state(
              ctxt, execution, instance=task.instance, requery=False)
          if newly_started_tasks:
              LOG.info(
                  "The following tasks were started for execution '%s' "
                  "following the completion of task '%s' for instance %s: "
                  "%s", execution.id, task.id, task.instance,
                  newly_started_tasks)
          else:
              LOG.debug(
                  "No new tasks were started for execution '%s' for instance "
                  "'%s' following the successful completion of task '%s'.",
                  execution.id, task.instance, task.id)
  def _cancel_execution_for_osmorphing_debugging(self, ctxt, execution):
      """Unschedule the pending tasks of an execution for OSMorphing debugging.

      Every task of the execution other than the OSMorphing task(s) which is
      either scheduled or on-error-only is marked as
      ``TASK_STATUS_CANCELED_FOR_DEBUGGING`` instead of being left scheduled,
      so the environment is left as-is for the OSMorphing error to be
      investigated.

      :param ctxt: the request context.
      :param execution: the tasks execution whose tasks should be
          unscheduled.
      :raises exception.CoriolisException: if any non-OSMorphing task is
          still in an active state.
      """
      unschedulable_statuses = (
          constants.TASK_STATUS_SCHEDULED,
          constants.TASK_STATUS_ON_ERROR_ONLY)

      # go through all scheduled tasks and cancel them:
      for subtask in execution.tasks:
          if subtask.task_type == constants.TASK_TYPE_OS_MORPHING:
              continue

          # NOTE: callers (e.g. set_task_error) check beforehand that no
          # other task is running, so an active task here is inconsistent:
          if subtask.status in constants.ACTIVE_TASK_STATUSES:
              raise exception.CoriolisException(
                  "Task %s is still in an active state (%s) although "
                  "it should not!" % (subtask.id, subtask.status))

          if subtask.status not in unschedulable_statuses:
              continue

          msg = (
              "This task was unscheduled for debugging an error in the "
              "OSMorphing process.")
          if subtask.on_error:
              # skipping a cleanup task means whatever it would have cleaned
              # up is left behind for the user to deal with:
              msg = (
                  "%s Please note that any cleanup operations this task "
                  "should have included will need to be performed manually "
                  "once the debugging process has been completed." % msg)
          db_api.set_task_status(
              ctxt, subtask.id,
              constants.TASK_STATUS_CANCELED_FOR_DEBUGGING,
              exception_details=msg)
  @parent_tasks_execution_synchronized
  def confirm_task_cancellation(self, ctxt, task_id, cancellation_details):
      """Handle a worker's confirmation that a task has been cancelled.

      Picks the final status of the task based on the status it is currently
      in, stores it together with explanatory exception details, and then
      tries to advance the state of the parent execution.

      :param ctxt: the request context.
      :param task_id: ID of the task whose cancellation was confirmed.
      :param cancellation_details: details reported by the worker service.
      """
      LOG.info(
          "Received confirmation of cancellation for task '%s': %s",
          task_id, cancellation_details)
      task = db_api.get_task(ctxt, task_id)

      final_status = constants.TASK_STATUS_CANCELED
      exception_details = (
          "This task was user-cancelled. Additional cancellation "
          "info from worker service: '%s'" % cancellation_details)
      if task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
          LOG.error(
              "Received cancellation confirmation for task '%s' which was "
              "in '%s' state. This likely means that a double-cancellation "
              "occurred. Marking task as '%s' either way.",
              task.id, task.status, final_status)
      elif task.status == constants.TASK_STATUS_FORCE_CANCELED:
          # it means a force cancel has been issued before the
          # confirmation that the task was canceled came in:
          LOG.warning(
              "Only just received cancellation confirmation for "
              "force-canceled task '%s'. Leaving marked as "
              "force-cancelled.", task.id)
          final_status = constants.TASK_STATUS_FORCE_CANCELED
          exception_details = (
              "This task was force-cancelled. Confirmation of its "
              "cancellation did eventually come in. Additional details on "
              "the cancellation: %s" % cancellation_details)
      elif task.status == constants.TASK_STATUS_FAILED_TO_CANCEL:
          final_status = constants.TASK_STATUS_CANCELED
          # NOTE: log the raw details from the worker; this is a
          # cancellation confirmation, not an error report:
          LOG.warning(
              "Only just received cancellation confirmation for task '%s' "
              "despite it being marked as '%s'. Marking as '%s' anyway. "
              "Cancellation details reported by worker were: %s",
              task.id, task.status, final_status, cancellation_details)
          exception_details = (
              "The worker service for this task (%s) had either failed to "
              "cancel this task when it was initially asked to or did not "
              "receive the cancellation request, but it did eventually "
              "confirm the task's cancellation with the following "
              "details: %s" % (task.host, cancellation_details))
      elif task.status in constants.FINALIZED_TASK_STATUSES:
          LOG.warning(
              "Received confirmation of cancellation for already finalized "
              "task '%s' (status '%s') from host '%s'. NOT modifying "
              "its status.", task.id, task.status, task.host)
          final_status = task.status
      elif task.status != constants.TASK_STATUS_CANCELLING:
          LOG.warning(
              "Received confirmation of cancellation for non-CANCELLING "
              "task '%s' (status '%s'). Marking as '%s' anyway.",
              task.id, task.status, final_status)

      if final_status == task.status:
          LOG.debug(
              "NOT altering state of finalized task '%s' ('%s') following "
              "confirmation of cancellation. Updating its exception "
              "details though: %s", task.id, task.status, exception_details)
      else:
          LOG.info(
              "Transitioning canceled task '%s' from '%s' to '%s' following "
              "confirmation of its cancellation.",
              task.id, task.status, final_status)
      # the exception details are refreshed in both cases:
      db_api.set_task_status(
          ctxt, task.id, final_status,
          exception_details=exception_details)

      execution = db_api.get_tasks_execution(ctxt, task.execution_id)
      self._advance_execution_state(ctxt, execution, requery=False)
  @parent_tasks_execution_synchronized
  def set_task_error(self, ctxt, task_id, exception_details):
      """Handle an error reported by a worker for one of its tasks.

      The task's final status defaults to ``TASK_STATUS_ERROR``. If the task
      was being (or had been) cancelled, a cancellation status is used
      instead. The error details are stored on the task.

      The parent execution is then cancelled, with one exception. For
      OSMorphing tasks, when ``CONF.conductor.debug_os_morphing_errors`` is
      set and no other task is running, the remaining tasks are unscheduled
      and the execution is marked as canceled-for-debugging.

      For migrations, the associated reservation is also checked for
      deletion.

      :param ctxt: the request context.
      :param task_id: ID of the task which errored.
      :param exception_details: the error details reported by the worker.
      """
      LOG.error(
          "Received error confirmation for task: %(task_id)s - %(ex)s",
          {"task_id": task_id, "ex": exception_details})
      task = db_api.get_task(ctxt, task_id)

      final_status = constants.TASK_STATUS_ERROR
      if task.status == constants.TASK_STATUS_CANCELLING:
          final_status = constants.TASK_STATUS_CANCELED
      elif task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
          final_status = constants.TASK_STATUS_CANCELED
          if not task.on_error:
              LOG.warning(
                  "Non-error '%s' was in '%s' status although it should "
                  "never have been marked as such. Marking as '%s' anyway.",
                  task.id, task.status, final_status)
          else:
              LOG.warning(
                  "On-error task '%s' which was in '%s' status ended up "
                  "error-ing. Marking as '%s'",
                  task.id, task.status, final_status)
              exception_details = (
                  "This is a cleanup task so was allowed to complete "
                  "following user-cancellation, but encountered an "
                  "error: %s" % exception_details)
      elif task.status == constants.TASK_STATUS_FORCE_CANCELED:
          # it means a force cancel has been issued priorly but
          # the task has error'd anyway:
          LOG.warning(
              "Only just received error confirmation for force-cancelled "
              "task '%s'. Leaving marked as force-cancelled.", task.id)
          final_status = constants.TASK_STATUS_FORCE_CANCELED
          exception_details = (
              "This task was force-cancelled but ended up erroring anyway. "
              "The error details were: '%s'" % exception_details)
      elif task.status == constants.TASK_STATUS_FAILED_TO_CANCEL:
          final_status = constants.TASK_STATUS_CANCELED
          LOG.warning(
              "Only just received error confirmation for task '%s' despite "
              "it being marked as '%s'. Marking as '%s' anyway. Error "
              "reported by worker was: %s",
              task.id, task.status, final_status, exception_details)
          exception_details = (
              "The worker service for this task (%s) has failed to cancel "
              "this task when it was asked to, and the task eventually "
              "exited with an error. The error details were: %s" % (
                  task.host, exception_details))
      elif task.status in constants.FINALIZED_TASK_STATUSES:
          LOG.error(
              "Error confirmation on task '%s' arrived despite it being "
              "in a terminal state ('%s'). This should never happen and "
              "indicates an issue with its scheduling/handling. Error "
              "was: %s", task.id, task.status, exception_details)
          exception_details = (
              "Error confirmation came in for this task despite it having "
              "already been marked as %s. Please notify support of this "
              "occurrence and share the Conductor and Worker logs. "
              "Error message in confirmation was: %s" % (
                  task.status, exception_details))

      LOG.debug(
          "Transitioning errored task '%s' from '%s' to '%s'",
          task.id, task.status, final_status)
      db_api.set_task_status(
          ctxt, task_id, final_status, exception_details)

      # re-read the task and its execution following the status update:
      task = db_api.get_task(ctxt, task_id)
      execution = db_api.get_tasks_execution(ctxt, task.execution_id)

      action_id = execution.action_id
      action = db_api.get_action(ctxt, action_id)
      with lockutils.lock(
              constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
                  execution.type] % action_id,
              external=True):
          if task.task_type == constants.TASK_TYPE_OS_MORPHING and (
                  CONF.conductor.debug_os_morphing_errors):
              LOG.debug(
                  "Attempting to cancel execution '%s' of action '%s' "
                  "for OSMorphing debugging.", execution.id, action_id)
              # NOTE: the OSMorphing task always runs by itself so no
              # further tasks should be running, but we double-check here:
              running = [
                  t for t in execution.tasks
                  if t.status in constants.ACTIVE_TASK_STATUSES
                  and t.task_type != constants.TASK_TYPE_OS_MORPHING]
              if not running:
                  self._cancel_execution_for_osmorphing_debugging(
                      ctxt, execution)
                  db_api.set_task_status(
                      ctxt, task_id, final_status,
                      exception_details=(
                          "An error has occurred during OSMorphing. Cleanup "
                          "will not be performed for debugging reasons. "
                          "Please review the Conductor logs for the debug "
                          "connection info. Original error was: %s" % (
                              exception_details)))
                  LOG.warning(
                      "All subtasks for Migration '%s' have been cancelled "
                      "to allow for OSMorphing debugging. The connection "
                      "info for the worker VM is: %s",
                      action_id, action.info.get(task.instance, {}).get(
                          'osmorphing_connection_info', {}))
                  self._set_tasks_execution_status(
                      ctxt, execution.id,
                      constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING)
              else:
                  LOG.warning(
                      "Some tasks are running in parallel with the "
                      "OSMorphing task, a debug setup cannot be safely "
                      "achieved. Proceeding with cleanup tasks as usual.")
                  self._cancel_tasks_execution(ctxt, execution)
          else:
              self._cancel_tasks_execution(ctxt, execution)

          # NOTE: if this was a migration, make sure to delete
          # its associated reservation.
          if execution.type == constants.EXECUTION_TYPE_MIGRATION:
              self._check_delete_reservation_for_transfer(action)
  @task_synchronized
  def task_event(self, ctxt, task_id, level, message):
      """Record an event reported by the host running the given task.

      :param level: the level of the event, stored as-is.
      :param message: the event message.
      :raises exception.InvalidTaskState: if the task is not active.
      """
      LOG.info("Task event: %s", task_id)
      current_task = db_api.get_task(ctxt, task_id)
      task_is_active = current_task.status in constants.ACTIVE_TASK_STATUSES
      if not task_is_active:
          refusal_msg = (
              "Task with ID '%s' is in a non-running state ('%s') but it "
              "has received a task event from its task host ('%s'). "
              "Refusing task event. The event string was: %s" % (
                  current_task.id, current_task.status, current_task.host,
                  message))
          raise exception.InvalidTaskState(refusal_msg)

      db_api.add_task_event(ctxt, task_id, level, message)
  @task_synchronized
  def add_task_progress_update(self, ctxt, task_id, total_steps, message):
      """Add a new progress update reported by the host of an active task.

      :raises exception.InvalidTaskState: if the task is not active.
      """
      LOG.info("Adding task progress update: %s", task_id)
      current_task = db_api.get_task(ctxt, task_id)
      if current_task.status in constants.ACTIVE_TASK_STATUSES:
          db_api.add_task_progress_update(
              ctxt, task_id, total_steps, message)
          return

      raise exception.InvalidTaskState(
          "Task with ID '%s' is in a non-running state ('%s') but it "
          "has received a progress update from its task host ('%s'). "
          "Refusing progress update. The progress update string "
          "was: %s" % (
              current_task.id, current_task.status, current_task.host,
              message))
  @task_synchronized
  def update_task_progress_update(
          self, ctxt, task_id, step, total_steps, message):
      """Update the given progress step of a task in the DB."""
      LOG.info("Updating task progress update: %s", task_id)
      update_args = (ctxt, task_id, step, total_steps, message)
      db_api.update_task_progress_update(*update_args)
  @task_synchronized
  def get_task_progress_step(self, ctxt, task_id):
      """Return the progress step of the given task, as read from the DB."""
      progress_step = db_api.get_task_progress_step(ctxt, task_id)
      return progress_step
  def _get_replica_schedule(
          self, ctxt, replica_id, schedule_id, expired=True):
      """Fetch a schedule of a Replica.

      :param expired: forwarded to the DB lookup as-is.
      :raises exception.NotFound: if no such schedule exists.
      """
      found = db_api.get_replica_schedule(
          ctxt, replica_id, schedule_id, expired=expired)
      if found:
          return found

      raise exception.NotFound(
          "Schedule with ID '%s' for Replica '%s' not found." % (
              schedule_id, replica_id))
  def create_replica_schedule(self, ctxt, replica_id,
                              schedule, enabled, exp_date,
                              shutdown_instance):
      """Create a new execution schedule for a Replica.

      A Keystone trust is created first. The value of ``ctxt.trust_id``
      read afterwards is stored on the schedule; presumably it is set by
      ``keystone.create_trust``, so confirm. The schedule is added through
      ``db_api.add_replica_schedule`` along with a callback which registers
      it with the Replica cron client.

      :returns: the newly-created schedule.
      """
      keystone.create_trust(ctxt)

      def _register_with_cron(ctxt, sched):
          return self._replica_cron_client.register(ctxt, sched)

      replica = self._get_replica(ctxt, replica_id)
      new_schedule = models.ReplicaSchedule()
      new_schedule.id = str(uuid.uuid4())
      new_schedule.replica = replica
      new_schedule.replica_id = replica_id
      new_schedule.schedule = schedule
      new_schedule.expiration_date = exp_date
      new_schedule.enabled = enabled
      new_schedule.shutdown_instance = shutdown_instance
      new_schedule.trust_id = ctxt.trust_id

      db_api.add_replica_schedule(ctxt, new_schedule, _register_with_cron)
      return self.get_replica_schedule(ctxt, replica_id, new_schedule.id)
  @schedule_synchronized
  def update_replica_schedule(self, ctxt, replica_id, schedule_id,
                              updated_values):
      """Update a Replica schedule and re-register it with the cron client.

      :returns: the updated schedule.
      """
      def _register_with_cron(ctxt, sched):
          return self._replica_cron_client.register(ctxt, sched)

      db_api.update_replica_schedule(
          ctxt, replica_id, schedule_id, updated_values, None,
          _register_with_cron)
      return self._get_replica_schedule(ctxt, replica_id, schedule_id)
  def _cleanup_schedule_resources(self, ctxt, schedule):
      """Unregister a schedule from cron and delete its trust, if any."""
      self._replica_cron_client.unregister(ctxt, schedule)

      trust_id = schedule.trust_id
      if not trust_id:
          return
      # the trust is deleted through an admin context scoped to it:
      keystone.delete_trust(
          context.get_admin_context(trust_id=trust_id))
  @schedule_synchronized
  def delete_replica_schedule(self, ctxt, replica_id, schedule_id):
      """Delete a Replica schedule.

      The DB layer is given a callback which unregisters the schedule from
      the cron client and deletes its trust.
      """
      def _cleanup(ctxt, sched):
          return self._cleanup_schedule_resources(ctxt, sched)

      db_api.delete_replica_schedule(
          ctxt, replica_id, schedule_id, None, _cleanup)
  @replica_synchronized
  def get_replica_schedules(self, ctxt, replica_id=None, expired=True):
      """List Replica schedules, optionally only those of one Replica."""
      schedules = db_api.get_replica_schedules(
          ctxt, expired=expired, replica_id=replica_id)
      return schedules
  @schedule_synchronized
  def get_replica_schedule(
          self, ctxt, replica_id, schedule_id, expired=True):
      """Return a Replica schedule, raising NotFound if it is missing."""
      schedule = self._get_replica_schedule(
          ctxt, replica_id, schedule_id, expired=expired)
      return schedule
  @replica_synchronized
  def update_replica(
          self, ctxt, replica_id, updated_properties):
      """Create and start a tasks execution which updates a Replica.

      If any minion pool field is being updated, the resulting pool
      configuration is validated first. For every instance of the Replica,
      the updated properties are merged into a copy of its task info. A
      ``destination_environment`` value is stored as ``target_environment``.
      Tasks are then created to get the instance info and to update the
      source and destination sides.

      :param updated_properties: dict of the Replica properties to update.
      :returns: the newly-started Replica update tasks execution.
      """
      replica = self._get_replica(ctxt, replica_id)

      minion_pool_fields = [
          "origin_minion_pool_id", "destination_minion_pool_id",
          "instance_osmorphing_minion_pool_mappings"]
      if any(mpf in updated_properties for mpf in minion_pool_fields):
          # NOTE: this is just a dummy Replica model to use for validation.
          # Fields which are not part of the update keep the Replica's
          # current values so that the resulting combination of pools is
          # validated as a whole:
          dummy = models.Replica()
          dummy.id = replica.id
          dummy.instances = replica.instances
          dummy.origin_endpoint_id = replica.origin_endpoint_id
          dummy.destination_endpoint_id = replica.destination_endpoint_id
          dummy.origin_minion_pool_id = updated_properties.get(
              'origin_minion_pool_id', replica.origin_minion_pool_id)
          dummy.destination_minion_pool_id = updated_properties.get(
              'destination_minion_pool_id',
              replica.destination_minion_pool_id)
          dummy.instance_osmorphing_minion_pool_mappings = (
              updated_properties.get(
                  'instance_osmorphing_minion_pool_mappings',
                  replica.instance_osmorphing_minion_pool_mappings))
          self._check_minion_pools_for_action(ctxt, dummy)

      self._check_replica_running_executions(ctxt, replica)
      self._check_valid_replica_tasks_execution(replica, force=True)
      execution = models.TasksExecution()
      execution.id = str(uuid.uuid4())
      execution.status = constants.EXECUTION_STATUS_UNEXECUTED
      execution.action = replica
      execution.type = constants.EXECUTION_TYPE_REPLICA_UPDATE

      for instance in replica.instances:
          LOG.debug(
              "Pre-replica-update task_info for instance '%s' of Replica "
              "'%s': %s", instance, replica_id,
              utils.sanitize_task_info(
                  replica.info[instance]))

          # NOTE: "circular assignment" would lead to a `None` value
          # so we must operate on a copy:
          inst_info_copy = copy.deepcopy(replica.info[instance])

          # NOTE: we update the various values in the task info itself
          # As a result, the values within the task_info will be the updated
          # values which will be checked. The old values will be sent to the
          # tasks through the origin/destination parameters for them to be
          # compared to the new ones.
          # The actual values on the Replica object itself will be set
          # during _handle_post_task_actions once the final destination-side
          # update task will be completed.
          inst_info_copy.update({
              key: updated_properties[key]
              for key in updated_properties
              if key != "destination_environment"})
          # NOTE: the API service labels the target-env as the
          # "destination_environment":
          if "destination_environment" in updated_properties:
              inst_info_copy["target_environment"] = updated_properties[
                  "destination_environment"]
          replica.info[instance] = inst_info_copy

          LOG.debug(
              "Updated task_info for instance '%s' of Replica "
              "'%s' which will be verified during update procedure: %s",
              instance, replica_id, utils.sanitize_task_info(
                  replica.info[instance]))

          get_instance_info_task = self._create_task(
              instance, constants.TASK_TYPE_GET_INSTANCE_INFO,
              execution)
          update_source_replica_task = self._create_task(
              instance, constants.TASK_TYPE_UPDATE_SOURCE_REPLICA,
              execution)
          self._create_task(
              instance, constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA,
              execution,
              depends_on=[
                  get_instance_info_task.id,
                  # NOTE: the dest-side update task must be done after
                  # the source-side one as both can potentially modify
                  # the 'volumes_info' together:
                  update_source_replica_task.id])

      self._check_execution_tasks_sanity(execution, replica.info)

      # update the action info for all of the instances in the Replica:
      for instance in execution.action.instances:
          db_api.update_transfer_action_info_for_instance(
              ctxt, replica.id, instance, replica.info[instance])

      db_api.add_replica_tasks_execution(ctxt, execution)
      LOG.debug("Execution for Replica update tasks created: %s",
                execution.id)

      self._begin_tasks(ctxt, execution, task_info=replica.info)
      return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
  def get_diagnostics(self, ctxt):
      """Return diagnostics info for this service; ``ctxt`` is unused."""
      diagnostics = utils.get_diagnostics_info()
      return diagnostics
  def create_region(self, ctxt, region_name, description="", enabled=True):
      """Create a new region and return it as re-read via get_region()."""
      new_region = models.Region()
      new_region.id = str(uuid.uuid4())
      new_region.name = region_name
      new_region.description = description
      new_region.enabled = enabled

      db_api.add_region(ctxt, new_region)
      return self.get_region(ctxt, new_region.id)
  def get_regions(self, ctxt):
      """List all regions from the DB."""
      regions = db_api.get_regions(ctxt)
      return regions
  @region_synchronized
  def get_region(self, ctxt, region_id):
      """Return the region with the given ID.

      :raises exception.NotFound: if no such region exists.
      """
      found = db_api.get_region(ctxt, region_id)
      if found:
          return found
      raise exception.NotFound(
          "Region with ID '%s' not found." % region_id)
  @region_synchronized
  def update_region(self, ctxt, region_id, updated_values):
      """Apply ``updated_values`` to a region and return the result."""
      LOG.info(
          "Attempting to update region '%s' with payload: %s",
          region_id, updated_values)
      db_api.update_region(ctxt, region_id, updated_values)
      LOG.info("Region '%s' successfully updated", region_id)

      updated_region = db_api.get_region(ctxt, region_id)
      return updated_region
  @region_synchronized
  def delete_region(self, ctxt, region_id):
      """Delete the region with the given ID from the DB."""
      # TODO(aznashwan): add checks for endpoints/services
      # associated to the region before deletion:
      db_api.delete_region(ctxt, region_id)
  def register_service(
          self, ctxt, host, binary, topic, enabled, mapped_regions=None,
          providers=None, specs=None):
      """Register a new service in the DB.

      If either ``providers`` or ``specs`` is not given, both are queried
      from the worker service running on ``host``. If adding the region
      mappings fails, the newly-added service is deleted again and the
      error is re-raised.

      :raises exception.Conflict: if a service with the same host, binary
          and topic is already registered.
      :returns: the newly-registered service.
      """
      service = db_api.find_service(ctxt, host, binary, topic=topic)
      if service:
          raise exception.Conflict(
              "A Service with the specified parameters (host %s, binary %s, "
              "topic %s) has already been registered under ID: %s" % (
                  host, binary, topic, service.id))

      service = models.Service()
      service.id = str(uuid.uuid4())
      service.host = host
      service.binary = binary
      service.enabled = enabled
      service.topic = topic
      service.status = constants.SERVICE_STATUS_UP

      if None in (providers, specs):
          worker_rpc = self._get_worker_rpc_for_host(service['host'])
          status = worker_rpc.get_service_status(ctxt)
          service.providers = status["providers"]
          service.specs = status["specs"]
      else:
          service.providers = providers
          service.specs = specs

      # create the service:
      db_api.add_service(ctxt, service)
      LOG.debug(
          "Added new service to DB: %s", service.id)

      # add region associations:
      if mapped_regions:
          try:
              db_api.update_service(
                  ctxt, service.id, {
                      "mapped_regions": mapped_regions})
          except Exception as ex:
              LOG.warning(
                  "Error adding region mappings during new service "
                  "registration (host: %s), cleaning up service and "
                  "all created mappings for regions: %s. Error was: %s",
                  service.host, mapped_regions, ex)
              db_api.delete_service(ctxt, service.id)
              raise

      return self.get_service(ctxt, service.id)
  def check_service_registered(self, ctxt, host, binary, topic):
      """Look up a service by host/binary/topic.

      :returns: the matching service, or whatever falsy value the DB lookup
          returned if there is none.
      """
      props = "host='%s', binary='%s', topic='%s'" % (host, binary, topic)
      LOG.debug(
          "Checking for existence of service with properties: %s", props)

      service = db_api.find_service(ctxt, host, binary, topic=topic)
      if not service:
          LOG.debug(
              "Could not find any service with the specified "
              "properties: %s", props)
          return service

      LOG.debug(
          "Found service '%s' for properties %s", service.id, props)
      return service
  @service_synchronized
  def refresh_service_status(self, ctxt, service_id):
      """Re-query a worker service's status and update its DB entry.

      The worker on the service's host is asked for its current providers
      and specs. These are stored, and the service is marked as up.

      :returns: the updated service.
      :raises exception.NotFound: if no service with the given ID exists.
      """
      LOG.debug("Updating registration for worker service '%s'", service_id)
      service = db_api.get_service(ctxt, service_id)
      if not service:
          # NOTE: mirrors get_service() instead of calling it, since that
          # method is decorated with the same @service_synchronized lock
          # which is already held here.
          raise exception.NotFound(
              "Service with ID '%s' not found." % service_id)

      worker_rpc = self._get_worker_rpc_for_host(service['host'])
      status = worker_rpc.get_service_status(ctxt)

      updated_values = {
          "providers": status["providers"],
          "specs": status["specs"],
          "status": constants.SERVICE_STATUS_UP}
      db_api.update_service(ctxt, service_id, updated_values)
      LOG.debug("Successfully refreshed status of service '%s'", service_id)

      return db_api.get_service(ctxt, service_id)
  def get_services(self, ctxt):
      """List all registered services."""
      services = db_api.get_services(ctxt)
      return services
  @service_synchronized
  def get_service(self, ctxt, service_id):
      """Return the service with the given ID.

      :raises exception.NotFound: if no such service exists.
      """
      found = db_api.get_service(ctxt, service_id)
      if found:
          return found
      raise exception.NotFound(
          "Service with ID '%s' not found." % service_id)
  @service_synchronized
  def update_service(self, ctxt, service_id, updated_values):
      """Apply ``updated_values`` to a service and return the result."""
      LOG.info(
          "Attempting to update service '%s' with payload: %s",
          service_id, updated_values)
      db_api.update_service(ctxt, service_id, updated_values)
      LOG.info("Successfully updated service '%s'", service_id)

      updated_service = db_api.get_service(ctxt, service_id)
      return updated_service
  @service_synchronized
  def delete_service(self, ctxt, service_id):
      """Delete the service with the given ID from the DB."""
      db_api.delete_service(ctxt, service_id)
  def create_minion_pool(
          self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
          environment_options, minimum_minions, maximum_minions,
          minion_max_idle_time, minion_retention_strategy, notes=None):
      """Create a new minion pool in the uninitialized state.

      :param endpoint_id: ID of the endpoint the pool is created for; it is
          used as both the origin and destination endpoint of the pool.
      :param environment_options: used as both the source and destination
          environment of the pool.
      :raises exception.NotFound: if the endpoint does not exist.
      :returns: the newly-created minion pool.
      """
      # NOTE(review): assumes db_api.get_endpoint returns a falsy value for
      # unknown IDs; the check is harmless if it raises itself.
      endpoint = db_api.get_endpoint(ctxt, endpoint_id)
      if not endpoint:
          raise exception.NotFound(
              "Endpoint with ID '%s' not found." % endpoint_id)

      minion_pool = models.MinionPoolLifecycle()
      minion_pool.id = str(uuid.uuid4())
      minion_pool.pool_name = name
      minion_pool.notes = notes
      minion_pool.pool_platform = pool_platform
      minion_pool.pool_os_type = pool_os_type
      minion_pool.pool_status = constants.MINION_POOL_STATUS_UNINITIALIZED
      minion_pool.minimum_minions = minimum_minions
      minion_pool.maximum_minions = maximum_minions
      minion_pool.minion_max_idle_time = minion_max_idle_time
      minion_pool.minion_retention_strategy = minion_retention_strategy

      # TODO(aznashwan): These field redundancies should be
      # eliminated once the DB model hirearchy is overhauled:
      minion_pool.origin_endpoint_id = endpoint_id
      minion_pool.destination_endpoint_id = endpoint_id
      minion_pool.source_environment = environment_options
      minion_pool.destination_environment = environment_options
      minion_pool.instances = []
      minion_pool.info = {}

      db_api.add_minion_pool_lifecycle(ctxt, minion_pool)
      return self.get_minion_pool(ctxt, minion_pool.id)
  def get_minion_pools(self, ctxt, include_tasks_executions=False):
      """List all minion pools, always including their machines."""
      pools = db_api.get_minion_pool_lifecycles(
          ctxt, include_machines=True,
          include_tasks_executions=include_tasks_executions)
      return pools
  def _get_minion_pool(
          self, ctxt, minion_pool_id, include_tasks_executions=True,
          include_machines=True):
      """Fetch a minion pool from the DB.

      :raises exception.NotFound: if no such minion pool exists.
      """
      found = db_api.get_minion_pool_lifecycle(
          ctxt, minion_pool_id,
          include_tasks_executions=include_tasks_executions,
          include_machines=include_machines)
      if found:
          return found
      raise exception.NotFound(
          "Minion pool with ID '%s' not found." % minion_pool_id)
  @minion_pool_synchronized
  def set_up_shared_minion_pool_resources(self, ctxt, minion_pool_id):
      """Start an execution which sets up a minion pool's shared resources.

      The execution validates the pool options and then sets up the shared
      resources, using the source- or destination-side task types depending
      on the pool's platform. The pool is marked as INITIALIZING.

      :raises exception.NotFound: if the minion pool does not exist.
      :raises exception.InvalidMinionPoolState: if the pool is not in the
          UNINITIALIZED state.
      :returns: dict form of the newly-started execution.
      """
      LOG.info(
          "Attempting to set up shared resources for Minion Pool '%s'.",
          minion_pool_id)
      # NOTE: using the helper ensures a missing pool raises NotFound:
      minion_pool = self._get_minion_pool(
          ctxt, minion_pool_id, include_tasks_executions=False,
          include_machines=False)
      if minion_pool.pool_status != constants.MINION_POOL_STATUS_UNINITIALIZED:
          raise exception.InvalidMinionPoolState(
              "Minion Pool '%s' cannot have shared resources set up as it "
              "is in '%s' state instead of the expected %s." % (
                  minion_pool_id, minion_pool.pool_status,
                  constants.MINION_POOL_STATUS_UNINITIALIZED))

      execution = models.TasksExecution()
      execution.id = str(uuid.uuid4())
      execution.action = minion_pool
      execution.status = constants.EXECUTION_STATUS_UNEXECUTED
      execution.type = (
          constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES)

      minion_pool.info[minion_pool_id] = {
          "pool_os_type": minion_pool.pool_os_type,
          "pool_identifier": minion_pool.id,
          # TODO(aznashwan): remove redundancy once transfer
          # action DB models have been overhauled:
          "pool_environment_options": minion_pool.source_environment}

      validate_task_type = (
          constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS)
      set_up_task_type = (
          constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES)
      if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
          validate_task_type = (
              constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS)
          set_up_task_type = (
              constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES)

      validate_pool_options_task = self._create_task(
          minion_pool.id, validate_task_type, execution)
      self._create_task(
          minion_pool.id,
          set_up_task_type,
          execution,
          depends_on=[validate_pool_options_task.id])

      self._check_execution_tasks_sanity(execution, minion_pool.info)

      # update the action info for the pool's instance:
      db_api.update_transfer_action_info_for_instance(
          ctxt, minion_pool.id, minion_pool.id,
          minion_pool.info[minion_pool.id])

      # add new execution to DB:
      db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
      LOG.info(
          "Minion pool shared resource creation execution created: %s",
          execution.id)

      self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
      db_api.set_minion_pool_lifecycle_status(
          ctxt, minion_pool.id, constants.MINION_POOL_STATUS_INITIALIZING)

      return self._get_minion_pool_lifecycle_execution(
          ctxt, minion_pool_id, execution.id).to_dict()
  @minion_pool_synchronized
  def tear_down_shared_minion_pool_resources(
          self, ctxt, minion_pool_id, force=False):
      """Start an execution which tears down a pool's shared resources.

      The source- or destination-side teardown task type is used depending
      on the pool's platform. The pool is marked as UNINITIALIZING.

      :param force: allow the teardown even if the pool is not in the
          DEALLOCATED state.
      :raises exception.NotFound: if the minion pool does not exist.
      :raises exception.InvalidMinionPoolState: if the pool is not
          DEALLOCATED and ``force`` is not set.
      :returns: dict form of the newly-started execution.
      """
      # NOTE: using the helper ensures a missing pool raises NotFound:
      minion_pool = self._get_minion_pool(
          ctxt, minion_pool_id, include_tasks_executions=False,
          include_machines=False)
      if minion_pool.pool_status != (
              constants.MINION_POOL_STATUS_DEALLOCATED) and not force:
          raise exception.InvalidMinionPoolState(
              "Minion Pool '%s' cannot have shared resources torn down as it"
              " is in '%s' state instead of the expected %s. "
              "Please use the force flag if you are certain you want "
              "to tear down the shared resources for this pool." % (
                  minion_pool_id, minion_pool.pool_status,
                  constants.MINION_POOL_STATUS_DEALLOCATED))

      LOG.info(
          "Attempting to tear down shared resources for Minion Pool '%s'.",
          minion_pool_id)

      execution = models.TasksExecution()
      execution.id = str(uuid.uuid4())
      execution.action = minion_pool
      execution.status = constants.EXECUTION_STATUS_UNEXECUTED
      execution.type = (
          constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES)

      tear_down_task_type = (
          constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES)
      if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
          tear_down_task_type = (
              constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES)

      self._create_task(
          minion_pool.id, tear_down_task_type, execution)

      self._check_execution_tasks_sanity(execution, minion_pool.info)

      # update the action info for the pool's instance:
      db_api.update_transfer_action_info_for_instance(
          ctxt, minion_pool.id, minion_pool.id,
          minion_pool.info[minion_pool.id])

      # add new execution to DB:
      db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
      LOG.info(
          "Minion pool shared resource teardown execution created: %s",
          execution.id)

      self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
      db_api.set_minion_pool_lifecycle_status(
          ctxt, minion_pool.id, constants.MINION_POOL_STATUS_UNINITIALIZING)

      return self._get_minion_pool_lifecycle_execution(
          ctxt, minion_pool_id, execution.id).to_dict()
  @minion_pool_synchronized
  def allocate_minion_pool_machines(self, ctxt, minion_pool_id):
      """Start an execution which allocates the minion pool's machines.

      ``minimum_minions`` new machine IDs are generated. For each of them a
      create task is added, along with a delete task flagged
      ``on_error_only`` which depends on that create task. The execution is
      stored in the DB and started, and the pool's lifecycle status is then
      set to ALLOCATING.

      :param ctxt: request context passed to the DB API and task helpers.
      :param minion_pool_id: ID of the minion pool to allocate.
      :returns: the newly-created lifecycle execution, as a dict.
      :raises exception.InvalidMinionPoolState: if the pool is not in the
          DEALLOCATED state.
      """
      LOG.info("Attempting to allocate Minion Pool '%s'.", minion_pool_id)
      minion_pool = self._get_minion_pool(
          ctxt, minion_pool_id, include_tasks_executions=False,
          include_machines=True)
      if minion_pool.pool_status != constants.MINION_POOL_STATUS_DEALLOCATED:
          raise exception.InvalidMinionPoolState(
              "Minion machines for pool '%s' cannot be allocated as the pool"
              " is in '%s' state instead of the expected %s." % (
                  minion_pool_id, minion_pool.pool_status,
                  constants.MINION_POOL_STATUS_DEALLOCATED))

      execution = models.TasksExecution()
      execution.id = str(uuid.uuid4())
      execution.action = minion_pool
      execution.status = constants.EXECUTION_STATUS_UNEXECUTED
      execution.type = constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS

      new_minion_machine_ids = [
          str(uuid.uuid4()) for _ in range(minion_pool.minimum_minions)]

      # Both the create and the cleanup task types must match the pool's
      # platform. The source branch used to pick the *destination* deletion
      # task type, so failed source minions would have been cleaned up
      # with the wrong task.
      if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
          create_minion_task_type = (
              constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE)
          delete_minion_task_type = (
              constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE)
      else:
          create_minion_task_type = (
              constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE)
          delete_minion_task_type = (
              constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)

      for minion_machine_id in new_minion_machine_ids:
          minion_pool.info[minion_machine_id] = {
              "pool_identifier": minion_pool_id,
              "pool_os_type": minion_pool.pool_os_type,
              "pool_shared_resources": minion_pool.pool_shared_resources,
              "pool_environment_options": minion_pool.source_environment,
              # NOTE: we default this to an empty dict here to avoid possible
              # task info conflicts on the cleanup task below for minions
              # which were slower to deploy:
              "minion_provider_properties": {}}
          create_minion_task = self._create_task(
              minion_machine_id, create_minion_task_type, execution)
          # Cleanup counterpart of the create task above; flagged
          # on_error_only and made dependent on the create task.
          self._create_task(
              minion_machine_id,
              delete_minion_task_type,
              execution, on_error_only=True,
              depends_on=[create_minion_task.id])
      self._check_execution_tasks_sanity(execution, minion_pool.info)

      # update the action info for all of the pool's minions:
      for minion_machine_id in new_minion_machine_ids:
          db_api.update_transfer_action_info_for_instance(
              ctxt, minion_pool.id, minion_machine_id,
              minion_pool.info[minion_machine_id])

      # add new execution to DB:
      db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
      LOG.info("Minion pool allocation execution created: %s", execution.id)
      self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
      db_api.set_minion_pool_lifecycle_status(
          ctxt, minion_pool.id, constants.MINION_POOL_STATUS_ALLOCATING)
      return self._get_minion_pool_lifecycle_execution(
          ctxt, minion_pool_id, execution.id).to_dict()
  def _check_all_pool_minion_machines_available(self, minion_pool):
      """Ensure every machine of ``minion_pool`` is in the AVAILABLE status.

      A pool without any machines passes the check; only a debug message
      is logged for it.

      :param minion_pool: pool object exposing ``id`` and ``minion_machines``.
      :raises exception.InvalidMinionPoolState: with a mapping of machine ID
          to status for every machine which is not AVAILABLE.
      """
      machines = minion_pool.minion_machines
      if machines:
          unavailable = {}
          for machine in machines:
              if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
                  unavailable[machine.id] = machine.status
          if unavailable:
              raise exception.InvalidMinionPoolState(
                  "Minion pool with ID '%s' has one or more machines which are "
                  "in-use or otherwise unmodifiable: %s" % (
                      minion_pool.id,
                      unavailable))
      else:
          LOG.debug(
              "Minion pool '%s' does not have any allocated machines.",
              minion_pool.id)
  @minion_pool_synchronized
  def deallocate_minion_pool_machines(self, ctxt, minion_pool_id,
                                      force=False):
      """Start an execution which deletes all of the minion pool's machines.

      A delete task is added for every machine currently recorded for the
      pool. The execution is stored in the DB and started, and the pool's
      lifecycle status is then set to DEALLOCATING.

      :param ctxt: request context passed to the DB API and task helpers.
      :param minion_pool_id: ID of the minion pool to deallocate.
      :param force: when set, skip both the ALLOCATED status check and the
          check that all of the pool's machines are AVAILABLE.
      :returns: the newly-created lifecycle execution, as a dict.
      :raises exception.InvalidMinionPoolState: if, without ``force``, the
          pool is not ALLOCATED or any of its machines is not AVAILABLE.
      """
      LOG.info("Attempting to deallocate Minion Pool '%s'.", minion_pool_id)
      # Use the same lookup helper as the other minion pool operations.
      minion_pool = self._get_minion_pool(
          ctxt, minion_pool_id, include_tasks_executions=False,
          include_machines=True)

      if not force:
          # NOTE: an explicit '!=' is required here. The former
          # "not in (constants.MINION_POOL_STATUS_ALLOCATED)" had no trailing
          # comma, so it was a substring test against a plain string rather
          # than a tuple membership test.
          if minion_pool.pool_status != (
                  constants.MINION_POOL_STATUS_ALLOCATED):
              raise exception.InvalidMinionPoolState(
                  "Minion Pool '%s' cannot be deallocated as it is in '%s' "
                  "state instead of the expected '%s'. Please use the "
                  "force flag if you are certain you want to deallocate "
                  "the minion pool's machines." % (
                      minion_pool_id, minion_pool.pool_status,
                      constants.MINION_POOL_STATUS_ALLOCATED))
          self._check_all_pool_minion_machines_available(minion_pool)

      execution = models.TasksExecution()
      execution.id = str(uuid.uuid4())
      execution.action = minion_pool
      execution.status = constants.EXECUTION_STATUS_UNEXECUTED
      execution.type = (
          constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS)

      # The deletion task type must match the pool's platform. The source
      # branch used to pick the *destination* deletion task type.
      if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
          delete_minion_task_type = (
              constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE)
      else:
          delete_minion_task_type = (
              constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)

      for minion_machine in minion_pool.minion_machines:
          minion_pool.info[minion_machine.id] = {
              "pool_environment_options": minion_pool.source_environment,
              "minion_provider_properties": (
                  minion_machine.provider_properties)}
          self._create_task(
              minion_machine.id, delete_minion_task_type,
              # NOTE: we set 'on_error=True' to allow for the completion of
              # already running deletion tasks to prevent partial deletes:
              execution, on_error=True)
      self._check_execution_tasks_sanity(execution, minion_pool.info)

      # update the action info for all of the pool's minions:
      for minion_machine in minion_pool.minion_machines:
          db_api.update_transfer_action_info_for_instance(
              ctxt, minion_pool.id, minion_machine.id,
              minion_pool.info[minion_machine.id])

      # add new execution to DB:
      db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
      LOG.info(
          "Minion pool deallocation execution created: %s", execution.id)
      self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
      db_api.set_minion_pool_lifecycle_status(
          ctxt, minion_pool.id, constants.MINION_POOL_STATUS_DEALLOCATING)
      return self._get_minion_pool_lifecycle_execution(
          ctxt, minion_pool_id, execution.id).to_dict()
  @minion_pool_synchronized
  def get_minion_pool(self, ctxt, minion_pool_id):
      """Return the minion pool, including its executions and machines."""
      pool = self._get_minion_pool(
          ctxt, minion_pool_id,
          include_machines=True,
          include_tasks_executions=True)
      return pool
  @minion_pool_synchronized
  def update_minion_pool(self, ctxt, minion_pool_id, updated_values):
      """Apply ``updated_values`` to a minion pool in the UNINITIALIZED state.

      :param ctxt: request context passed to the DB API.
      :param minion_pool_id: ID of the minion pool to update.
      :param updated_values: values forwarded as-is to
          ``db_api.update_minion_pool_lifecycle``.
      :returns: the pool as re-read via ``db_api.get_minion_pool_lifecycle``.
      :raises exception.InvalidMinionPoolState: if the pool is not in the
          UNINITIALIZED state.
      """
      minion_pool = self._get_minion_pool(
          ctxt, minion_pool_id, include_tasks_executions=False,
          include_machines=False)
      if minion_pool.pool_status != constants.MINION_POOL_STATUS_UNINITIALIZED:
          # NOTE: the first two fragments previously joined without a space
          # ("...pool machineshave been...").
          raise exception.InvalidMinionPoolState(
              "Minion Pool '%s' cannot be updated as it is in '%s' status "
              "instead of the expected '%s'. Please ensure the pool machines "
              "have been deallocated and the pool's supporting resources "
              "have been torn down before updating the pool." % (
                  minion_pool_id, minion_pool.pool_status,
                  constants.MINION_POOL_STATUS_UNINITIALIZED))
      LOG.info(
          "Attempting to update minion_pool '%s' with payload: %s",
          minion_pool_id, updated_values)
      db_api.update_minion_pool_lifecycle(ctxt, minion_pool_id, updated_values)
      LOG.info("Minion Pool '%s' successfully updated", minion_pool_id)
      return db_api.get_minion_pool_lifecycle(ctxt, minion_pool_id)
  @minion_pool_synchronized
  def delete_minion_pool(self, ctxt, minion_pool_id):
      """Delete a minion pool which is UNINITIALIZED or in ERROR.

      :param ctxt: request context passed to the DB API.
      :param minion_pool_id: ID of the minion pool to delete.
      :raises exception.InvalidMinionPoolState: if the pool's status is not
          one of the acceptable deletion statuses.
      """
      minion_pool = self._get_minion_pool(
          ctxt, minion_pool_id, include_tasks_executions=False,
          include_machines=True)
      # Kept as a list: its repr is embedded in the error message below.
      acceptable_deletion_statuses = [
          constants.MINION_POOL_STATUS_UNINITIALIZED,
          constants.MINION_POOL_STATUS_ERROR]
      if minion_pool.pool_status not in acceptable_deletion_statuses:
          raise exception.InvalidMinionPoolState(
              "Minion Pool '%s' cannot be deleted as it is in '%s' status "
              "instead of one of the expected '%s'. Please ensure the pool "
              "machines have been deallocated and the pool's supporting "
              "resources have been torn down before deleting the pool." % (
                  minion_pool_id, minion_pool.pool_status,
                  acceptable_deletion_statuses))
      # Pass the argument to the logger instead of pre-formatting the
      # message, so interpolation only happens if the record is emitted.
      LOG.info("Deleting minion pool with ID '%s'", minion_pool_id)
      db_api.delete_minion_pool_lifecycle(ctxt, minion_pool_id)
  @minion_pool_synchronized
  def get_minion_pool_lifecycle_executions(
          self, ctxt, minion_pool_id, include_tasks=False):
      """List the lifecycle executions of a minion pool.

      :param include_tasks: forwarded positionally to
          ``db_api.get_minion_pool_lifecycle_executions``.
      """
      executions = db_api.get_minion_pool_lifecycle_executions(
          ctxt, minion_pool_id, include_tasks)
      return executions
  def _get_minion_pool_lifecycle_execution(
          self, ctxt, minion_pool_id, execution_id):
      """Fetch one lifecycle execution of a minion pool from the DB.

      :returns: the execution object as returned by the DB API.
      :raises exception.NotFound: if the DB API returns a falsy value.
      """
      found = db_api.get_minion_pool_lifecycle_execution(
          ctxt, minion_pool_id, execution_id)
      if found:
          return found
      raise exception.NotFound(
          "Execution with ID '%s' for Minion Pool '%s' not found." % (
              execution_id, minion_pool_id))
  @minion_pool_tasks_execution_synchronized
  def get_minion_pool_lifecycle_execution(
          self, ctxt, minion_pool_id, execution_id):
      """Return one lifecycle execution of a minion pool as a dict.

      :raises exception.NotFound: if the execution does not exist.
      """
      execution = self._get_minion_pool_lifecycle_execution(
          ctxt, minion_pool_id, execution_id)
      return execution.to_dict()
  @minion_pool_tasks_execution_synchronized
  def delete_minion_pool_lifecycle_execution(
          self, ctxt, minion_pool_id, execution_id):
      """Delete a lifecycle execution of a minion pool which is not active.

      :raises exception.NotFound: if the execution does not exist.
      :raises exception.InvalidMigrationState: if the execution's status is
          one of ``constants.ACTIVE_EXECUTION_STATUSES``.
      """
      execution = self._get_minion_pool_lifecycle_execution(
          ctxt, minion_pool_id, execution_id)
      status = execution.status
      # NOTE(review): this raises InvalidMigrationState, unlike the other
      # minion pool operations which raise InvalidMinionPoolState. It is kept
      # as-is since callers may catch this specific type.
      if status in constants.ACTIVE_EXECUTION_STATUSES:
          raise exception.InvalidMigrationState(
              "Cannot delete execution '%s' for Minion pool '%s' as it is "
              "currently in '%s' state." % (
                  execution_id, minion_pool_id, status))
      db_api.delete_minion_pool_lifecycle_execution(ctxt, execution_id)
  4007. @minion_pool_tasks_execution_synchronized
  4008. def cancel_minion_pool_lifecycle_execution(
  4009. self, ctxt, minion_pool_id, execution_id, force):
  4010. execution = self._get_minion_pool_lifecycle_execution(
  4011. ctxt, minion_pool_id, execution_id)
  4012. if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
  4013. raise exception.InvalidMinionPoolState(
  4014. "Minion pool '%s' has no running execution to cancel." % (
  4015. minion_pool_id))
  4016. if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
  4017. not force):
  4018. raise exception.InvalidMinionPoolState(
  4019. "Execution for Minion Pool '%s' is already being cancelled. "
  4020. "Please use the force option if you'd like to force-cancel "
  4021. "it." % (minion_pool_id))
  4022. self._cancel_tasks_execution(ctxt, execution, force=force)