|
|
@@ -3,6 +3,8 @@
|
|
|
|
|
|
import copy
|
|
|
import functools
|
|
|
+import random
|
|
|
+import time
|
|
|
import uuid
|
|
|
|
|
|
from oslo_concurrency import lockutils
|
|
|
@@ -17,6 +19,7 @@ from coriolis import exception
|
|
|
from coriolis import keystone
|
|
|
from coriolis.licensing import client as licensing_client
|
|
|
from coriolis.replica_cron.rpc import client as rpc_cron_client
|
|
|
+from coriolis.scheduler.rpc import client as rpc_scheduler_client
|
|
|
from coriolis import schemas
|
|
|
from coriolis.tasks import factory as tasks_factory
|
|
|
from coriolis import utils
|
|
|
@@ -43,6 +46,14 @@ TASK_DEADLOCK_ERROR_MESSAGE = (
|
|
|
"A fatal deadlock has occurred. Further debugging is required. "
|
|
|
"Please review the Conductor logs and contact support for assistance.")
|
|
|
|
|
|
+RPC_TOPIC_TO_CLIENT_CLASS_MAP = {
|
|
|
+ constants.WORKER_MAIN_MESSAGING_TOPIC: rpc_worker_client.WorkerClient,
|
|
|
+ constants.SCHEDULER_MAIN_MESSAGING_TOPIC: (
|
|
|
+ rpc_scheduler_client.SchedulerClient),
|
|
|
+ constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC: (
|
|
|
+ rpc_cron_client.ReplicaCronClient)
|
|
|
+}
|
|
|
+
|
|
|
|
|
|
def endpoint_synchronized(func):
|
|
|
@functools.wraps(func)
|
|
|
@@ -132,21 +143,131 @@ def tasks_execution_synchronized(func):
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
+def region_synchronized(func):
|
|
|
+ @functools.wraps(func)
|
|
|
+ def wrapper(self, ctxt, region_id, *args, **kwargs):
|
|
|
+ @lockutils.synchronized(
|
|
|
+ constants.REGION_LOCK_NAME_FORMAT % region_id,
|
|
|
+ external=True)
|
|
|
+ def inner():
|
|
|
+ return func(self, ctxt, region_id, *args, **kwargs)
|
|
|
+ return inner()
|
|
|
+ return wrapper
|
|
|
+
|
|
|
+
|
|
|
+def service_synchronized(func):
|
|
|
+ @functools.wraps(func)
|
|
|
+ def wrapper(self, ctxt, service_id, *args, **kwargs):
|
|
|
+ @lockutils.synchronized(
|
|
|
+ constants.SERVICE_LOCK_NAME_FORMAT % service_id,
|
|
|
+ external=True)
|
|
|
+ def inner():
|
|
|
+ return func(self, ctxt, service_id, *args, **kwargs)
|
|
|
+ return inner()
|
|
|
+ return wrapper
|
|
|
+
|
|
|
+
|
|
|
class ConductorServerEndpoint(object):
|
|
|
def __init__(self):
|
|
|
self._licensing_client = licensing_client.LicensingClient.from_env()
|
|
|
- self._rpc_worker_client = rpc_worker_client.WorkerClient()
|
|
|
- self._replica_cron_client = rpc_cron_client.ReplicaCronClient()
|
|
|
+
|
|
|
+ # NOTE(aznashwan): it is unsafe to fork processes with pre-instantiated
|
|
|
+ # oslo_messaging clients as the underlying eventlet thread queues will
|
|
|
+ # be invalidated. Considering this class both serves from a "main
|
|
|
+ # process" as well as forking child processes, it is safest to
|
|
|
+ # re-instantiate the clients every time:
|
|
|
+ @property
|
|
|
+ def _rpc_worker_client(self):
|
|
|
+ return rpc_worker_client.WorkerClient()
|
|
|
+
|
|
|
+ @property
|
|
|
+ def _scheduler_client(self):
|
|
|
+ return rpc_scheduler_client.SchedulerClient()
|
|
|
+
|
|
|
+ @property
|
|
|
+ def _replica_cron_client(self):
|
|
|
+ return rpc_cron_client.ReplicaCronClient()
|
|
|
|
|
|
def get_all_diagnostics(self, ctxt):
|
|
|
- conductor = self.get_diagnostics(ctxt)
|
|
|
- cron = self._replica_cron_client.get_diagnostics(ctxt)
|
|
|
- worker = self._rpc_worker_client.get_diagnostics(ctxt)
|
|
|
- return [
|
|
|
- conductor,
|
|
|
- cron,
|
|
|
- worker,
|
|
|
- ]
|
|
|
+ diagnostics = [
|
|
|
+ self.get_diagnostics(ctxt),
|
|
|
+ self._replica_cron_client.get_diagnostics(ctxt),
|
|
|
+ self._scheduler_client.get_diagnostics(ctxt)]
|
|
|
+ worker_diagnostics = []
|
|
|
+ for worker_service in self._scheduler_client.get_workers_for_specs(
|
|
|
+ ctxt):
|
|
|
+ worker_rpc = self._get_rpc_client_for_service(worker_service)
|
|
|
+ diagnostics.append(worker_rpc.get_diagnostics(ctxt))
|
|
|
+
|
|
|
+ return diagnostics
|
|
|
+
|
|
|
+ def _get_rpc_client_for_service(self, service, *client_args, **client_kwargs):
|
|
|
+ rpc_client_class = RPC_TOPIC_TO_CLIENT_CLASS_MAP.get(service.topic)
|
|
|
+ if not rpc_client_class:
|
|
|
+ raise exception.NotFound(
|
|
|
+ "No RPC client class for service with topic '%s'." % (
|
|
|
+ service.topic))
|
|
|
+
|
|
|
+ topic = service.topic
|
|
|
+ if service.topic == constants.WORKER_MAIN_MESSAGING_TOPIC:
|
|
|
+ # NOTE: coriolis.service.MessagingService-type services (such
|
|
|
+ # as the worker), always have a dedicated per-host queue
|
|
|
+ # which can be used to target the service:
|
|
|
+ topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % ({
|
|
|
+ "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
|
|
|
+ "host": service.host})
|
|
|
+
|
|
|
+ return rpc_client_class(*client_args, topic=topic, **client_kwargs)
|
|
|
+
|
|
|
+ def _get_any_worker_service(self, ctxt, random_choice=False, raw_dict=False):
|
|
|
+ services = self._scheduler_client.get_workers_for_specs(ctxt)
|
|
|
+ if not services:
|
|
|
+ raise exception.NoWorkerServiceError()
|
|
|
+ service = services[0]
|
|
|
+ if random_choice:
|
|
|
+ service = random.choice(services)
|
|
|
+ if raw_dict:
|
|
|
+ return service
|
|
|
+ return db_api.get_service(ctxt, service['id'])
|
|
|
+
|
|
|
+ def _get_worker_rpc_for_host(self, host, *client_args, **client_kwargs):
|
|
|
+ rpc_client_class = RPC_TOPIC_TO_CLIENT_CLASS_MAP[
|
|
|
+ constants.WORKER_MAIN_MESSAGING_TOPIC]
|
|
|
+ topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % ({
|
|
|
+ "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
|
|
|
+ "host": host})
|
|
|
+ return rpc_client_class(topic=topic, *client_args, **client_kwargs)
|
|
|
+
|
|
|
+ def _get_worker_service_rpc_for_specs(
|
|
|
+ self, ctxt, provider_requirements=None, region_sets=None,
|
|
|
+ enabled=True, random_choice=False, raise_on_no_matches=True):
|
|
|
+ requirements_str = (
|
|
|
+ "enabled=%s; region_sets=%s; provider_requirements=%s" % (
|
|
|
+ enabled, region_sets, provider_requirements))
|
|
|
+ LOG.info(
|
|
|
+ "Requesting Worker Service from scheduler with the following "
|
|
|
+ "specifications: %s", requirements_str)
|
|
|
+ services = self._scheduler_client.get_workers_for_specs(
|
|
|
+ ctxt, provider_requirements=provider_requirements,
|
|
|
+ region_sets=region_sets, enabled=enabled)
|
|
|
+ if not services:
|
|
|
+ if raise_on_no_matches:
|
|
|
+ raise exception.NoSuitableWorkerServiceError()
|
|
|
+ return None
|
|
|
+ LOG.debug(
|
|
|
+ "Was offered Worker Services with the following IDs for "
|
|
|
+ "requirements '%s': %s",
|
|
|
+ requirements_str, [s["id"] for s in services])
|
|
|
+
|
|
|
+ selected_service = services[0]
|
|
|
+ if random_choice:
|
|
|
+ selected_service = random.choice(services)
|
|
|
+ service = db_api.get_service(ctxt, selected_service["id"])
|
|
|
+
|
|
|
+ LOG.info(
|
|
|
+ "Was offered Worker Service with ID '%s' for requirements: %s",
|
|
|
+ service.id, requirements_str)
|
|
|
+ return self._get_rpc_client_for_service(service)
|
|
|
|
|
|
def _check_delete_reservation_for_transfer(self, transfer_action):
|
|
|
action_id = transfer_action.base_id
|
|
|
@@ -204,8 +325,9 @@ class ConductorServerEndpoint(object):
|
|
|
"all reservation licensing checks.", action_id)
|
|
|
|
|
|
def create_endpoint(self, ctxt, name, endpoint_type, description,
|
|
|
- connection_info):
|
|
|
+ connection_info, mapped_regions=None):
|
|
|
endpoint = models.Endpoint()
|
|
|
+ endpoint.id = str(uuid.uuid4())
|
|
|
endpoint.name = name
|
|
|
endpoint.type = endpoint_type
|
|
|
endpoint.description = description
|
|
|
@@ -213,12 +335,31 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
db_api.add_endpoint(ctxt, endpoint)
|
|
|
LOG.info("Endpoint created: %s", endpoint.id)
|
|
|
+
|
|
|
+ # add region associations:
|
|
|
+ if mapped_regions:
|
|
|
+ try:
|
|
|
+ db_api.update_endpoint(
|
|
|
+ ctxt, endpoint.id, {
|
|
|
+ "mapped_regions": mapped_regions})
|
|
|
+ except Exception as ex:
|
|
|
+ LOG.warn(
|
|
|
+ "Error adding region mappings during new endpoint creation "
|
|
|
+ "(name: %s), cleaning up endpoint and all created "
|
|
|
+ "mappings for regions: %s", endpoint.name, mapped_regions)
|
|
|
+ db_api.delete_endpoint(ctxt, endpoint.id)
|
|
|
+ raise
|
|
|
+
|
|
|
return self.get_endpoint(ctxt, endpoint.id)
|
|
|
|
|
|
+ @endpoint_synchronized
|
|
|
def update_endpoint(self, ctxt, endpoint_id, updated_values):
|
|
|
+ LOG.info(
|
|
|
+ "Attempting to update endpoint '%s' with payload: %s",
|
|
|
+ endpoint_id, updated_values)
|
|
|
db_api.update_endpoint(ctxt, endpoint_id, updated_values)
|
|
|
LOG.info("Endpoint updated: %s", endpoint_id)
|
|
|
- return self.get_endpoint(ctxt, endpoint_id)
|
|
|
+ return db_api.get_endpoint(ctxt, endpoint_id)
|
|
|
|
|
|
def get_endpoints(self, ctxt):
|
|
|
return db_api.get_endpoints(ctxt)
|
|
|
@@ -244,7 +385,12 @@ class ConductorServerEndpoint(object):
|
|
|
marker, limit, instance_name_pattern):
|
|
|
endpoint = self.get_endpoint(ctxt, endpoint_id)
|
|
|
|
|
|
- return self._rpc_worker_client.get_endpoint_instances(
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_specs(
|
|
|
+ ctxt, enabled=True,
|
|
|
+ region_sets=[[reg.id for reg in endpoint.mapped_regions]],
|
|
|
+ provider_requirements={
|
|
|
+ endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_INSTANCES]})
|
|
|
+ return worker_rpc.get_endpoint_instances(
|
|
|
ctxt, endpoint.type, endpoint.connection_info,
|
|
|
source_environment, marker, limit, instance_name_pattern)
|
|
|
|
|
|
@@ -252,7 +398,13 @@ class ConductorServerEndpoint(object):
|
|
|
self, ctxt, endpoint_id, source_environment, instance_name):
|
|
|
endpoint = self.get_endpoint(ctxt, endpoint_id)
|
|
|
|
|
|
- return self._rpc_worker_client.get_endpoint_instance(
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_specs(
|
|
|
+ ctxt, enabled=True,
|
|
|
+ region_sets=[[reg.id for reg in endpoint.mapped_regions]],
|
|
|
+ provider_requirements={
|
|
|
+ endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_INSTANCES]})
|
|
|
+
|
|
|
+ return worker_rpc.get_endpoint_instance(
|
|
|
ctxt, endpoint.type, endpoint.connection_info,
|
|
|
source_environment, instance_name)
|
|
|
|
|
|
@@ -260,50 +412,102 @@ class ConductorServerEndpoint(object):
|
|
|
self, ctxt, endpoint_id, env, option_names):
|
|
|
endpoint = self.get_endpoint(ctxt, endpoint_id)
|
|
|
|
|
|
- return self._rpc_worker_client.get_endpoint_source_options(
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_specs(
|
|
|
+ ctxt, enabled=True,
|
|
|
+ region_sets=[[reg.id for reg in endpoint.mapped_regions]],
|
|
|
+ provider_requirements={
|
|
|
+ endpoint.type: [
|
|
|
+ constants.PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS]})
|
|
|
+
|
|
|
+ return worker_rpc.get_endpoint_source_options(
|
|
|
ctxt, endpoint.type, endpoint.connection_info, env, option_names)
|
|
|
|
|
|
def get_endpoint_destination_options(
|
|
|
self, ctxt, endpoint_id, env, option_names):
|
|
|
endpoint = self.get_endpoint(ctxt, endpoint_id)
|
|
|
|
|
|
- return self._rpc_worker_client.get_endpoint_destination_options(
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_specs(
|
|
|
+ ctxt, enabled=True,
|
|
|
+ region_sets=[[reg.id for reg in endpoint.mapped_regions]],
|
|
|
+ provider_requirements={
|
|
|
+ endpoint.type: [
|
|
|
+ constants.PROVIDER_TYPE_DESTINATION_ENDPOINT_OPTIONS]})
|
|
|
+ return worker_rpc.get_endpoint_destination_options(
|
|
|
ctxt, endpoint.type, endpoint.connection_info, env, option_names)
|
|
|
|
|
|
def get_endpoint_networks(self, ctxt, endpoint_id, env):
|
|
|
endpoint = self.get_endpoint(ctxt, endpoint_id)
|
|
|
|
|
|
- return self._rpc_worker_client.get_endpoint_networks(
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_specs(
|
|
|
+ ctxt, enabled=True,
|
|
|
+ region_sets=[[reg.id for reg in endpoint.mapped_regions]],
|
|
|
+ provider_requirements={
|
|
|
+ endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_NETWORKS]})
|
|
|
+
|
|
|
+ return worker_rpc.get_endpoint_networks(
|
|
|
ctxt, endpoint.type, endpoint.connection_info, env)
|
|
|
|
|
|
def get_endpoint_storage(self, ctxt, endpoint_id, env):
|
|
|
endpoint = self.get_endpoint(ctxt, endpoint_id)
|
|
|
|
|
|
- return self._rpc_worker_client.get_endpoint_storage(
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_specs(
|
|
|
+ ctxt, enabled=True,
|
|
|
+ region_sets=[[reg.id for reg in endpoint.mapped_regions]],
|
|
|
+ provider_requirements={
|
|
|
+ endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_STORAGE]})
|
|
|
+
|
|
|
+ return worker_rpc.get_endpoint_storage(
|
|
|
ctxt, endpoint.type, endpoint.connection_info, env)
|
|
|
|
|
|
def validate_endpoint_connection(self, ctxt, endpoint_id):
|
|
|
endpoint = self.get_endpoint(ctxt, endpoint_id)
|
|
|
- return self._rpc_worker_client.validate_endpoint_connection(
|
|
|
+
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_specs(
|
|
|
+ ctxt, enabled=True,
|
|
|
+ region_sets=[[reg.id for reg in endpoint.mapped_regions]],
|
|
|
+ provider_requirements={
|
|
|
+ endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT]})
|
|
|
+
|
|
|
+ return worker_rpc.validate_endpoint_connection(
|
|
|
ctxt, endpoint.type, endpoint.connection_info)
|
|
|
|
|
|
def validate_endpoint_target_environment(
|
|
|
self, ctxt, endpoint_id, target_env):
|
|
|
endpoint = self.get_endpoint(ctxt, endpoint_id)
|
|
|
- return self._rpc_worker_client.validate_endpoint_target_environment(
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_specs(
|
|
|
+ ctxt, enabled=True,
|
|
|
+ region_sets=[[reg.id for reg in endpoint.mapped_regions]],
|
|
|
+ provider_requirements={
|
|
|
+ endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT]})
|
|
|
+
|
|
|
+ return worker_rpc.validate_endpoint_target_environment(
|
|
|
ctxt, endpoint.type, target_env)
|
|
|
|
|
|
def validate_endpoint_source_environment(
|
|
|
self, ctxt, endpoint_id, source_env):
|
|
|
endpoint = self.get_endpoint(ctxt, endpoint_id)
|
|
|
- return self._rpc_worker_client.validate_endpoint_source_environment(
|
|
|
+
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_specs(
|
|
|
+ ctxt, enabled=True,
|
|
|
+ region_sets=[[reg.id for reg in endpoint.mapped_regions]],
|
|
|
+ provider_requirements={
|
|
|
+ endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT]})
|
|
|
+
|
|
|
+ return worker_rpc.validate_endpoint_source_environment(
|
|
|
ctxt, endpoint.type, source_env)
|
|
|
|
|
|
def get_available_providers(self, ctxt):
|
|
|
- return self._rpc_worker_client.get_available_providers(ctxt)
|
|
|
+ # TODO(aznashwan): merge list of all providers from all
|
|
|
+ # worker services:
|
|
|
+ worker_rpc = self._get_rpc_client_for_service(
|
|
|
+ self._get_any_worker_service(ctxt))
|
|
|
+ return worker_rpc.get_available_providers(ctxt)
|
|
|
|
|
|
def get_provider_schemas(self, ctxt, platform_name, provider_type):
|
|
|
- return self._rpc_worker_client.get_provider_schemas(
|
|
|
+ # TODO(aznashwan): merge or version/namespace schemas for each worker?
|
|
|
+ worker_rpc = self._get_rpc_client_for_service(
|
|
|
+ self._get_any_worker_service(ctxt))
|
|
|
+ return worker_rpc.get_provider_schemas(
|
|
|
ctxt, platform_name, provider_type)
|
|
|
|
|
|
@staticmethod
|
|
|
@@ -360,7 +564,85 @@ class ConductorServerEndpoint(object):
|
|
|
"target_environment": action.destination_environment
|
|
|
}
|
|
|
|
|
|
- def _begin_tasks(self, ctxt, execution, task_info={}):
|
|
|
+ def _get_worker_service_rpc_for_task(
|
|
|
+ self, ctxt, task, origin_endpoint, destination_endpoint,
|
|
|
+ retry_count=5, retry_period=2):
|
|
|
+ LOG.debug(
|
|
|
+ "Compiling required Worker Service specs for task with "
|
|
|
+ "ID '%s' (type '%s') from endpoints '%s' to '%s'",
|
|
|
+ task.id, task.task_type, origin_endpoint.id,
|
|
|
+ destination_endpoint.id)
|
|
|
+ task_cls = tasks_factory.get_task_runner_class(task.task_type)
|
|
|
+
|
|
|
+ # determine required Coriolis regions based on the endpoints:
|
|
|
+ required_region_sets = []
|
|
|
+ origin_endpoint_region_ids = [
|
|
|
+ r.id for r in origin_endpoint.mapped_regions]
|
|
|
+ destination_endpoint_region_ids = [
|
|
|
+ r.id for r in destination_endpoint.mapped_regions]
|
|
|
+
|
|
|
+ required_platform = task_cls.get_required_platform()
|
|
|
+ if required_platform in (
|
|
|
+ constants.TASK_PLATFORM_SOURCE,
|
|
|
+ constants.TASK_PLATFORM_BILATERAL):
|
|
|
+ required_region_sets.append(origin_endpoint_region_ids)
|
|
|
+ if required_platform in (
|
|
|
+ constants.TASK_PLATFORM_DESTINATION,
|
|
|
+ constants.TASK_PLATFORM_BILATERAL):
|
|
|
+ required_region_sets.append(destination_endpoint_region_ids)
|
|
|
+
|
|
|
+ # determine provider requirements:
|
|
|
+ provider_requirements = {}
|
|
|
+ required_provider_types = task_cls.get_required_provider_types()
|
|
|
+ if constants.PROVIDER_PLATFORM_SOURCE in required_provider_types:
|
|
|
+ provider_requirements[origin_endpoint.type] = (
|
|
|
+ required_provider_types[
|
|
|
+ constants.PROVIDER_PLATFORM_SOURCE])
|
|
|
+ if constants.PROVIDER_PLATFORM_DESTINATION in required_provider_types:
|
|
|
+ provider_requirements[destination_endpoint.type] = (
|
|
|
+ required_provider_types[
|
|
|
+ constants.PROVIDER_PLATFORM_DESTINATION])
|
|
|
+
|
|
|
+ worker_rpc = None
|
|
|
+ for i in range(retry_count):
|
|
|
+ try:
|
|
|
+ LOG.debug(
|
|
|
+ "Requesting Worker Service for task with ID '%s' (type "
|
|
|
+ "'%s') from endpoints '%s' to '%s'", task.id,
|
|
|
+ task.task_type, origin_endpoint.id,
|
|
|
+ destination_endpoint.id)
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_specs(
|
|
|
+ ctxt, provider_requirements=provider_requirements,
|
|
|
+ region_sets=required_region_sets, enabled=True)
|
|
|
+ LOG.debug(
|
|
|
+ "Scheduler has granted Worker Service for task with ID "
|
|
|
+ "'%s' (type '%s') from endpoints '%s' to '%s'",
|
|
|
+ task.id, task.task_type, origin_endpoint.id,
|
|
|
+ destination_endpoint.id)
|
|
|
+ return worker_rpc
|
|
|
+ except Exception as ex:
|
|
|
+ LOG.warn(
|
|
|
+ "Failed to schedule task with ID '%s' (attempt %d/%d). "
|
|
|
+ "Waiting %d seconds and then retrying. Error was: %s",
|
|
|
+ task.id, i+1, retry_count, retry_period,
|
|
|
+ utils.get_exception_details())
|
|
|
+ time.sleep(retry_period)
|
|
|
+
|
|
|
+ message = (
|
|
|
+ "Failed to schedule task %s after %d tries. This may indicate that"
|
|
|
+ " there are no Coriolis Worker services able to perform the task "
|
|
|
+ "on the platforms and in the Coriolis Regions required by the "
|
|
|
+ "selected source/destination Coriolis Endpoints. Please review"
|
|
|
+ " the Conductor and Scheduler logs for more exact details." % (
|
|
|
+ task.id, retry_count))
|
|
|
+ db_api.set_task_status(
|
|
|
+ ctxt, task.id, constants.TASK_STATUS_FAILED_TO_SCHEDULE,
|
|
|
+ exception_details=message)
|
|
|
+ raise exception.NoSuitableWorkerServiceError(message)
|
|
|
+
|
|
|
+ def _begin_tasks(
|
|
|
+ self, ctxt, execution, task_info={},
|
|
|
+ scheduling_retry_count=5, scheduling_retry_period=2):
|
|
|
""" Starts all non-error-only tasks which have no depencies. """
|
|
|
if not ctxt.trust_id:
|
|
|
keystone.create_trust(ctxt)
|
|
|
@@ -368,6 +650,10 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
origin = self._get_task_origin(ctxt, execution.action)
|
|
|
destination = self._get_task_destination(ctxt, execution.action)
|
|
|
+ origin_endpoint = db_api.get_endpoint(
|
|
|
+ ctxt, execution.action.origin_endpoint_id)
|
|
|
+ destination_endpoint = db_api.get_endpoint(
|
|
|
+ ctxt, execution.action.destination_endpoint_id)
|
|
|
|
|
|
newly_started_tasks = []
|
|
|
for task in execution.tasks:
|
|
|
@@ -378,14 +664,27 @@ class ConductorServerEndpoint(object):
|
|
|
task.id, execution.id)
|
|
|
db_api.set_task_status(
|
|
|
ctxt, task.id, constants.TASK_STATUS_PENDING)
|
|
|
- self._rpc_worker_client.begin_task(
|
|
|
- ctxt, server=None,
|
|
|
- task_id=task.id,
|
|
|
- task_type=task.task_type,
|
|
|
- origin=origin,
|
|
|
- destination=destination,
|
|
|
- instance=task.instance,
|
|
|
- task_info=task_info.get(task.instance, {}))
|
|
|
+ try:
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_task(
|
|
|
+ ctxt, task, origin_endpoint, destination_endpoint,
|
|
|
+ retry_count=scheduling_retry_count,
|
|
|
+ retry_period=scheduling_retry_period)
|
|
|
+ worker_rpc.begin_task(
|
|
|
+ ctxt, server=None,
|
|
|
+ task_id=task.id,
|
|
|
+ task_type=task.task_type,
|
|
|
+ origin=origin,
|
|
|
+ destination=destination,
|
|
|
+ instance=task.instance,
|
|
|
+ task_info=task_info.get(task.instance, {}))
|
|
|
+ except Exception as ex:
|
|
|
+ LOG.warn(
|
|
|
+ "Error occured while starting new task '%s'. "
|
|
|
+ "Cancelling execution '%s'. Error was: %s",
|
|
|
+ task.id, execution.id, utils.get_exception_details())
|
|
|
+ self._cancel_tasks_execution(
|
|
|
+ ctxt, execution, requery=True)
|
|
|
+ raise
|
|
|
newly_started_tasks.append(task.id)
|
|
|
|
|
|
# NOTE: this should never happen if _check_execution_tasks_sanity
|
|
|
@@ -409,9 +708,9 @@ class ConductorServerEndpoint(object):
|
|
|
for instance in all_instances_in_tasks}
|
|
|
|
|
|
def _check_task_cls_param_requirements(task, instance_task_info_keys):
|
|
|
- task_cls = tasks_factory.get_task_runner_class(task.task_type)()
|
|
|
+ task_cls = tasks_factory.get_task_runner_class(task.task_type)
|
|
|
missing_params = [
|
|
|
- p for p in task_cls.required_task_info_properties
|
|
|
+ p for p in task_cls.get_required_task_info_properties()
|
|
|
if p not in instance_task_info_keys]
|
|
|
if missing_params:
|
|
|
raise exception.CoriolisException(
|
|
|
@@ -420,7 +719,7 @@ class ConductorServerEndpoint(object):
|
|
|
"type '%s': %s" % (
|
|
|
task.instance, task.id, task.task_type,
|
|
|
missing_params))
|
|
|
- return task_cls.returned_task_info_properties
|
|
|
+ return task_cls.get_returned_task_info_properties()
|
|
|
|
|
|
for instance, instance_tasks in instances_tasks_mapping.items():
|
|
|
task_info_keys = set(initial_task_info.get(
|
|
|
@@ -746,7 +1045,7 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
self._check_execution_tasks_sanity(execution, replica.info)
|
|
|
|
|
|
- # update the action info for all of the Replicas' instnaces:
|
|
|
+ # update the action info for all of the Replicas' instances:
|
|
|
for instance in replica.instances:
|
|
|
db_api.update_transfer_action_info_for_instance(
|
|
|
ctxt, replica.id, instance, replica.info[instance])
|
|
|
@@ -758,6 +1057,12 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
@staticmethod
|
|
|
def _check_endpoints(ctxt, origin_endpoint, destination_endpoint):
|
|
|
+ if origin_endpoint.id == destination_endpoint.id:
|
|
|
+ raise exception.SameDestination(
|
|
|
+ "The origin and destination endpoints cannot be the same. "
|
|
|
+ "If you need to perform operations across two areas of "
|
|
|
+ "the same platform (ex: migrating across public cloud regions)"
|
|
|
+ ", please create two separate endpoints.")
|
|
|
# TODO(alexpilotti): check Barbican secrets content as well
|
|
|
if (origin_endpoint.connection_info ==
|
|
|
destination_endpoint.connection_info):
|
|
|
@@ -773,8 +1078,8 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
replica = models.Replica()
|
|
|
replica.id = str(uuid.uuid4())
|
|
|
- replica.origin_endpoint = origin_endpoint
|
|
|
- replica.destination_endpoint = destination_endpoint
|
|
|
+ replica.origin_endpoint_id = origin_endpoint_id
|
|
|
+ replica.destination_endpoint_id = destination_endpoint_id
|
|
|
replica.destination_environment = destination_environment
|
|
|
replica.source_environment = source_environment
|
|
|
replica.instances = instances
|
|
|
@@ -1036,8 +1341,8 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
migration = models.Migration()
|
|
|
migration.id = str(uuid.uuid4())
|
|
|
- migration.origin_endpoint = origin_endpoint
|
|
|
- migration.destination_endpoint = destination_endpoint
|
|
|
+ migration.origin_endpoint_id = origin_endpoint_id
|
|
|
+ migration.destination_endpoint_id = destination_endpoint_id
|
|
|
migration.destination_environment = destination_environment
|
|
|
migration.source_environment = source_environment
|
|
|
migration.network_map = network_map
|
|
|
@@ -1318,8 +1623,24 @@ class ConductorServerEndpoint(object):
|
|
|
continue
|
|
|
|
|
|
if task.status in (
|
|
|
- constants.TASK_STATUS_RUNNING,
|
|
|
- constants.TASK_STATUS_PENDING):
|
|
|
+ constants.TASK_STATUS_PENDING,
|
|
|
+ constants.TASK_STATUS_STARTING):
|
|
|
+ # any PENDING/STARTING tasks means that they did not have a
|
|
|
+ # host assigned to them yet, and presuming the host does not
|
|
|
+ # start executing the task until it marks itself as the runner,
|
|
|
+ # we can just mark the task as cancelled:
|
|
|
+ LOG.debug(
|
|
|
+ "Setting currently '%s' task '%s' to '%s' as part of the "
|
|
|
+ "cancellation of execution '%s'",
|
|
|
+ task.status, task.id,
|
|
|
+ constants.TASK_STATUS_UNSCHEDULED, execution.id)
|
|
|
+ db_api.set_task_status(
|
|
|
+ ctxt, task.id, constants.TASK_STATUS_UNSCHEDULED,
|
|
|
+ exception_details=(
|
|
|
+ "This task was already pending execution but was "
|
|
|
+ "unscheduled during the cancellation of the parent "
|
|
|
+ "tasks execution."))
|
|
|
+ elif task.status == constants.TASK_STATUS_RUNNING:
|
|
|
# cancel any currently running/pending non-error tasks:
|
|
|
if not task.on_error:
|
|
|
LOG.debug(
|
|
|
@@ -1328,7 +1649,8 @@ class ConductorServerEndpoint(object):
|
|
|
task.status, task.id, execution.id)
|
|
|
db_api.set_task_status(
|
|
|
ctxt, task.id, constants.TASK_STATUS_CANCELLING)
|
|
|
- self._rpc_worker_client.cancel_task(
|
|
|
+ worker_rpc = self._get_worker_rpc_for_host(task.host)
|
|
|
+ worker_rpc.cancel_task(
|
|
|
ctxt, task.host, task.id, task.process_id, force)
|
|
|
# let any on-error tasks run to completion but mark
|
|
|
# them as CANCELLING_AFTER_COMPLETION so they will
|
|
|
@@ -1361,7 +1683,8 @@ class ConductorServerEndpoint(object):
|
|
|
"execution '%s'",
|
|
|
task.id, task.status, task.on_error, execution.id)
|
|
|
|
|
|
- started_tasks = self._advance_execution_state(ctxt, execution)
|
|
|
+ started_tasks = self._advance_execution_state(
|
|
|
+ ctxt, execution, requery=True)
|
|
|
if started_tasks:
|
|
|
LOG.info(
|
|
|
"The following tasks were started after state advancement "
|
|
|
@@ -1382,11 +1705,11 @@ class ConductorServerEndpoint(object):
|
|
|
keystone.delete_trust(ctxt)
|
|
|
|
|
|
@parent_tasks_execution_synchronized
|
|
|
- def set_task_host(self, ctxt, task_id, host, process_id):
|
|
|
- """ Saves the ID of the worker host which has accepted and started
|
|
|
- the task to the DB and marks the task as 'RUNNING'. """
|
|
|
+ def set_task_host(self, ctxt, task_id, host):
|
|
|
+ """ Saves the ID of the worker host which has accepted
|
|
|
+ the task to the DB and marks the task as STARTING. """
|
|
|
task = db_api.get_task(ctxt, task_id)
|
|
|
- new_status = constants.TASK_STATUS_RUNNING
|
|
|
+ new_status = constants.TASK_STATUS_STARTING
|
|
|
exception_details = None
|
|
|
if task.status == constants.TASK_STATUS_CANCELLING:
|
|
|
raise exception.TaskIsCancelling(task_id=task_id)
|
|
|
@@ -1409,10 +1732,45 @@ class ConductorServerEndpoint(object):
|
|
|
"Task with ID '%s' is in '%s' status instead of the "
|
|
|
"expected '%s' required for it to have a task host set." % (
|
|
|
task_id, task.status, constants.TASK_STATUS_PENDING))
|
|
|
- db_api.set_task_host(ctxt, task_id, host, process_id)
|
|
|
+ LOG.info(
|
|
|
+ "Setting host for task with ID '%s' to '%s'", task_id, host)
|
|
|
+ db_api.set_task_host_properties(ctxt, task_id, host=host)
|
|
|
db_api.set_task_status(
|
|
|
ctxt, task_id, new_status,
|
|
|
exception_details=exception_details)
|
|
|
+ LOG.info(
|
|
|
+ "Successfully set host for task with ID '%s' to '%s'",
|
|
|
+ task_id, host)
|
|
|
+
|
|
|
+ @parent_tasks_execution_synchronized
|
|
|
+ def set_task_process(self, ctxt, task_id, process_id):
|
|
|
+ """ Sets the ID of the Worker-side process for the given task,
|
|
|
+ and marks the task as actually 'RUNNING'. """
|
|
|
+ task = db_api.get_task(ctxt, task_id)
|
|
|
+ if not task.host:
|
|
|
+ raise exception.InvalidTaskState(
|
|
|
+ "Task with ID '%s' (current status '%s') has no host set "
|
|
|
+ "for it. Cannot set host process." % (
|
|
|
+ task_id, task.status))
|
|
|
+ acceptable_statuses = [
|
|
|
+ constants.TASK_STATUS_STARTING,
|
|
|
+ constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION]
|
|
|
+ if task.status not in acceptable_statuses:
|
|
|
+ raise exception.InvalidTaskState(
|
|
|
+ "Task with ID '%s' is in '%s' status instead of the "
|
|
|
+ "expected statuses (%s) required for it to have a task "
|
|
|
+ "process set." % (
|
|
|
+ task_id, task.status, acceptable_statuses))
|
|
|
+
|
|
|
+ LOG.info(
|
|
|
+ "Setting process '%s' (host %s) for task '%s' and transitioning "
|
|
|
+ "it from status '%s' to '%s'", process_id, task.host, task_id,
|
|
|
+ task.status, constants.TASK_STATUS_RUNNING)
|
|
|
+ db_api.set_task_host_properties(ctxt, task_id, process_id=process_id)
|
|
|
+ db_api.set_task_status(ctxt, task_id, constants.TASK_STATUS_RUNNING)
|
|
|
+ LOG.info(
|
|
|
+ "Successfully set task process for task with ID '%s' to '%s'",
|
|
|
+ task_id, process_id)
|
|
|
|
|
|
def _check_clean_execution_deadlock(
|
|
|
self, ctxt, execution, task_statuses=None, requery=True):
|
|
|
@@ -1464,7 +1822,7 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
def _get_execution_status(self, ctxt, execution, requery=False):
|
|
|
""" Returns the global status of an execution.
|
|
|
- RUNNING - at least one task is RUNNING, PENDING or CANCELLING
|
|
|
+ RUNNING - at least one task is RUNNING, STARTING, PENDING or CANCELLING
|
|
|
COMPLETED - all non-error-only tasks are COMPLETED
|
|
|
CANCELED - no more RUNNING/PENDING/SCHEDULED tasks but some CANCELED
|
|
|
CANCELIING - at least one task in CANCELLING status
|
|
|
@@ -1485,7 +1843,9 @@ class ConductorServerEndpoint(object):
|
|
|
is_running = True
|
|
|
if task.status in constants.CANCELED_TASK_STATUSES:
|
|
|
is_canceled = True
|
|
|
- if task.status == constants.TASK_STATUS_ERROR:
|
|
|
+ if task.status in (
|
|
|
+ constants.TASK_STATUS_ERROR,
|
|
|
+ constants.TASK_STATUS_FAILED_TO_SCHEDULE):
|
|
|
is_errord = True
|
|
|
if task.status in (
|
|
|
constants.TASK_STATUS_CANCELLING,
|
|
|
@@ -1577,6 +1937,10 @@ class ConductorServerEndpoint(object):
|
|
|
origin = self._get_task_origin(ctxt, execution.action)
|
|
|
destination = self._get_task_destination(ctxt, execution.action)
|
|
|
action = db_api.get_action(ctxt, execution.action_id)
|
|
|
+ origin_endpoint = db_api.get_endpoint(
|
|
|
+ ctxt, execution.action.origin_endpoint_id)
|
|
|
+ destination_endpoint = db_api.get_endpoint(
|
|
|
+ ctxt, execution.action.destination_endpoint_id)
|
|
|
|
|
|
started_tasks = []
|
|
|
|
|
|
@@ -1594,15 +1958,31 @@ class ConductorServerEndpoint(object):
|
|
|
task_info = action.info[task.instance]
|
|
|
db_api.set_task_status(
|
|
|
ctxt, task.id, constants.TASK_STATUS_PENDING)
|
|
|
- self._rpc_worker_client.begin_task(
|
|
|
- ctxt, server=None,
|
|
|
- task_id=task.id,
|
|
|
- task_type=task.task_type,
|
|
|
- origin=origin,
|
|
|
- destination=destination,
|
|
|
- instance=task.instance,
|
|
|
- task_info=task_info)
|
|
|
- started_tasks.append(task.id)
|
|
|
+ try:
|
|
|
+ worker_rpc = self._get_worker_service_rpc_for_task(
|
|
|
+ ctxt, task, origin_endpoint, destination_endpoint)
|
|
|
+ worker_rpc.begin_task(
|
|
|
+ ctxt, server=None,
|
|
|
+ task_id=task.id,
|
|
|
+ task_type=task.task_type,
|
|
|
+ origin=origin,
|
|
|
+ destination=destination,
|
|
|
+ instance=task.instance,
|
|
|
+ task_info=task_info)
|
|
|
+ LOG.debug(
|
|
|
+ "Successfully started task with ID '%s' (type '%s') "
|
|
|
+ "for execution '%s'", task.id, task.task_type,
|
|
|
+ execution.id)
|
|
|
+ started_tasks.append(task.id)
|
|
|
+ return constants.TASK_STATUS_PENDING
|
|
|
+ except Exception as ex:
|
|
|
+ LOG.warn(
|
|
|
+ "Error occured while starting new task '%s'. "
|
|
|
+ "Cancelling execution '%s'. Error was: %s",
|
|
|
+ task.id, execution.id, utils.get_exception_details())
|
|
|
+ self._cancel_tasks_execution(
|
|
|
+ ctxt, execution, requery=True)
|
|
|
+ raise
|
|
|
|
|
|
# aggregate all tasks and statuses:
|
|
|
task_statuses = {}
|
|
|
@@ -1622,7 +2002,7 @@ class ConductorServerEndpoint(object):
|
|
|
LOG.debug(
|
|
|
"All task statuses before execution '%s' lifecycle iteration "
|
|
|
"(for tasks of instance '%s'): %s",
|
|
|
- instance, execution.id, task_statuses)
|
|
|
+ execution.id, instance, task_statuses)
|
|
|
|
|
|
# NOTE: the tasks are saved in a random order in the DB, which
|
|
|
# complicates the processing logic so we just pre-sort:
|
|
|
@@ -1634,8 +2014,7 @@ class ConductorServerEndpoint(object):
|
|
|
if not task_deps[task.id]:
|
|
|
LOG.info(
|
|
|
"Starting depency-less task '%s'", task.id)
|
|
|
- _start_task(task)
|
|
|
- task_statuses[task.id] = constants.TASK_STATUS_PENDING
|
|
|
+ task_statuses[task.id] = _start_task(task)
|
|
|
continue
|
|
|
|
|
|
parent_task_statuses = {
|
|
|
@@ -1676,9 +2055,7 @@ class ConductorServerEndpoint(object):
|
|
|
"Starting task '%s' as all dependencies have "
|
|
|
"completed successfully: %s",
|
|
|
task.id, parent_task_statuses)
|
|
|
- _start_task(task)
|
|
|
- task_statuses[task.id] = (
|
|
|
- constants.TASK_STATUS_PENDING)
|
|
|
+ task_statuses[task.id] = _start_task(task)
|
|
|
else:
|
|
|
# it means one/more parents error'd/unscheduled
|
|
|
# so we mark this task as unscheduled:
|
|
|
@@ -1712,9 +2089,7 @@ class ConductorServerEndpoint(object):
|
|
|
"non-error parent (%s) was completed: %s",
|
|
|
task.id, list(non_error_parents.keys()),
|
|
|
parent_task_statuses)
|
|
|
- _start_task(task)
|
|
|
- task_statuses[task.id] = (
|
|
|
- constants.TASK_STATUS_PENDING)
|
|
|
+ task_statuses[task.id] = _start_task(task)
|
|
|
else:
|
|
|
LOG.info(
|
|
|
"Unscheduling on-error task '%s' as none of "
|
|
|
@@ -2007,7 +2382,7 @@ class ConductorServerEndpoint(object):
|
|
|
ctxt, task, execution, updated_task_info)
|
|
|
|
|
|
newly_started_tasks = self._advance_execution_state(
|
|
|
- ctxt, execution, instance=task.instance)
|
|
|
+ ctxt, execution, instance=task.instance, requery=False)
|
|
|
if newly_started_tasks:
|
|
|
LOG.info(
|
|
|
"The following tasks were started for execution '%s' "
|
|
|
@@ -2377,3 +2752,140 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
def get_diagnostics(self, ctxt):
|
|
|
return utils.get_diagnostics_info()
|
|
|
+
|
|
|
+ def create_region(self, ctxt, region_name, description="", enabled=True):
|
|
|
+ region = models.Region()
|
|
|
+ region.id = str(uuid.uuid4())
|
|
|
+ region.name = region_name
|
|
|
+ region.description = description
|
|
|
+ region.enabled = enabled
|
|
|
+ db_api.add_region(ctxt, region)
|
|
|
+ return self.get_region(ctxt, region.id)
|
|
|
+
|
|
|
+ def get_regions(self, ctxt):
|
|
|
+ return db_api.get_regions(ctxt)
|
|
|
+
|
|
|
+ @region_synchronized
|
|
|
+ def get_region(self, ctxt, region_id):
|
|
|
+ region = db_api.get_region(ctxt, region_id)
|
|
|
+ if not region:
|
|
|
+ raise exception.NotFound(
|
|
|
+ "Region with ID '%s' not found." % region_id)
|
|
|
+ return region
|
|
|
+
|
|
|
+ @region_synchronized
|
|
|
+ def update_region(self, ctxt, region_id, updated_values):
|
|
|
+ LOG.info(
|
|
|
+ "Attempting to update region '%s' with payload: %s",
|
|
|
+ region_id, updated_values)
|
|
|
+ db_api.update_region(ctxt, region_id, updated_values)
|
|
|
+ LOG.info("Region '%s' successfully updated", region_id)
|
|
|
+ return db_api.get_region(ctxt, region_id)
|
|
|
+
|
|
|
+ @region_synchronized
|
|
|
+ def delete_region(self, ctxt, region_id):
|
|
|
+ # TODO(aznashwan): add checks for endpoints/services
|
|
|
+ # associated to the region before deletion:
|
|
|
+ db_api.delete_region(ctxt, region_id)
|
|
|
+
|
|
|
+ def register_service(
|
|
|
+ self, ctxt, host, binary, topic, enabled, mapped_regions=None,
|
|
|
+ providers=None, specs=None):
|
|
|
+ service = db_api.find_service(ctxt, host, binary, topic=topic)
|
|
|
+ if service:
|
|
|
+ raise exception.Conflict(
|
|
|
+ "A Service with the specified parameters (host %s, binary %s, "
|
|
|
+ "topic %s) has already been registered under ID: %s" % (
|
|
|
+ host, binary, topic, service.id))
|
|
|
+
|
|
|
+ service = models.Service()
|
|
|
+ service.id = str(uuid.uuid4())
|
|
|
+ service.host = host
|
|
|
+ service.binary = binary
|
|
|
+ service.enabled = enabled
|
|
|
+ service.topic = topic
|
|
|
+ service.status = constants.SERVICE_STATUS_UP
|
|
|
+
|
|
|
+ if None in (providers, specs):
|
|
|
+ worker_rpc = self._get_rpc_client_for_service(service)
|
|
|
+ status = worker_rpc.get_service_status(ctxt)
|
|
|
+
|
|
|
+ service.providers = status["providers"]
|
|
|
+ service.specs = status["specs"]
|
|
|
+ else:
|
|
|
+ service.providers = providers
|
|
|
+ service.specs = specs
|
|
|
+
|
|
|
+ # create the service:
|
|
|
+ db_api.add_service(ctxt, service)
|
|
|
+ LOG.debug(
|
|
|
+ "Added new service to DB: %s", service.id)
|
|
|
+
|
|
|
+ # add region associations:
|
|
|
+ if mapped_regions:
|
|
|
+ try:
|
|
|
+ db_api.update_service(
|
|
|
+ ctxt, service.id, {
|
|
|
+ "mapped_regions": mapped_regions})
|
|
|
+ except Exception as ex:
|
|
|
+ LOG.warn(
|
|
|
+ "Error adding region mappings during new service "
|
|
|
+ "registration (host: %s), cleaning up endpoint and "
|
|
|
+ "all created mappings for regions: %s",
|
|
|
+ service.host, mapped_regions)
|
|
|
+ db_api.delete_service(ctxt, service.id)
|
|
|
+ raise
|
|
|
+
|
|
|
+ return self.get_service(ctxt, service.id)
|
|
|
+
|
|
|
+ def check_service_registered(self, ctxt, host, binary, topic):
|
|
|
+ props = "host='%s', binary='%s', topic='%s'" % (host, binary, topic)
|
|
|
+ LOG.debug(
|
|
|
+ "Checking for existence of service with properties: %s", props)
|
|
|
+ service = db_api.find_service(ctxt, host, binary, topic=topic)
|
|
|
+ if service:
|
|
|
+ LOG.debug(
|
|
|
+ "Found service '%s' for properties %s", service.id, props)
|
|
|
+ else:
|
|
|
+ LOG.debug(
|
|
|
+ "Could not find any service with the specified "
|
|
|
+ "properties: %s", props)
|
|
|
+ return service
|
|
|
+
|
|
|
+ @service_synchronized
|
|
|
+ def refresh_service_status(self, ctxt, service_id):
|
|
|
+ LOG.debug("Updating registration for worker service '%s'", service_id)
|
|
|
+ service = db_api.get_service(ctxt, service_id)
|
|
|
+ worker_rpc = self._get_rpc_client_for_service(service)
|
|
|
+ status = worker_rpc.get_service_status(ctxt)
|
|
|
+ updated_values = {
|
|
|
+ "providers": status["providers"],
|
|
|
+ "specs": status["specs"],
|
|
|
+ "status": constants.SERVICE_STATUS_UP}
|
|
|
+ db_api.update_service(ctxt, service_id, updated_values)
|
|
|
+ LOG.debug("Successfully refreshed status of service '%s'", service_id)
|
|
|
+ return db_api.get_service(ctxt, service_id)
|
|
|
+
|
|
|
+ def get_services(self, ctxt):
|
|
|
+ return db_api.get_services(ctxt)
|
|
|
+
|
|
|
+ @service_synchronized
|
|
|
+ def get_service(self, ctxt, service_id):
|
|
|
+ service = db_api.get_service(ctxt, service_id)
|
|
|
+ if not service:
|
|
|
+ raise exception.NotFound(
|
|
|
+ "Service with ID '%s' not found." % service_id)
|
|
|
+ return service
|
|
|
+
|
|
|
+ @service_synchronized
|
|
|
+ def update_service(self, ctxt, service_id, updated_values):
|
|
|
+ LOG.info(
|
|
|
+ "Attempting to update service '%s' with payload: %s",
|
|
|
+ service_id, updated_values)
|
|
|
+ db_api.update_service(ctxt, service_id, updated_values)
|
|
|
+ LOG.info("Successfully updated service '%s'", service_id)
|
|
|
+ return db_api.get_service(ctxt, service_id)
|
|
|
+
|
|
|
+ @service_synchronized
|
|
|
+ def delete_service(self, ctxt, service_id):
|
|
|
+ db_api.delete_service(ctxt, service_id)
|