Bläddra i källkod

Add replica scheduler

Gabriel Adrian Samfira 8 år sedan
förälder
incheckning
e16e22b078

+ 1 - 0
.gitignore

@@ -1,4 +1,5 @@
 *.DS_Store
+.testrepository
 *.egg*
 *.log
 *.mo

+ 129 - 0
coriolis/api/v1/replica_schedules.py

@@ -0,0 +1,129 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+from webob import exc
+
+from coriolis.api.v1.views import replica_schedule_view
+from coriolis.api import wsgi as api_wsgi
+from coriolis import exception
+from coriolis.replica_cron import api
+from coriolis import schemas
+
+import jsonschema
+
+from oslo_log import log as logging
+from oslo_utils import strutils
+from oslo_utils import timeutils
+
+LOG = logging.getLogger(__name__)
+
+
+class ReplicaScheduleController(api_wsgi.Controller):
+    def __init__(self):
+        self._schedule_api = api.API()
+        super(ReplicaScheduleController, self).__init__()
+
+    def show(self, req, replica_id, id):
+        schedule = self._schedule_api.get_schedule(
+            req.environ["coriolis.context"], replica_id, id)
+        if not schedule:
+            raise exc.HTTPNotFound()
+
+        return replica_schedule_view.single(req, schedule)
+
+    def index(self, req, replica_id):
+        show_expired = strutils.bool_from_string(
+            req.GET.get("show_expired", True), strict=True)
+        return replica_schedule_view.collection(
+            req, self._schedule_api.get_schedules(
+                req.environ['coriolis.context'], replica_id,
+                expired=show_expired))
+
+    def _validate_schedule(self, schedule):
+        schema = schemas.SCHEDULE_API_BODY_SCHEMA["properties"]["schedule"]
+        schemas.validate_value(schedule, schema)
+        return schedule
+
+    def _validate_expiration_date(self, expiration_date):
+        if expiration_date is None:
+            return expiration_date
+        exp = timeutils.normalize_time(
+            timeutils.parse_isotime(expiration_date))
+        now = timeutils.utcnow()
+        if now > exp:
+            raise exception.InvalidInput(
+                "expiration_date is in the past")
+        return exp
+
+    def _validate_create_body(self, body):
+        schedule = body.get("schedule")
+        if schedule is None:
+            raise exception.InvalidInput(
+                "schedule is required")
+        schedule = self._validate_schedule(schedule)
+        schemas.validate_value(
+            body, schemas.SCHEDULE_API_BODY_SCHEMA,
+            format_checker=jsonschema.FormatChecker())
+
+        enabled = body.get("enabled", True)
+        exp = body.get("expiration_date", None)
+        if exp is not None:
+            exp = self._validate_expiration_date(exp)
+        shutdown = body.get("shutdown_instance", False)
+        return (schedule, enabled, exp, shutdown)
+
+    def _validate_update_body(self, update_body):
+        body = {}
+        schedule = update_body.get("schedule")
+        if schedule is not None:
+            schedule = self._validate_schedule(schedule)
+            body["schedule"] = schedule
+        enabled = update_body.get("enabled")
+        if enabled is not None:
+            body["enabled"] = enabled
+        shutdown = update_body.get("shutdown_instance")
+        if shutdown is not None:
+            body["shutdown_instance"] = shutdown
+        schemas.validate_value(
+            body, schemas.SCHEDULE_API_BODY_SCHEMA,
+            format_checker=jsonschema.FormatChecker())
+
+        exp = None
+        if "expiration_date" in update_body:
+            exp = self._validate_expiration_date(
+                update_body.get("expiration_date"))
+            body["expiration_date"] = exp
+        return body
+
+    def create(self, req, replica_id, body):
+        LOG.debug("Got request: %r %r %r", req, replica_id, body)
+        try:
+            schedule, enabled, exp_date, shutdown = self._validate_create_body(
+                body)
+        except Exception as err:
+            raise exception.InvalidInput(err)
+
+        return replica_schedule_view.single(req, self._schedule_api.create(
+            req.environ['coriolis.context'], replica_id, schedule, enabled,
+            exp_date, shutdown))
+
+    def update(self, req, replica_id, id, body):
+        LOG.debug("Got request: %r %r %r %r",
+                  req, replica_id, id, body)
+
+        try:
+            update_values = self._validate_update_body(body)
+        except Exception as err:
+            raise exception.InvalidInput(err)
+
+        return replica_schedule_view.single(req, self._schedule_api.update(
+            req.environ['coriolis.context'], replica_id, id,
+            update_values))
+
+    def delete(self, req, replica_id, id):
+        self._schedule_api.delete(
+            req.environ['coriolis.context'], replica_id, id)
+        raise exc.HTTPNoContent()
+
+
+def create_resource():
+    return api_wsgi.Resource(ReplicaScheduleController())

+ 8 - 0
coriolis/api/v1/router.py

@@ -13,6 +13,7 @@ from coriolis.api.v1 import migrations
 from coriolis.api.v1 import provider_schemas
 from coriolis.api.v1 import providers
 from coriolis.api.v1 import replica_actions
+from coriolis.api.v1 import replica_schedules
 from coriolis.api.v1 import replica_tasks_execution_actions
 from coriolis.api.v1 import replica_tasks_executions
 from coriolis.api.v1 import replicas
@@ -117,3 +118,10 @@ class APIRouter(api.APIRouter):
                            'replica_tasks_execution_actions'],
                        action='action',
                        conditions={'method': 'POST'})
+
+        sched = replica_schedules.create_resource()
+        self.resources['replica_schedules'] = sched
+        mapper.resource('replica_schedule', 'replicas/{replica_id}/schedules',
+                        controller=self.resources['replica_schedules'],
+                        collection={'index': 'GET'},
+                        member={'action': 'POST'})

+ 24 - 0
coriolis/api/v1/views/replica_schedule_view.py

@@ -0,0 +1,24 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import itertools
+
+
+def format_schedule(req, schedule, keys=None):
+    def transform(key, value):
+        if keys and key not in keys:
+            return
+        yield (key, value)
+
+    return dict(itertools.chain.from_iterable(
+        transform(k, v) for k, v in schedule.items()))
+
+
+def single(req, schedule):
+    return {"schedule": format_schedule(req, schedule)}
+
+
+def collection(req, schedules):
+    formatted_schedules = [format_schedule(req, m)
+                           for m in schedules]
+    return {'schedules': formatted_schedules}

