|
|
@@ -12,6 +12,8 @@ from coriolis.db import api as db_api
|
|
|
from coriolis.db.sqlalchemy import models
|
|
|
from coriolis import exception
|
|
|
from coriolis import keystone
|
|
|
+from coriolis.providers import factory as providers_factory
|
|
|
+from coriolis import schemas
|
|
|
from coriolis import utils
|
|
|
from coriolis.worker.rpc import client as rpc_worker_client
|
|
|
|
|
|
@@ -20,6 +22,16 @@ VERSION = "1.0"
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
+def endpoint_synchronized(func):
|
|
|
+ @functools.wraps(func)
|
|
|
+ def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
|
|
|
+ @lockutils.synchronized(endpoint_id)
|
|
|
+ def inner():
|
|
|
+ return func(self, ctxt, endpoint_id, *args, **kwargs)
|
|
|
+ return inner()
|
|
|
+ return wrapper
|
|
|
+
|
|
|
+
|
|
|
def replica_synchronized(func):
|
|
|
@functools.wraps(func)
|
|
|
def wrapper(self, ctxt, replica_id, *args, **kwargs):
|
|
|
@@ -64,6 +76,51 @@ class ConductorServerEndpoint(object):
|
|
|
def __init__(self):
|
|
|
self._rpc_worker_client = rpc_worker_client.WorkerClient()
|
|
|
|
|
|
+ def create_endpoint(self, ctxt, name, endpoint_type, description,
|
|
|
+ connection_info):
|
|
|
+ endpoint = models.Endpoint()
|
|
|
+ endpoint.name = name
|
|
|
+ endpoint.type = endpoint_type
|
|
|
+ endpoint.description = description
|
|
|
+ endpoint.connection_info = connection_info
|
|
|
+
|
|
|
+ db_api.add_endpoint(ctxt, endpoint)
|
|
|
+ LOG.info("Endpoint created: %s", endpoint.id)
|
|
|
+ return self.get_endpoint(ctxt, endpoint.id)
|
|
|
+
|
|
|
+ def get_endpoints(self, ctxt):
|
|
|
+ return db_api.get_endpoints(ctxt)
|
|
|
+
|
|
|
+ @endpoint_synchronized
|
|
|
+ def get_endpoint(self, ctxt, endpoint_id):
|
|
|
+ endpoint = db_api.get_endpoint(ctxt, endpoint_id)
|
|
|
+ if not endpoint:
|
|
|
+ raise exception.NotFound("Endpoint not found")
|
|
|
+ return endpoint
|
|
|
+
|
|
|
+ @endpoint_synchronized
|
|
|
+ def delete_endpoint(self, ctxt, endpoint_id):
|
|
|
+ db_api.delete_endpoint(ctxt, endpoint_id)
|
|
|
+
|
|
|
+ def get_endpoint_instances(self, ctxt, endpoint_id, marker, limit,
|
|
|
+ instance_name_pattern):
|
|
|
+ endpoint = self.get_endpoint(ctxt, endpoint_id)
|
|
|
+
|
|
|
+ export_provider = providers_factory.get_provider(
|
|
|
+ endpoint.type, constants.PROVIDER_TYPE_ENDPOINT, None)
|
|
|
+
|
|
|
+ connection_info = utils.get_secret_connection_info(
|
|
|
+ ctxt, endpoint.connection_info)
|
|
|
+
|
|
|
+ instances_info = export_provider.get_instances(
|
|
|
+ ctxt, connection_info, last_seen_id=marker, limit=limit,
|
|
|
+ instance_name_pattern=instance_name_pattern)
|
|
|
+ for instance_info in instances_info:
|
|
|
+ schemas.validate_value(
|
|
|
+ instance_info, schemas.CORIOLIS_VM_INSTANCE_INFO_SCHEMA)
|
|
|
+
|
|
|
+ return instances_info
|
|
|
+
|
|
|
@staticmethod
|
|
|
def _create_task(instance, task_type, execution, depends_on=None,
|
|
|
on_error=False):
|
|
|
@@ -87,9 +144,27 @@ class ConductorServerEndpoint(object):
|
|
|
break
|
|
|
return task
|
|
|
|
|
|
+ def _get_task_origin(self, ctxt, action):
|
|
|
+ endpoint = self.get_endpoint(ctxt, action.origin_endpoint_id)
|
|
|
+ return {
|
|
|
+ "connection_info": endpoint.connection_info,
|
|
|
+ "type": endpoint.type
|
|
|
+ }
|
|
|
+
|
|
|
+ def _get_task_destination(self, ctxt, action):
|
|
|
+ endpoint = self.get_endpoint(ctxt, action.destination_endpoint_id)
|
|
|
+ return {
|
|
|
+ "connection_info": endpoint.connection_info,
|
|
|
+ "type": endpoint.type,
|
|
|
+ "target_environment": action.destination_environment
|
|
|
+ }
|
|
|
+
|
|
|
def _begin_tasks(self, ctxt, execution, task_info={}):
|
|
|
keystone.create_trust(ctxt)
|
|
|
|
|
|
+ origin = self._get_task_origin(ctxt, execution.action)
|
|
|
+ destination = self._get_task_destination(ctxt, execution.action)
|
|
|
+
|
|
|
for task in execution.tasks:
|
|
|
if (not task.depends_on and
|
|
|
task.status == constants.TASK_STATUS_PENDING):
|
|
|
@@ -97,8 +172,8 @@ class ConductorServerEndpoint(object):
|
|
|
ctxt, server=None,
|
|
|
task_id=task.id,
|
|
|
task_type=task.task_type,
|
|
|
- origin=execution.action.origin,
|
|
|
- destination=execution.action.destination,
|
|
|
+ origin=origin,
|
|
|
+ destination=destination,
|
|
|
instance=task.instance,
|
|
|
task_info=task_info.get(task.instance, {}))
|
|
|
|
|
|
@@ -243,18 +318,24 @@ class ConductorServerEndpoint(object):
|
|
|
return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
|
|
|
|
|
|
@staticmethod
|
|
|
- def _check_endpoints(ctxt, origin, destination):
|
|
|
+ def _check_endpoints(ctxt, origin_endpoint, destination_endpoint):
|
|
|
# TODO(alexpilotti): check Barbican secrets content as well
|
|
|
- if origin.get("connection_info") == destination.get("connection_info"):
|
|
|
+ if (origin_endpoint.connection_info ==
|
|
|
+ destination_endpoint.connection_info):
|
|
|
raise exception.SameDestination()
|
|
|
|
|
|
- def create_instances_replica(self, ctxt, origin, destination, instances):
|
|
|
- self._check_endpoints(ctxt, origin, destination)
|
|
|
+ def create_instances_replica(self, ctxt, origin_endpoint_id,
|
|
|
+ destination_endpoint_id,
|
|
|
+ destination_environment, instances):
|
|
|
+ origin_endpoint = self.get_endpoint(ctxt, origin_endpoint_id)
|
|
|
+ destination_endpoint = self.get_endpoint(ctxt, destination_endpoint_id)
|
|
|
+ self._check_endpoints(ctxt, origin_endpoint, destination_endpoint)
|
|
|
|
|
|
replica = models.Replica()
|
|
|
replica.id = str(uuid.uuid4())
|
|
|
- replica.origin = origin
|
|
|
- replica.destination = destination
|
|
|
+ replica.origin_endpoint = origin_endpoint
|
|
|
+ replica.destination_endpoint = destination_endpoint
|
|
|
+ replica.destination_environment = destination_environment
|
|
|
replica.instances = instances
|
|
|
replica.executions = []
|
|
|
replica.info = {}
|
|
|
@@ -331,8 +412,9 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
migration = models.Migration()
|
|
|
migration.id = str(uuid.uuid4())
|
|
|
- migration.origin = replica.origin
|
|
|
- migration.destination = replica.destination
|
|
|
+ migration.origin_endpoint_id = replica.origin_endpoint_id
|
|
|
+ migration.destination_endpoint_id = replica.destination_endpoint_id
|
|
|
+ migration.destination_environment = replica.destination_environment
|
|
|
migration.instances = instances
|
|
|
migration.replica = replica
|
|
|
migration.info = replica.info
|
|
|
@@ -387,13 +469,18 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
return self.get_migration(ctxt, migration.id)
|
|
|
|
|
|
- def migrate_instances(self, ctxt, origin, destination, instances):
|
|
|
- self._check_endpoints(ctxt, origin, destination)
|
|
|
+ def migrate_instances(self, ctxt, origin_endpoint_id,
|
|
|
+ destination_endpoint_id, destination_environment,
|
|
|
+ instances):
|
|
|
+ origin_endpoint = self.get_endpoint(ctxt, origin_endpoint_id)
|
|
|
+ destination_endpoint = self.get_endpoint(ctxt, destination_endpoint_id)
|
|
|
+ self._check_endpoints(ctxt, origin_endpoint, destination_endpoint)
|
|
|
|
|
|
migration = models.Migration()
|
|
|
migration.id = str(uuid.uuid4())
|
|
|
- migration.origin = origin
|
|
|
- migration.destination = destination
|
|
|
+ migration.origin_endpoint = origin_endpoint
|
|
|
+ migration.destination_endpoint = destination_endpoint
|
|
|
+ migration.destination_environment = destination_environment
|
|
|
execution = models.TasksExecution()
|
|
|
execution.status = constants.EXECUTION_STATUS_RUNNING
|
|
|
execution.number = 1
|
|
|
@@ -467,23 +554,32 @@ class ConductorServerEndpoint(object):
|
|
|
ctxt, task.id, constants.TASK_STATUS_CANCELED)
|
|
|
|
|
|
if not has_running_tasks:
|
|
|
- for task in execution.tasks:
|
|
|
- if task.status in [constants.TASK_STATUS_PENDING,
|
|
|
- constants.TASK_STATUS_ON_ERROR_ONLY]:
|
|
|
- if task.on_error:
|
|
|
- action = db_api.get_action(ctxt, execution.action_id)
|
|
|
- task_info = action.info.get(task.instance, {})
|
|
|
-
|
|
|
- self._rpc_worker_client.begin_task(
|
|
|
- ctxt, server=None,
|
|
|
- task_id=task.id,
|
|
|
- task_type=task.task_type,
|
|
|
- origin=action.origin,
|
|
|
- destination=action.destination,
|
|
|
- instance=task.instance,
|
|
|
- task_info=task_info)
|
|
|
-
|
|
|
- has_running_tasks = True
|
|
|
+ try:
|
|
|
+ origin = self._get_task_origin(ctxt, execution.action)
|
|
|
+ destination = self._get_task_destination(
|
|
|
+ ctxt, execution.action)
|
|
|
+
|
|
|
+ for task in execution.tasks:
|
|
|
+ if task.status in [constants.TASK_STATUS_PENDING,
|
|
|
+ constants.TASK_STATUS_ON_ERROR_ONLY]:
|
|
|
+ if task.on_error:
|
|
|
+ action = db_api.get_action(
|
|
|
+ ctxt, execution.action_id)
|
|
|
+ task_info = action.info.get(task.instance, {})
|
|
|
+
|
|
|
+ self._rpc_worker_client.begin_task(
|
|
|
+ ctxt, server=None,
|
|
|
+ task_id=task.id,
|
|
|
+ task_type=task.task_type,
|
|
|
+ origin=origin,
|
|
|
+ destination=destination,
|
|
|
+ instance=task.instance,
|
|
|
+ task_info=task_info)
|
|
|
+
|
|
|
+ has_running_tasks = True
|
|
|
+ except exception.NotFound as ex:
|
|
|
+ LOG.error("A required endpoint could not be found")
|
|
|
+ LOG.exception(ex)
|
|
|
|
|
|
if not has_running_tasks:
|
|
|
self._set_tasks_execution_status(
|
|
|
@@ -503,6 +599,9 @@ class ConductorServerEndpoint(object):
|
|
|
ctxt, task_id, constants.TASK_STATUS_RUNNING)
|
|
|
|
|
|
def _start_pending_tasks(self, ctxt, execution, parent_task, task_info):
|
|
|
+ origin = self._get_task_origin(ctxt, execution.action)
|
|
|
+ destination = self._get_task_destination(ctxt, execution.action)
|
|
|
+
|
|
|
for task in execution.tasks:
|
|
|
if task.status == constants.TASK_STATUS_PENDING:
|
|
|
if task.depends_on and parent_task.id in task.depends_on:
|
|
|
@@ -521,13 +620,12 @@ class ConductorServerEndpoint(object):
|
|
|
constants.TASK_TYPE_IMPORT_INSTANCE):
|
|
|
server = parent_task.host
|
|
|
|
|
|
- action = execution.action
|
|
|
self._rpc_worker_client.begin_task(
|
|
|
ctxt, server=server,
|
|
|
task_id=task.id,
|
|
|
task_type=task.task_type,
|
|
|
- origin=action.origin,
|
|
|
- destination=action.destination,
|
|
|
+ origin=origin,
|
|
|
+ destination=destination,
|
|
|
instance=task.instance,
|
|
|
task_info=task_info)
|
|
|
|