+ 30 - 0
coriolis/cmd/replica_cron.py

@@ -0,0 +1,30 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import sys
+
+from oslo_config import cfg
+
+from coriolis.replica_cron.rpc import server as rpc_server
+from coriolis import service
+from coriolis import utils
+
+CONF = cfg.CONF
+
+
+def main():
+    CONF(sys.argv[1:], project='coriolis',
+         version="1.0.0")
+    utils.setup_logging()
+
+    server = service.MessagingService(
+        'coriolis_replica_cron_worker',
+        [rpc_server.ReplicaCronServerEndpoint()],
+        rpc_server.VERSION, worker_count=1)
+    launcher = service.service.launch(
+        CONF, server, workers=server.get_workers_count())
+    launcher.wait()
+
+
+if __name__ == "__main__":
+    main()

+ 38 - 0
coriolis/conductor/rpc/client.py

@@ -188,3 +188,41 @@ class ConductorClient(object):
         self._client.cast(ctxt, 'task_progress_update', task_id=task_id,
                           current_step=current_step, total_steps=total_steps,
                           message=message)
+
+    def create_replica_schedule(self, ctxt, replica_id,
+                                schedule, enabled, exp_date,
+                                shutdown_instance):
+        return self._client.call(
+            ctxt, 'create_replica_schedule',
+            replica_id=replica_id,
+            schedule=schedule,
+            enabled=enabled,
+            exp_date=exp_date,
+            shutdown_instance=shutdown_instance)
+
+    def update_replica_schedule(self, ctxt, replica_id, schedule_id,
+                                updated_values):
+        return self._client.call(
+            ctxt, 'update_replica_schedule',
+            replica_id=replica_id,
+            schedule_id=schedule_id,
+            updated_values=updated_values)
+
+    def delete_replica_schedule(self, ctxt, replica_id, schedule_id):
+        return self._client.call(
+            ctxt, 'delete_replica_schedule',
+            replica_id=replica_id,
+            schedule_id=schedule_id)
+
+    def get_replica_schedules(self, ctxt, replica_id=None, expired=True):
+        return self._client.call(
+            ctxt, 'get_replica_schedules',
+            replica_id=replica_id, expired=expired)
+
+    def get_replica_schedule(self, ctxt, replica_id,
+                             schedule_id, expired=True):
+        return self._client.call(
+            ctxt, 'get_replica_schedule',
+            replica_id=replica_id,
+            schedule_id=schedule_id,
+            expired=expired)

+ 85 - 2
coriolis/conductor/rpc/server.py

@@ -8,10 +8,12 @@ from oslo_concurrency import lockutils
 from oslo_log import log as logging
 
 from coriolis import constants
+from coriolis import context
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
 from coriolis import exception
 from coriolis import keystone
+from coriolis.replica_cron.rpc import client as rpc_cron_client
 from coriolis import utils
 from coriolis.worker.rpc import client as rpc_worker_client
 
@@ -40,6 +42,16 @@ def replica_synchronized(func):
     return wrapper
 
 
+def schedule_synchronized(func):
+    @functools.wraps(func)
+    def wrapper(self, ctxt, replica_id, schedule_id, *args, **kwargs):
+        @lockutils.synchronized(schedule_id)
+        def inner():
+            return func(self, ctxt, replica_id, schedule_id, *args, **kwargs)
+        return inner()
+    return wrapper
+
+
 def task_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, task_id, *args, **kwargs):
@@ -73,6 +85,7 @@ def tasks_execution_synchronized(func):
 class ConductorServerEndpoint(object):
     def __init__(self):
         self._rpc_worker_client = rpc_worker_client.WorkerClient()
+        self._replica_cron_client = rpc_cron_client.ReplicaCronClient()
 
     def create_endpoint(self, ctxt, name, endpoint_type, description,
                         connection_info):
@@ -176,7 +189,9 @@ class ConductorServerEndpoint(object):
         }
 
     def _begin_tasks(self, ctxt, execution, task_info={}):
-        keystone.create_trust(ctxt)
+        if not ctxt.trust_id:
+            keystone.create_trust(ctxt)
+            ctxt.delete_trust_id = True
 
         origin = self._get_task_origin(ctxt, execution.action)
         destination = self._get_task_destination(ctxt, execution.action)
@@ -686,7 +701,8 @@ class ConductorServerEndpoint(object):
         LOG.info("Tasks execution %(id)s completed with status: %(status)s",
                  {"id": execution_id, "status": execution_status})
         db_api.set_execution_status(ctxt, execution_id, execution_status)
-        keystone.delete_trust(ctxt)
+        if ctxt.delete_trust_id:
+            keystone.delete_trust(ctxt)
 
     @task_synchronized
     def set_task_host(self, ctxt, task_id, host, process_id):
@@ -827,3 +843,70 @@ class ConductorServerEndpoint(object):
         LOG.info("Task progress update: %s", task_id)
         db_api.add_task_progress_update(ctxt, task_id, current_step,
                                         total_steps, message)
+
+    def _get_replica_schedule(self, ctxt, replica_id,
+                              schedule_id, expired=True):
+        schedule = db_api.get_replica_schedule(
+            ctxt, replica_id, schedule_id, expired=expired)
+        if not schedule:
+            raise exception.NotFound("Schedule not found")
+        return schedule
+
+    def create_replica_schedule(self, ctxt, replica_id,
+                                schedule, enabled, exp_date,
+                                shutdown_instance):
+        keystone.create_trust(ctxt)
+        replica = self._get_replica(ctxt, replica_id)
+        replica_schedule = models.ReplicaSchedule()
+        replica_schedule.id = str(uuid.uuid4())
+        replica_schedule.replica = replica
+        replica_schedule.replica_id = replica_id
+        replica_schedule.schedule = schedule
+        replica_schedule.expiration_date = exp_date
+        replica_schedule.enabled = enabled
+        replica_schedule.shutdown_instance = shutdown_instance
+        replica_schedule.trust_id = ctxt.trust_id
+
+        db_api.add_replica_schedule(
+            ctxt, replica_schedule,
+            lambda ctxt, sched: self._replica_cron_client.register(
+                ctxt, sched))
+        return self.get_replica_schedule(
+            ctxt, replica_id, replica_schedule.id)
+
+    @schedule_synchronized
+    def update_replica_schedule(self, ctxt, replica_id, schedule_id,
+                                updated_values):
+        db_api.update_replica_schedule(
+            ctxt, replica_id, schedule_id, updated_values, None,
+            lambda ctxt, sched: self._replica_cron_client.register(
+                ctxt, sched))
+        return self._get_replica_schedule(ctxt, replica_id, schedule_id)
+
+    def _cleanup_schedule_resources(self, ctxt, schedule):
+        self._replica_cron_client.unregister(ctxt, schedule)
+        if schedule.trust_id:
+            tmp_trust = context.get_admin_context(
+                trust_id=schedule.trust_id)
+            keystone.delete_trust(tmp_trust)
+
+    @schedule_synchronized
+    def delete_replica_schedule(self, ctxt, replica_id, schedule_id):
+        db_api.delete_replica_schedule(
+            ctxt, replica_id, schedule_id, None,
+            lambda ctxt, sched: self._cleanup_schedule_resources(
+                ctxt, sched))
+
+    @replica_synchronized
+    def get_replica_schedules(self, ctxt, replica_id=None, expired=True):
+        return db_api.get_replica_schedules(
+            ctxt, replica_id=replica_id, expired=expired)
+
+    @schedule_synchronized
+    def get_replica_schedule(self, ctxt, replica_id,
+                             schedule_id, expired=True):
+        schedule = self._get_replica_schedule(
+            ctxt, replica_id, schedule_id, expired=expired)
+        if not schedule:
+            raise exception.NotFound("Schedule not found")
+        return schedule

+ 8 - 1
coriolis/context.py

@@ -13,7 +13,7 @@ class RequestContext(context.RequestContext):
                  timestamp=None, request_id=None, auth_token=None,
                  overwrite=True, domain=None, user_domain=None,
                  project_domain=None, show_deleted=None, trust_id=None,
-                 **kwargs):
+                 delete_trust_id=False, **kwargs):
 
         super(RequestContext, self).__init__(auth_token=auth_token,
                                              user=user,
@@ -34,6 +34,7 @@ class RequestContext(context.RequestContext):
             timestamp = timeutils.parse_isotime(timestamp)
         self.timestamp = timestamp
         self.trust_id = trust_id
+        self.delete_trust_id = delete_trust_id
 
     def to_dict(self):
         result = super(RequestContext, self).to_dict()
@@ -52,3 +53,9 @@ class RequestContext(context.RequestContext):
     @classmethod
     def from_dict(cls, values):
         return cls(**values)
+
+
+def get_admin_context(trust_id=None):
+    return RequestContext(
+        user=None, tenant=None, is_admin=True,
+        trust_id=trust_id)

+ 173 - 25
coriolis/db/api.py

@@ -5,8 +5,11 @@ from oslo_config import cfg
 from oslo_db import api as db_api
 from oslo_db import options as db_options
 from oslo_db.sqlalchemy import enginefacade
+from oslo_utils import timeutils
 from sqlalchemy import func
+from sqlalchemy import or_
 from sqlalchemy import orm
+from sqlalchemy.sql import null
 
 from coriolis.db.sqlalchemy import models
 from coriolis import exception
@@ -41,11 +44,45 @@ def _session(context):
     return (context and context.session) or get_session()
 
 
+def is_user_context(context):
+    """Indicates if the request context is a normal user."""
+    if not context:
+        return False
+    if not context.user_id or not context.project_id:
+        return False
+    if context.is_admin:
+        return False
+    return True
+
+
 def _model_query(context, *args):
     session = _session(context)
     return session.query(*args)
 
 
+def _get_replica_schedules_filter(context, replica_id=None,
+                                  schedule_id=None, expired=True):
+    now = timeutils.utcnow()
+    q = _soft_delete_aware_query(context, models.ReplicaSchedule)
+    q = q.join(models.Replica)
+    sched_filter = q.filter()
+    if is_user_context(context):
+        sched_filter = sched_filter.filter(
+            models.Replica.project_id == context.tenant)
+
+    if replica_id:
+        sched_filter = sched_filter.filter(
+            models.Replica.id == replica_id)
+    if schedule_id:
+        sched_filter = sched_filter.filter(
+            models.ReplicaSchedule.id == schedule_id)
+    if not expired:
+        sched_filter = sched_filter.filter(
+            or_(models.ReplicaSchedule.expiration_date == null(),
+                models.ReplicaSchedule.expiration_date > now))
+    return sched_filter
+
+
 def _soft_delete_aware_query(context, *args, **kwargs):
     """Query helper that accounts for context's `show_deleted` field.
 
@@ -62,15 +99,19 @@ def _soft_delete_aware_query(context, *args, **kwargs):
 @enginefacade.reader
 def get_endpoints(context):
     q = _soft_delete_aware_query(context, models.Endpoint)
-    return q.filter(
-        models.Endpoint.project_id == context.tenant).all()
+    if is_user_context(context):
+        q = q.filter(
+            models.Endpoint.project_id == context.tenant)
+    return q.filter().all()
 
 
 @enginefacade.reader
 def get_endpoint(context, endpoint_id):
     q = _soft_delete_aware_query(context, models.Endpoint)
+    if is_user_context(context):
+        q = q.filter(
+            models.Endpoint.project_id == context.tenant)
     return q.filter(
-        models.Endpoint.project_id == context.tenant,
         models.Endpoint.id == endpoint_id).first()
 
 
@@ -93,8 +134,11 @@ def update_endpoint(context, endpoint_id, updated_values):
 
 @enginefacade.writer
 def delete_endpoint(context, endpoint_id):
+    args = {"id": endpoint_id}
+    if is_user_context(context):
+        args["project_id"] = context.tenant
     count = _soft_delete_aware_query(context, models.Endpoint).filter_by(
-        project_id=context.tenant, id=endpoint_id).soft_delete()
+        **args).soft_delete()
     if count == 0:
         raise exception.NotFound("0 entries were soft deleted")
 
@@ -105,8 +149,9 @@ def get_replica_tasks_executions(context, replica_id, include_tasks=False):
     q = q.join(models.Replica)
     if include_tasks:
         q = _get_tasks_with_details_options(q)
+    if is_user_context(context):
+        q = q.filter(models.Replica.project_id == context.tenant)
     return q.filter(
-        models.Replica.project_id == context.tenant,
         models.Replica.id == replica_id).all()
 
 
@@ -115,16 +160,18 @@ def get_replica_tasks_execution(context, replica_id, execution_id):
     q = _soft_delete_aware_query(context, models.TasksExecution).join(
         models.Replica)
     q = _get_tasks_with_details_options(q)
+    if is_user_context(context):
+        q = q.filter(models.Replica.project_id == context.tenant)
     return q.filter(
-        models.Replica.project_id == context.tenant,
         models.Replica.id == replica_id,
         models.TasksExecution.id == execution_id).first()
 
 
 @enginefacade.writer
 def add_replica_tasks_execution(context, execution):
-    if execution.action.project_id != context.tenant:
-        raise exception.NotAuthorized()
+    if is_user_context(context):
+        if execution.action.project_id != context.tenant:
+            raise exception.NotAuthorized()
 
     # include deleted records
     max_number = _model_query(
@@ -139,14 +186,91 @@ def add_replica_tasks_execution(context, execution):
 def delete_replica_tasks_execution(context, execution_id):
     q = _soft_delete_aware_query(context, models.TasksExecution).filter(
         models.TasksExecution.id == execution_id)
-    if not q.join(models.Replica).filter(
-            models.Replica.project_id == context.tenant).first():
-        raise exception.NotAuthorized()
+    if is_user_context(context):
+        if not q.join(models.Replica).filter(
+                models.Replica.project_id == context.tenant).first():
+            raise exception.NotAuthorized()
     count = q.soft_delete()
     if count == 0:
         raise exception.NotFound("0 entries were soft deleted")
 
 
+@enginefacade.reader
+def get_replica_schedules(context, replica_id=None, expired=True):
+    sched_filter = _get_replica_schedules_filter(
+        context, replica_id=replica_id, expired=expired)
+    return sched_filter.all()
+
+
+@enginefacade.reader
+def get_replica_schedule(context, replica_id, schedule_id, expired=True):
+    sched_filter = _get_replica_schedules_filter(
+        context, replica_id=replica_id, schedule_id=schedule_id,
+        expired=expired)
+    return sched_filter.first()
+
+
+@enginefacade.writer
+def update_replica_schedule(context, replica_id, schedule_id,
+                            updated_values, pre_update_callable=None,
+                            post_update_callable=None):
+    # NOTE(gsamfira): we need to refactor the DB layer a bit to allow
+    # two-phase transactions or at least allow running these functions
+    # inside a single transaction block.
+    schedule = get_replica_schedule(context, replica_id, schedule_id)
+    if pre_update_callable:
+        pre_update_callable(schedule=schedule)
+    for val in ["schedule", "expiration_date", "enabled", "shutdown_instance"]:
+        if val in updated_values:
+            setattr(schedule, val, updated_values[val])
+    if post_update_callable:
+        # at this point nothing has really been sent to the DB,
+        # but we may need to act upon the new changes elsewhere
+        # before we actually commit to the database
+        post_update_callable(context, schedule)
+
+
+@enginefacade.writer
+def delete_replica_schedule(context, replica_id,
+                            schedule_id, pre_delete_callable=None,
+                            post_delete_callable=None):
+    # NOTE(gsamfira): we need to refactor the DB layer a bit to allow
+    # two-phase transactions or at least allow running these functions
+    # inside a single transaction block.
+
+    q = _soft_delete_aware_query(context, models.ReplicaSchedule).filter(
+        models.ReplicaSchedule.id == schedule_id,
+        models.ReplicaSchedule.replica_id == replica_id)
+    schedule = q.first()
+    if not schedule:
+        raise exception.NotFound(
+            "No such schedule")
+    if is_user_context(context):
+        if not q.join(models.Replica).filter(
+                models.Replica.project_id == context.tenant).first():
+            raise exception.NotAuthorized()
+    if pre_delete_callable:
+        pre_delete_callable(context, schedule)
+    count = q.soft_delete()
+    if post_delete_callable:
+        post_delete_callable(context, schedule)
+    if count == 0:
+        raise exception.NotFound("0 entries were soft deleted")
+
+
+@enginefacade.writer
+def add_replica_schedule(context, schedule, post_create_callable=None):
+    # NOTE(gsamfira): we need to refactor the DB layer a bit to allow
+    # two-phase transactions or at least allow running these functions
+    # inside a single transaction block.
+
+    if schedule.replica.project_id != context.tenant:
+        raise exception.NotAuthorized()
+    context.session.add(schedule)
+    if post_create_callable:
+        post_create_callable(context, schedule)
+
+
 def _get_replica_with_tasks_executions_options(q):
     return q.options(orm.joinedload(models.Replica.executions))
 
@@ -156,16 +280,21 @@ def get_replicas(context, include_tasks_executions=False):
     q = _soft_delete_aware_query(context, models.Replica)
     if include_tasks_executions:
         q = _get_replica_with_tasks_executions_options(q)
-    return q.filter(
-        models.Replica.project_id == context.tenant).all()
+    q = q.filter()
+    if is_user_context(context):
+        q = q.filter(
+            models.Replica.project_id == context.tenant)
+    return q.all()
 
 
 @enginefacade.reader
 def get_replica(context, replica_id):
     q = _soft_delete_aware_query(context, models.Replica)
     q = _get_replica_with_tasks_executions_options(q)
+    if is_user_context(context):
+        q = q.filter(
+            models.Replica.project_id == context.tenant)
     return q.filter(
-        models.Replica.project_id == context.tenant,
         models.Replica.id == replica_id).first()
 
 
@@ -178,8 +307,11 @@ def add_replica(context, replica):
 
 @enginefacade.writer
 def _delete_transfer_action(context, cls, id):
+    args = {"base_id": id}
+    if is_user_context(context):
+        args["project_id"] = context.tenant
     count = _soft_delete_aware_query(context, cls).filter_by(
-        project_id=context.tenant, base_id=id).soft_delete()
+        **args).soft_delete()
     if count == 0:
         raise exception.NotFound("0 entries were soft deleted")
 
@@ -197,8 +329,10 @@ def get_replica_migrations(context, replica_id):
     q = _soft_delete_aware_query(context, models.Migration)
     q = q.join("replica")
     q = q.options(orm.joinedload("executions"))
+    if is_user_context(context):
+        q = q.filter(
+            models.Migration.project_id == context.tenant)
     return q.filter(
-        models.Migration.project_id == context.tenant,
         models.Replica.id == replica_id).all()
 
 
@@ -209,7 +343,10 @@ def get_migrations(context, include_tasks=False):
         q = _get_migration_task_query_options(q)
     else:
         q = q.options(orm.joinedload("executions"))
-    return q.filter_by(project_id=context.tenant).all()
+    args = {}
+    if is_user_context(context):
+        args["project_id"] = context.tenant
+    return q.filter_by(**args).all()
 
 
 def _get_tasks_with_details_options(query):
@@ -237,7 +374,10 @@ def _get_migration_task_query_options(query):
 def get_migration(context, migration_id):
     q = _soft_delete_aware_query(context, models.Migration)
     q = _get_migration_task_query_options(q)
-    return q.filter_by(project_id=context.tenant, id=migration_id).first()
+    args = {"id": migration_id}
+    if is_user_context(context):
+        args["project_id"] = context.tenant
+    return q.filter_by(**args).first()
 
 
 @enginefacade.writer
@@ -256,9 +396,12 @@ def delete_migration(context, migration_id):
 def set_execution_status(context, execution_id, status):
     execution = _soft_delete_aware_query(
         context, models.TasksExecution).join(
-            models.TasksExecution.action).filter(
-                models.BaseTransferAction.project_id == context.tenant,
-                models.TasksExecution.id == execution_id).first()
+            models.TasksExecution.action)
+    if is_user_context(context):
+        execution = execution.filter(
+            models.BaseTransferAction.project_id == context.tenant)
+    execution = execution.filter(
+        models.TasksExecution.id == execution_id).first()
     if not execution:
         raise exception.NotFound(
             "Tasks execution not found: %s" % execution_id)
@@ -269,9 +412,12 @@ def set_execution_status(context, execution_id, status):
 @enginefacade.reader
 def get_action(context, action_id):
     action = _soft_delete_aware_query(
-        context, models.BaseTransferAction).filter(
-            models.BaseTransferAction.project_id == context.tenant,
-            models.BaseTransferAction.base_id == action_id).first()
+        context, models.BaseTransferAction)
+    if is_user_context(context):
+        action = action.filter(
+            models.BaseTransferAction.project_id == context.tenant)
+    action = action.filter(
+        models.BaseTransferAction.base_id == action_id).first()
     if not action:
         raise exception.NotFound(
             "Transfer action not found: %s" % action_id)
@@ -301,8 +447,10 @@ def get_tasks_execution(context, execution_id):
     q = q.join(models.BaseTransferAction)
     q = q.options(orm.joinedload("action"))
     q = q.options(orm.joinedload("tasks"))
+    if is_user_context(context):
+        q = q.filter(
+            models.BaseTransferAction.project_id == context.tenant)
     execution = q.filter(
-        models.BaseTransferAction.project_id == context.tenant,
         models.TasksExecution.id == execution_id).first()
     if not execution:
         raise exception.NotFound(

+ 46 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/004_adds_replica_schedules.py

@@ -0,0 +1,46 @@
+import uuid
+
+import sqlalchemy
+
+
+def upgrade(migrate_engine):
+    meta = sqlalchemy.MetaData()
+    meta.bind = migrate_engine
+
+    sqlalchemy.Table(
+        'replica', meta, autoload=True)
+
+    replica_schedules = sqlalchemy.Table(
+        'replica_schedules', meta,
+        sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
+                          default=lambda: str(uuid.uuid4())),
+        sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('deleted', sqlalchemy.String(36)),
+        sqlalchemy.Column("replica_id", sqlalchemy.String(36),
+                          sqlalchemy.ForeignKey(
+                              'replica.id'), nullable=False),
+        sqlalchemy.Column("schedule", sqlalchemy.String(255), nullable=False),
+        sqlalchemy.Column("expiration_date", sqlalchemy.DateTime),
+        sqlalchemy.Column("enabled", sqlalchemy.Boolean,
+                          default=True, nullable=False),
+        sqlalchemy.Column("shutdown_instance", sqlalchemy.Boolean,
+                          default=False, nullable=False),
+        sqlalchemy.Column('trust_id', sqlalchemy.String(255)),
+        mysql_engine='InnoDB',
+        mysql_charset='utf8'
+    )
+
+    tables = (
+        replica_schedules,
+    )
+
+    for index, table in enumerate(tables):
+        try:
+            table.create()
+        except Exception:
+            # If an error occurs, drop all tables created so far to return
+            # to the previously existing state.
+            meta.drop_all(tables=tables[:index])
+            raise

+ 22 - 0
coriolis/db/sqlalchemy/models.py

@@ -170,3 +170,25 @@ class Endpoint(BASE, models.TimestampMixin, models.ModelBase,
         BaseTransferAction, backref=orm.backref('destination_endpoint'),
         primaryjoin="and_(BaseTransferAction.destination_endpoint_id=="
         "Endpoint.id, BaseTransferAction.deleted=='0')")
+
+
+class ReplicaSchedule(BASE, models.TimestampMixin, models.ModelBase,
+                      models.SoftDeleteMixin):
+    __tablename__ = "replica_schedules"
+
+    id = sqlalchemy.Column(sqlalchemy.String(36),
+                           default=lambda: str(uuid.uuid4()),
+                           primary_key=True)
+    replica_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('replica.id'), nullable=False)
+    replica = orm.relationship(
+        Replica, backref=orm.backref("schedules"), foreign_keys=[replica_id])
+    schedule = sqlalchemy.Column(types.Json, nullable=False)
+    expiration_date = sqlalchemy.Column(
+        sqlalchemy.types.DateTime, nullable=True)
+    enabled = sqlalchemy.Column(
+        sqlalchemy.Boolean, nullable=False, default=True)
+    shutdown_instance = sqlalchemy.Column(
+        sqlalchemy.Boolean, nullable=False, default=False)
+    trust_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)

+ 6 - 0
coriolis/exception.py

@@ -123,6 +123,12 @@ class NotAuthorized(CoriolisException):
     safe = True
 
 
+class Conflict(CoriolisException):
+    message = _("Conflict")
+    code = 409
+    safe = True
+
+
 class AdminRequired(NotAuthorized):
     message = _("User does not have admin privileges")
 

+ 0 - 0
coriolis/replica_cron/__init__.py


+ 30 - 0
coriolis/replica_cron/api.py

@@ -0,0 +1,30 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+from coriolis.conductor.rpc import client as rpc_client
+
+
+class API(object):
+    def __init__(self):
+        self._rpc_client = rpc_client.ConductorClient()
+
+    def create(self, ctxt, replica_id, schedule, enabled,
+               exp_date, shutdown_instance):
+        return self._rpc_client.create_replica_schedule(
+            ctxt, replica_id, schedule, enabled, exp_date,
+            shutdown_instance)
+
+    def get_schedules(self, ctxt, replica_id, expired=True):
+        return self._rpc_client.get_replica_schedules(
+            ctxt, replica_id, expired=expired)
+
+    def get_schedule(self, ctxt, replica_id, schedule_id, expired=True):
+        return self._rpc_client.get_replica_schedule(
+            ctxt, replica_id, schedule_id, expired=expired)
+
+    def update(self, ctxt, replica_id, schedule_id, update_values):
+        return self._rpc_client.update_replica_schedule(
+            ctxt, replica_id, schedule_id, update_values)
+
+    def delete(self, ctxt, replica_id, schedule_id):
+        self._rpc_client.delete_replica_schedule(
+            ctxt, replica_id, schedule_id)

+ 248 - 0
coriolis/replica_cron/cron.py

@@ -0,0 +1,248 @@
+import datetime
+import sys
+import time
+
+import eventlet
+from eventlet import semaphore
+from oslo_log import log
+from oslo_utils import timeutils
+import schedule
+
+from coriolis import exception
+from coriolis import schemas
+
+
+LOG = log.getLogger(__name__)
+
+SCHEDULE_FIELDS = ("minute", "hour", "dom", "month", "dow")
+
+
+class CronJob(object):
+
+    def __init__(self, name, description, schedule, enabled,
+                 expires, on_success, on_error,
+                 job_callable, *args, **kw):
+        # param: name: string: unique ID that describes this job
+        # param: description: string: a short description of the job
+        # param: schedule: dict: cron job schedule. This is of the form:
+        #     {
+        #         "minute": 1,
+        #         "hour": 0,
+        #         "dom": 20,
+        #         "month": 11,
+        #         "dow": 1
+        #     }
+        # param: enabled: bool: Whether or not this cron job is enabled
+        # param: expires: datetime: expiration date for this cronjob
+        # param: on_success: callable: a function that gets called if the
+        # job is successful. This function must accept the result returned
+        # by the scheduled function
+        #
+        # param: on_error: callable: If the function scheduled to run raises
+        # an exception, this function will run. on_error MUST accept the
+        # exception info raised by the scheduled function, as the only
+        # parameter. Any exception thrown by this callback will be logged and
+        # ignored.
+        #
+        # param: job_callable: callable: The function we are scheduling to run,
+        # Every other *arg or **kw following this parameter will be passed in
+        # directly to this function.
+
+        self.name = name
+        if not callable(job_callable):
+            raise exception.CoriolisException("Invalid job function")
+
+        schema = schemas.SCHEDULE_API_BODY_SCHEMA["properties"]["schedule"]
+        schemas.validate_value(schedule, schema)
+
+        if on_success and not callable(on_success):
+            raise ValueError("on_success must be callable")
+        if on_error and not callable(on_error):
+            raise ValueError("on_error must be callable")
+
+        self._on_success = on_success
+        self._on_error = on_error
+        self.schedule = schedule
+        self._func = job_callable
+        self._description = description
+        self._args = args
+        self._kw = kw
+        self._enabled = enabled
+        if expires:
+            if not isinstance(expires, datetime.datetime):
+                raise exception.CoriolisException(
+                    "Invalid expires")
+        self._expires = expires
+        self._last_run = None
+
+    def _compare(self, pairs):
+        # we don't support the full cron syntax. Either exact matches
+        # or wildcard matches are allowed. Expressions such as:
+        # */10 (every 10 units)
+        # are not supported
+        ret = []
+        for item in pairs:
+            if item[1] is None or item[0] == item[1]:
+                ret.append(True)
+            else:
+                ret.append(False)
+        return ret
+
+    def is_expired(self):
+        now = timeutils.utcnow()
+        if self._expires and self._expires < now:
+            return True
+        return False
+
+    def should_run(self, dt):
+        # NOTE(gsamfira): I find your lack of strong typing....disturbing
+        if type(dt) is not datetime.datetime:
+            raise exception.CoriolisException("Invalid datetime object")
+        if self.is_expired():
+            return False
+        if self._enabled is False:
+            return False
+        if self._last_run:
+            if (dt - self._last_run).total_seconds() < 60:
+                return False
+        fields = ('year', 'month', 'dom', 'hour',
+                  'minute', 'second', 'dow')
+        dt_fields = dict(zip(fields, dt.timetuple()))
+
+        pairs = [(dt_fields[i], self.schedule.get(i)) for i in SCHEDULE_FIELDS]
+        compared = self._compare(pairs)
+        return False not in compared
+
+    def _send_status(self, queue, status):
+        if not queue:
+            return
+        queue.put(status)
+
+    def start(self, dt, status_queue=None):
+        result = None
+        exc_info = None
+        try:
+            result = self._func(*self._args, **self._kw)
+            if self._on_success:
+                self._on_success(result)
+        except BaseException as err:
+            exc_info = sys.exc_info()
+            LOG.exception(err)
+            if self._on_error:
+                try:
+                    self._on_error(exc_info)
+                except Exception as callback_err:
+                    LOG.exception(callback_err)
+        self._send_status(
+            status_queue,
+            {"result": result,
+             "description": self._description,
+             "name": self.name,
+             "error_info": exc_info,
+             "last_run": dt})
+
+
+class Cron(object):
+
+    def __init__(self):
+        self._queue = eventlet.Queue(maxsize=1000)
+        self._should_stop = False
+        self._jobs = {}
+        self._eventlets = []
+        self._semaphore = semaphore.Semaphore(value=1)
+
+    def register(self, job):
+        if not isinstance(job, CronJob):
+            raise ValueError("Invalid job class")
+        name = job.name
+        with self._semaphore:
+            self._jobs[name] = job
+
+    def unregister(self, name):
+        job = self._jobs.get(name)
+        if job:
+            with self._semaphore:
+                del self._jobs[name]
+
+    def _check_jobs(self):
+        LOG.debug("Checking jobs")
+        jobs = self._jobs.copy()
+        job_nr = len(jobs)
+        spawned = 0
+        now = timeutils.utcnow()
+        if job_nr:
+            for job in jobs:
+                if jobs[job].should_run(now):
+                    LOG.debug("Spawning job %s" % job)
+                    eventlet.spawn(jobs[job].start, now, self._queue)
+                    spawned += 1
+
+        done = timeutils.utcnow()
+        delta = done - now
+        LOG.debug("Spawned %(jobs)d jobs in %(seconds)d seconds" % {
+            "seconds": delta.seconds,
+            "jobs": spawned})
+
+    def _loop(self):
+        while True:
+            schedule.run_pending()
+            time.sleep(.2)
+
+    def _result_loop(self):
+        while True:
+            job_info = self._queue.get()
+            name = job_info["name"]
+            result = job_info["result"]
+            error = job_info["error_info"]
+            last_run = job_info["last_run"]
+            desc = job_info["description"]
+            with self._semaphore:
+                self._jobs[name]._last_run = last_run
+            # TODO(gsamfira): send this to the controller and update
+            # the logs table...or do something much more meaningful
+            if error:
+                LOG.error("Job %(job_desc)s exited with error: %(job_err)r" % {
+                    "job_desc": desc,
+                    "job_err": error})
+            if result:
+                LOG.info("Job %(job_desc)s returned: %(job_ret)r" % {
+                    "job_desc": desc,
+                    "job_ret": result})
+
+    def _janitor(self):
+        # remove expired jobs from memory. The check for expired
+        # jobs runs once every minute.
+        while True:
+            with self._semaphore:
+                tmp = {}
+                for job in self._jobs:
+                    if self._jobs[job].is_expired():
+                        LOG.debug("Removing expired job: %s" % job)
+                        continue
+                    tmp[job] = self._jobs[job]
+                self._jobs = tmp.copy()
+                tmp = None
+            # No need to run very often. Once a minute should do
+            time.sleep(60)
+
+    def _ripper(self):
+        # Not sure if this will ever be called, but for correctness
+        # sake, thought I'd add it
+        while True:
+            if self._should_stop:
+                if len(self._eventlets):
+                    for greenthread in self._eventlets:
+                        eventlet.kill(greenthread)
+                    self._eventlets = []
+                return
+            time.sleep(.5)
+
+    def start(self):
+        schedule.every().minute.do(self._check_jobs)
+        self._eventlets.append(eventlet.spawn(self._loop))
+        self._eventlets.append(eventlet.spawn(self._janitor))
+        self._eventlets.append(eventlet.spawn(self._result_loop))
+        eventlet.spawn(self._ripper)
+
+    def stop(self):
+        self._should_stop = True

+ 0 - 0
coriolis/replica_cron/rpc/__init__.py


+ 21 - 0
coriolis/replica_cron/rpc/client.py

@@ -0,0 +1,21 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import oslo_messaging as messaging
+
+from coriolis import rpc
+
+VERSION = "1.0"
+
+
+class ReplicaCronClient(object):
+    def __init__(self):
+        target = messaging.Target(
+            topic='coriolis_replica_cron_worker', version=VERSION)
+        self._client = rpc.get_client(target)
+
+    def register(self, ctxt, schedule):
+        self._client.call(ctxt, 'register', schedule=schedule)
+
+    def unregister(self, ctxt, schedule):
+        self._client.call(ctxt, 'unregister', schedule=schedule)

+ 91 - 0
coriolis/replica_cron/rpc/server.py

@@ -0,0 +1,91 @@
+import json
+
+from coriolis.conductor.rpc import client as rpc_client
+from coriolis import context
+from coriolis import exception
+# from coriolis import utils
+from coriolis.replica_cron import cron
+
+from oslo_log import log as logging
+from oslo_utils import timeutils
+
+LOG = logging.getLogger(__name__)
+
+VERSION = "1.0"
+
+
+def _trigger_replica(ctxt, conductor_client, replica_id, shutdown_instance):
+    try:
+        conductor_client.execute_replica_tasks(
+            ctxt, replica_id, shutdown_instance)
+    except (exception.InvalidReplicaState,
+            exception.InvalidActionTasksExecutionState):
+        LOG.info("A replica or migration already running")
+
+
+class ReplicaCronServerEndpoint(object):
+
+    def __init__(self):
+        self._rpc_client = rpc_client.ConductorClient()
+        # Setup cron loop
+        self._cron = cron.Cron()
+        self._admin_ctx = context.get_admin_context()
+        self._init_cron()
+
+    def _deserialize_schedule(self, sched):
+        expires = sched.get("expiration_date")
+        if expires:
+            sched["expiration_date"] = timeutils.normalize_time(
+                timeutils.parse_isotime(expires))
+        tmp = sched["schedule"]
+        if type(tmp) is str:
+            sched["schedule"] = json.loads(tmp)
+        return sched
+
+    def _register_schedule(self, schedule, date=None):
+        date = date or timeutils.utcnow()
+        sched = self._deserialize_schedule(schedule)
+        expires = sched.get("expiration_date")
+        if expires and expires <= date:
+            LOG.info("Not registering expired schedule: %s" % sched["id"])
+            return
+        trust_ctxt = context.get_admin_context(
+            trust_id=schedule["trust_id"])
+        description = "Scheduled job for %s" % sched["id"]
+        job = cron.CronJob(
+            sched["id"], description, sched["schedule"],
+            sched["enabled"], expires,
+            None, None, _trigger_replica, trust_ctxt,
+            self._rpc_client, schedule["replica_id"],
+            schedule["shutdown_instance"])
+        self._cron.register(job)
+
+    def _init_cron(self):
+        now = timeutils.utcnow()
+        schedules = self._get_all_schedules()
+        for schedule in schedules:
+            try:
+                self._register_schedule(schedule, date=now)
+            except Exception as err:
+                # NOTE(gsamfira): If we fail here, the service will
+                # not be able to start. Should we fail here because
+                # of an invalid schedule that managed to creep its
+                # way into the DB, or just ignore that one schedule?
+                LOG.exception(err)
+        self._cron.start()
+
+    def _get_all_schedules(self):
+        schedules = self._rpc_client.get_replica_schedules(
+            self._admin_ctx, expired=False)
+        return schedules
+
+    def register(self, ctxt, schedule):
+        now = timeutils.utcnow()
+        LOG.debug("Registering new schedule %s: %r" % (
+            schedule["id"], schedule["schedule"]))
+        self._register_schedule(schedule, date=now)
+
+    def unregister(self, ctxt, schedule):
+        schedule_id = schedule["id"]
+        LOG.debug("removing schedule %s" % schedule_id)
+        self._cron.unregister(schedule_id)

+ 6 - 2
coriolis/schemas.py

@@ -24,6 +24,7 @@ _CORIOLIS_VM_EXPORT_INFO_SCHEMA_NAME = "vm_export_info_schema.json"
 _CORIOLIS_VM_INSTANCE_INFO_SCHEMA_NAME = "vm_instance_info_schema.json"
 _CORIOLIS_OS_MORPHING_RES_SCHEMA_NAME = "os_morphing_resources_schema.json"
 _CORIOLIS_VM_NETWORK_SCHEMA_NAME = "vm_network_schema.json"
+_SCHEDULE_API_BODY_SCHEMA_NAME = "replica_schedule_schema.json"
 
 
 def get_schema(package_name, schema_name,
@@ -44,13 +45,13 @@ def get_schema(package_name, schema_name,
     return schema
 
 
-def validate_value(val, schema):
+def validate_value(val, schema, format_checker=None):
     """Simple wrapper for jsonschema.validate for usability.
 
     NOTE: silently passes empty schemas.
     """
     try:
-        jsonschema.validate(val, schema)
+        jsonschema.validate(val, schema, format_checker=format_checker)
     except jsonschema.exceptions.ValidationError as ex:
         LOG.debug("Schema validation failed: %s", ex)
         # Don't pass the value in the exception to avoid including sensitive
@@ -80,3 +81,6 @@ CORIOLIS_VM_INSTANCE_INFO_SCHEMA = get_schema(
 
 CORIOLIS_VM_NETWORK_SCHEMA = get_schema(
     __name__, _CORIOLIS_VM_NETWORK_SCHEMA_NAME)
+
+SCHEDULE_API_BODY_SCHEMA = get_schema(
+    __name__, _SCHEDULE_API_BODY_SCHEMA_NAME)

+ 56 - 0
coriolis/schemas/replica_schedule_schema.json

@@ -0,0 +1,56 @@
+{
+  "$schema": "http://cloudbase.it/coriolis/schemas/replica_schedule_schema#",
+  "type": "object",
+  "properties": {
+    "schedule": {
+      "type": "object",
+      "properties": {
+        "minute": {
+              "type": "integer",
+              "minimum": 0,
+              "maximum": 59
+        },
+        "hour": {
+              "type": "integer",
+              "minimum": 0,
+              "maximum": 23
+        },
+        "dom": {
+              "type": "integer",
+              "minimum": 1,
+              "maximum": 31
+        },
+        "month": {
+              "type": "integer",
+              "minimum": 1,
+              "maximum": 12
+        },
+        "dow": {
+              "type": "integer",
+              "minimum": 0,
+              "maximum": 6
+        }
+      }
+    },
+    "expiration_date": {
+      "oneOf": [
+        {
+          "type": "string",
+          "format": "date-time",
+          "description": "Expiration date for this schedule."
+        },
+        {
+          "type": "null"
+        }
+      ]
+    },
+    "enabled": {
+      "type": "boolean",
+      "description": "Schedule is enabled."
+    },
+    "shutdown_instance": {
+      "type": "boolean",
+      "description": "Shutdown instance before creating a snapshot."
+    }
+  }
+}

+ 2 - 2
coriolis/service.py

@@ -71,13 +71,13 @@ class WSGIService(service.ServiceBase):
 
 
 class MessagingService(service.ServiceBase):
-    def __init__(self, topic, endpoints, version):
+    def __init__(self, topic, endpoints, version, worker_count=None):
         target = messaging.Target(topic=topic,
                                   server=utils.get_hostname(),
                                   version=version)
         self._server = rpc.get_server(target, endpoints)
 
-        self._workers = (CONF.messaging_workers or
+        self._workers = (worker_count or CONF.messaging_workers or
                          processutils.get_worker_count())
 
     def get_workers_count(self):

+ 3 - 1
requirements.txt

@@ -5,7 +5,7 @@ jsonschema
 PyMySQL
 oslo.concurrency
 oslo.config
-oslo.context
+oslo.context>=2.19.1
 oslo.db
 oslo.i18n
 oslo.log
@@ -26,5 +26,7 @@ git+https://github.com/cloudbase/pywinrm.git@requests#egg=pywinrm
 PyYAML
 requests
 mysqlclient
+schedule
+strict-rfc3339
 sqlalchemy
 webob

+ 1 - 0
setup.cfg

@@ -28,6 +28,7 @@ console_scripts =
     coriolis-api = coriolis.cmd.api:main
     coriolis-conductor = coriolis.cmd.conductor:main
     coriolis-worker = coriolis.cmd.worker:main
+    coriolis-replica-cron = coriolis.cmd.replica_cron:main
     coriolis-dbsync = coriolis.cmd.db_sync:main
 
 [wheel]