Explorar o código

Improves events

Alessandro Pilotti %!s(int64=10) %!d(string=hai) anos
pai
achega
4b95cd4c59

+ 2 - 2
coriolis/api/v1/migrations.py

@@ -38,14 +38,14 @@ class MigrationController(api_wsgi.Controller):
         destination = migration["destination"]
         destination = migration["destination"]
 
 
         export_provider = factory.get_provider(
         export_provider = factory.get_provider(
-            origin["type"], constants.PROVIDER_TYPE_EXPORT)
+            origin["type"], constants.PROVIDER_TYPE_EXPORT, None)
         if not export_provider.validate_connection_info(
         if not export_provider.validate_connection_info(
                 origin.get("connection_info", {})):
                 origin.get("connection_info", {})):
             # TODO: use a decent exception
             # TODO: use a decent exception
             raise exception.CoriolisException("Invalid connection info")
             raise exception.CoriolisException("Invalid connection info")
 
 
         import_provider = factory.get_provider(
         import_provider = factory.get_provider(
-            destination["type"], constants.PROVIDER_TYPE_IMPORT)
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, None)
         if not import_provider.validate_connection_info(
         if not import_provider.validate_connection_info(
                 destination.get("connection_info", {})):
                 destination.get("connection_info", {})):
             # TODO: use a decent exception
             # TODO: use a decent exception

+ 51 - 0
coriolis/events.py

@@ -0,0 +1,51 @@
+import abc
+
+
+class EventManager(object):
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, event_handler):
+        self._event_handler = event_handler
+        self._current_step = 0
+        self._total_steps = None
+
+    def set_total_progress_steps(self, total_steps):
+        self._total_steps = total_steps
+
+    def progress_update(self, message):
+        self._current_step += 1
+        if self._event_handler:
+            self._event_handler.progress_update(
+                self._current_step, self._total_steps, message)
+
+    def info(self, message):
+        if self._event_handler:
+            self._event_handler.info(message)
+
+    def warn(self, message):
+        if self._event_handler:
+            self._event_handler.warn(message)
+
+    def error(self, message):
+        if self._event_handler:
+            self._event_handler.error(message)
+
+
+class BaseEventHandler(object):
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def progress_update(self, current_step, total_steps, message):
+        pass
+
+    @abc.abstractmethod
+    def info(self, message):
+        pass
+
+    @abc.abstractmethod
+    def warn(self, message):
+        pass
+
+    @abc.abstractmethod
+    def error(self, message):
+        pass

+ 2 - 1
coriolis/osmorphing/base.py

@@ -11,13 +11,14 @@ class BaseOSMorphingTools(object):
 
 
     _packages = {}
     _packages = {}
 
 
-    def __init__(self, ssh, os_root_dir, hypervisor, platform):
+    def __init__(self, ssh, os_root_dir, hypervisor, platform, event_manager):
         self._ssh = ssh
         self._ssh = ssh
         self._os_root_dir = os_root_dir
         self._os_root_dir = os_root_dir
         self._hypervisor = hypervisor
         self._hypervisor = hypervisor
         self._platform = platform
         self._platform = platform
         self._distro = None
         self._distro = None
         self._version = None
         self._version = None
+        self._event_manager = event_manager
 
 
     def _test_path(self, chroot_path):
     def _test_path(self, chroot_path):
         path = os.path.join(self._os_root_dir, chroot_path)
         path = os.path.join(self._os_root_dir, chroot_path)

+ 3 - 2
coriolis/osmorphing/factory.py

@@ -5,13 +5,14 @@ from coriolis.osmorphing import ubuntu
 
 
 
 
 def get_os_morphing_tools(ssh, os_root_dir, target_hypervisor,
 def get_os_morphing_tools(ssh, os_root_dir, target_hypervisor,
-                          target_platform):
+                          target_platform, event_manager):
     os_morphing_tools_clss = [debian.DebianMorphingTools,
     os_morphing_tools_clss = [debian.DebianMorphingTools,
                               ubuntu.UbuntuMorphingTools,
                               ubuntu.UbuntuMorphingTools,
                               redhat.RedHatMorphingTools]
                               redhat.RedHatMorphingTools]
 
 
     for cls in os_morphing_tools_clss:
     for cls in os_morphing_tools_clss:
-        tools = cls(ssh, os_root_dir, target_hypervisor, target_platform)
+        tools = cls(ssh, os_root_dir, target_hypervisor, target_platform,
+                    event_manager)
         os_info = tools.check_os()
         os_info = tools.check_os()
         if os_info:
         if os_info:
             return (tools, os_info)
             return (tools, os_info)

+ 13 - 9
coriolis/osmorphing/manager.py

@@ -9,25 +9,27 @@ LOG = logging.getLogger(__name__)
 
 
 
 
 def morph_image(connection_info, target_hypervisor, target_platform,
 def morph_image(connection_info, target_hypervisor, target_platform,
-                volume_devs, nics_info):
+                volume_devs, nics_info, event_manager):
     (ip, port, username, pkey) = connection_info
     (ip, port, username, pkey) = connection_info
 
 
     LOG.info("Waiting for connectivity on host: %(ip)s:%(port)s",
     LOG.info("Waiting for connectivity on host: %(ip)s:%(port)s",
              {"ip": ip, "port": port})
              {"ip": ip, "port": port})
     utils.wait_for_port_connectivity(ip, port)
     utils.wait_for_port_connectivity(ip, port)
 
 
-    LOG.info("Connecting to host: %(ip)s:%(port)s", {"ip": ip, "port": port})
+    event_manager.progress_update(
+        "Connecting to host: %(ip)s:%(port)s" % {"ip": ip, "port": port})
     ssh = paramiko.SSHClient()
     ssh = paramiko.SSHClient()
     ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     ssh.connect(hostname=ip, port=port, username=username, pkey=pkey)
     ssh.connect(hostname=ip, port=port, username=username, pkey=pkey)
 
 
-    os_mount_tools = osmount_factory.get_os_mount_tools(ssh)
-    LOG.info("Discovering and mounting OS partitions")
+    os_mount_tools = osmount_factory.get_os_mount_tools(ssh, event_manager)
+
+    event_manager.progress_update("Discovering and mounting OS partitions")
     os_root_dir, other_mounted_dirs = os_mount_tools.mount_os(volume_devs)
     os_root_dir, other_mounted_dirs = os_mount_tools.mount_os(volume_devs)
     os_morphing_tools, os_info = osmorphing_factory.get_os_morphing_tools(
     os_morphing_tools, os_info = osmorphing_factory.get_os_morphing_tools(
-        ssh, os_root_dir, target_hypervisor, target_platform)
+        ssh, os_root_dir, target_hypervisor, target_platform, event_manager)
 
 
-    LOG.info('OS being migrated: %s', str(os_info))
+    event_manager.progress_update('OS being migrated: %s' % str(os_info))
 
 
     os_morphing_tools.set_net_config(nics_info, dhcp=True)
     os_morphing_tools.set_net_config(nics_info, dhcp=True)
     LOG.info("Pre packages")
     LOG.info("Pre packages")
@@ -37,15 +39,17 @@ def morph_image(connection_info, target_hypervisor, target_platform,
      packages_remove) = os_morphing_tools.get_packages()
      packages_remove) = os_morphing_tools.get_packages()
 
 
     if packages_add:
     if packages_add:
-        LOG.info("Adding packages: %s", str(packages_add))
+        event_manager.progress_update(
+            "Adding packages: %s" % str(packages_add))
         os_morphing_tools.install_packages(packages_add)
         os_morphing_tools.install_packages(packages_add)
 
 
     if packages_remove:
     if packages_remove:
-        LOG.info("Removing packages: %s", str(packages_remove))
+        event_manager.progress_update(
+            "Removing packages: %s" % str(packages_remove))
         os_morphing_tools.uninstall_packages(packages_remove)
         os_morphing_tools.uninstall_packages(packages_remove)
 
 
     LOG.info("Post packages")
     LOG.info("Post packages")
     os_morphing_tools.post_packages_install()
     os_morphing_tools.post_packages_install()
 
 
-    LOG.info("Dismounting OS partitions")
+    event_manager.progress_update("Dismounting OS partitions")
     os_mount_tools.dismount_os(other_mounted_dirs + [os_root_dir])
     os_mount_tools.dismount_os(other_mounted_dirs + [os_root_dir])

+ 2 - 1
coriolis/osmorphing/osmount/base.py

@@ -6,8 +6,9 @@ from coriolis import utils
 class BaseOSMountTools(object):
 class BaseOSMountTools(object):
     __metaclass__ = abc.ABCMeta
     __metaclass__ = abc.ABCMeta
 
 
-    def __init__(self, ssh):
+    def __init__(self, ssh, event_manager):
         self._ssh = ssh
         self._ssh = ssh
+        self._event_manager = event_manager
 
 
     @abc.abstractmethod
     @abc.abstractmethod
     def check_os(self):
     def check_os(self):

+ 2 - 2
coriolis/osmorphing/osmount/factory.py

@@ -2,11 +2,11 @@ from coriolis import exception
 from coriolis.osmorphing.osmount import ubuntu
 from coriolis.osmorphing.osmount import ubuntu
 
 
 
 
-def get_os_mount_tools(ssh):
+def get_os_mount_tools(ssh, event_manager):
     os_mount_tools = [ubuntu.UbuntuOSMountTools]
     os_mount_tools = [ubuntu.UbuntuOSMountTools]
 
 
     for cls in os_mount_tools:
     for cls in os_mount_tools:
-        tools = cls(ssh)
+        tools = cls(ssh, event_manager)
         if tools.check_os():
         if tools.check_os():
             return tools
             return tools
     raise exception.CoriolisException("OS mount tools not found")
     raise exception.CoriolisException("OS mount tools not found")

+ 6 - 3
coriolis/osmorphing/redhat.py

@@ -55,7 +55,8 @@ class RedHatMorphingTools(base.BaseOSMorphingTools):
                              "configuration mac_address: %s",
                              "configuration mac_address: %s",
                              ifcfg_file, mac_address)
                              ifcfg_file, mac_address)
             if not mac_address:
             if not mac_address:
-                LOG.warn("HWADDR not defined, skipping: %s", ifcfg_file)
+                self._event_manager.warn(
+                    "HWADDR not defined, skipping: %s" % ifcfg_file)
                 continue
                 continue
             name = ifcfg.get("NAME")
             name = ifcfg.get("NAME")
             if not name:
             if not name:
@@ -140,7 +141,8 @@ class RedHatMorphingTools(base.BaseOSMorphingTools):
             m = re.match('^kernel-(.*)$', package_name)
             m = re.match('^kernel-(.*)$', package_name)
             if m:
             if m:
                 kernel_version = m.groups()[0]
                 kernel_version = m.groups()[0]
-                LOG.info("Generating initrd for kernel: %s", kernel_version)
+                self._event_manager.progress_update(
+                    "Generating initrd for kernel: %s" % kernel_version)
                 self._exec_cmd_chroot(
                 self._exec_cmd_chroot(
                     "dracut -f /boot/initramfs-%(version)s.img %(version)s" %
                     "dracut -f /boot/initramfs-%(version)s.img %(version)s" %
                     {"version": kernel_version})
                     {"version": kernel_version})
@@ -217,7 +219,8 @@ class RedHatMorphingTools(base.BaseOSMorphingTools):
             major_version = version.split(".")[0]
             major_version = version.split(".")[0]
             repo_name = "rhel-%s-server-rh-common-rpms" % major_version
             repo_name = "rhel-%s-server-rh-common-rpms" % major_version
             # This is necessary for cloud-init
             # This is necessary for cloud-init
-            LOG.info("Enabling repository: %s" % repo_name)
+            self._event_manager.progress_update(
+                "Enabling repository: %s" % repo_name)
             self._exec_cmd_chroot(
             self._exec_cmd_chroot(
                 "subscription-manager repos --enable %s" % repo_name)
                 "subscription-manager repos --enable %s" % repo_name)
 
 

+ 7 - 51
coriolis/providers/base.py

@@ -1,44 +1,20 @@
 import abc
 import abc
 
 
+from coriolis import events
 
 
-class Baseprovider(object):
-    __metaclass__ = abc.ABCMeta
-
-    def __init__(self):
-        self._progress_update_manager = None
-        self._current_step = 0
-        self._total_steps = None
-
-    def set_event_handler(self, event_handler):
-        self._event_handler = event_handler
-
-    def _set_total_progress_steps(self, total_steps):
-        self._total_steps = total_steps
-
-    def _progress_update(self, message):
-        self._current_step += 1
-        if self._event_handler:
-            self._event_handler.progress_update(
-                self._current_step, self._total_steps, message)
-
-    def _info(self, message):
-        if self._event_handler:
-            self._event_handler.info(message)
 
 
-    def _warn(self, message):
-        if self._event_handler:
-            self._event_handler.warn(message)
+class BaseProvider(object):
+    __metaclass__ = abc.ABCMeta
 
 
-    def _error(self, message):
-        if self._event_handler:
-            self._event_handler.error(message)
+    def __init__(self, event_handler):
+        self._event_manager = events.EventManager(event_handler)
 
 
     @abc.abstractmethod
     @abc.abstractmethod
     def validate_connection_info(self, connection_info):
     def validate_connection_info(self, connection_info):
         pass
         pass
 
 
 
 
-class BaseImportProvider(Baseprovider):
+class BaseImportProvider(BaseProvider):
     __metaclass__ = abc.ABCMeta
     __metaclass__ = abc.ABCMeta
 
 
     @abc.abstractmethod
     @abc.abstractmethod
@@ -47,30 +23,10 @@ class BaseImportProvider(Baseprovider):
         pass
         pass
 
 
 
 
-class BaseExportProvider(Baseprovider):
+class BaseExportProvider(BaseProvider):
     __metaclass__ = abc.ABCMeta
     __metaclass__ = abc.ABCMeta
 
 
     @abc.abstractmethod
     @abc.abstractmethod
     def export_instance(self, ctxt, connection_info, instance_name,
     def export_instance(self, ctxt, connection_info, instance_name,
                         export_path):
                         export_path):
         pass
         pass
-
-
-class BaseProviderEventHandler(object):
-    __metaclass__ = abc.ABCMeta
-
-    @abc.abstractmethod
-    def progress_update(self, current_step, total_steps, message):
-        pass
-
-    @abc.abstractmethod
-    def info(self, message):
-        pass
-
-    @abc.abstractmethod
-    def warn(self, message):
-        pass
-
-    @abc.abstractmethod
-    def error(self, message):
-        pass

+ 2 - 2
coriolis/providers/factory.py

@@ -13,7 +13,7 @@ IMPORT_PROVIDERS = {
 }
 }
 
 
 
 
-def get_provider(platform_name, provider_type):
+def get_provider(platform_name, provider_type, event_handler):
     if provider_type == constants.PROVIDER_TYPE_EXPORT:
     if provider_type == constants.PROVIDER_TYPE_EXPORT:
         cls = EXPORT_PROVIDERS.get(platform_name)
         cls = EXPORT_PROVIDERS.get(platform_name)
     elif provider_type == constants.PROVIDER_TYPE_IMPORT:
     elif provider_type == constants.PROVIDER_TYPE_IMPORT:
@@ -21,4 +21,4 @@ def get_provider(platform_name, provider_type):
 
 
     if not cls:
     if not cls:
         raise exception.NotFound("Provider not found: %s" % platform_name)
         raise exception.NotFound("Provider not found: %s" % platform_name)
-    return cls()
+    return cls(event_handler)

+ 19 - 14
coriolis/providers/openstack/__init__.py

@@ -236,13 +236,14 @@ class ImportProvider(base.BaseExportProvider):
         try:
         try:
             migr_keypair_name = self._get_unique_name()
             migr_keypair_name = self._get_unique_name()
 
 
-            self._progress_update("Creating migration worker instance keypair")
+            self._event_manager.progress_update(
+                "Creating migration worker instance keypair")
 
 
             k = paramiko.RSAKey.generate(2048)
             k = paramiko.RSAKey.generate(2048)
             public_key = "ssh-rsa %s tmp@migration" % k.get_base64()
             public_key = "ssh-rsa %s tmp@migration" % k.get_base64()
             keypair = self._create_keypair(nova, migr_keypair_name, public_key)
             keypair = self._create_keypair(nova, migr_keypair_name, public_key)
 
 
-            self._progress_update(
+            self._event_manager.progress_update(
                 "Creating migration worker instance Neutron port")
                 "Creating migration worker instance Neutron port")
 
 
             port = self._create_neutron_port(neutron, migr_network_name)
             port = self._create_neutron_port(neutron, migr_network_name)
@@ -255,7 +256,7 @@ class ImportProvider(base.BaseExportProvider):
                 userdata=userdata,
                 userdata=userdata,
                 nics=[{'port-id': port['id']}])
                 nics=[{'port-id': port['id']}])
 
 
-            self._progress_update(
+            self._event_manager.progress_update(
                 "Adding migration worker instance floating IP")
                 "Adding migration worker instance floating IP")
 
 
             floating_ip = nova.floating_ips.create(pool=migr_fip_pool_name)
             floating_ip = nova.floating_ips.create(pool=migr_fip_pool_name)
@@ -264,7 +265,7 @@ class ImportProvider(base.BaseExportProvider):
             LOG.info("Floating IP: %s", floating_ip.ip)
             LOG.info("Floating IP: %s", floating_ip.ip)
             instance.add_floating_ip(floating_ip)
             instance.add_floating_ip(floating_ip)
 
 
-            self._progress_update(
+            self._event_manager.progress_update(
                 "Adding migration worker instance security group")
                 "Adding migration worker instance security group")
 
 
             migr_sec_group_name = self._get_unique_name()
             migr_sec_group_name = self._get_unique_name()
@@ -277,7 +278,7 @@ class ImportProvider(base.BaseExportProvider):
                 to_port=SSH_PORT)
                 to_port=SSH_PORT)
             instance.add_security_group(sec_group.id)
             instance.add_security_group(sec_group.id)
 
 
-            self._progress_update(
+            self._event_manager.progress_update(
                 "Waiting for connectivity on host: %(ip)s:%(port)s" %
                 "Waiting for connectivity on host: %(ip)s:%(port)s" %
                 {"ip": floating_ip.ip, "port": SSH_PORT})
                 {"ip": floating_ip.ip, "port": SSH_PORT})
 
 
@@ -359,7 +360,7 @@ class ImportProvider(base.BaseExportProvider):
                 #    utils.convert_disk_format(disk_path, target_disk_path,
                 #    utils.convert_disk_format(disk_path, target_disk_path,
                 #                              target_disk_format)
                 #                              target_disk_format)
 
 
-                self._progress_update("Uploading Glance image")
+                self._event_manager.progress_update("Uploading Glance image")
 
 
                 disk_format = disk_file_info["format"]
                 disk_format = disk_file_info["format"]
                 image = self._create_image(
                 image = self._create_image(
@@ -372,7 +373,7 @@ class ImportProvider(base.BaseExportProvider):
                 if disk_format != constants.DISK_FORMAT_RAW:
                 if disk_format != constants.DISK_FORMAT_RAW:
                     virtual_disk_size += DISK_HEADER_SIZE
                     virtual_disk_size += DISK_HEADER_SIZE
 
 
-                self._progress_update("Creating Cinder volume")
+                self._event_manager.progress_update("Creating Cinder volume")
 
 
                 volume_size_gb = math.ceil(virtual_disk_size / units.Gi)
                 volume_size_gb = math.ceil(virtual_disk_size / units.Gi)
                 volume = nova.volumes.create(
                 volume = nova.volumes.create(
@@ -391,11 +392,12 @@ class ImportProvider(base.BaseExportProvider):
             for i, volume in enumerate(volumes):
             for i, volume in enumerate(volumes):
                 self._wait_for_volume(nova, volume, 'available')
                 self._wait_for_volume(nova, volume, 'available')
 
 
-                self._progress_update("Deleting Glance image")
+                self._event_manager.progress_update("Deleting Glance image")
 
 
                 glance.images.delete(images[i].id)
                 glance.images.delete(images[i].id)
 
 
-                self._progress_update("Attaching volume to worker instance")
+                self._event_manager.progress_update(
+                    "Attaching volume to worker instance")
 
 
                 # TODO: improve device assignment
                 # TODO: improve device assignment
                 volume_dev = "/dev/sd%s" % chr(ord('a') + i + 1)
                 volume_dev = "/dev/sd%s" % chr(ord('a') + i + 1)
@@ -405,20 +407,23 @@ class ImportProvider(base.BaseExportProvider):
 
 
                 guest_conn_info = migr_resources.get_guest_connection_info()
                 guest_conn_info = migr_resources.get_guest_connection_info()
 
 
-            self._progress_update("Preparing instance for target platform")
+            self._event_manager.progress_update(
+                "Preparing instance for target platform")
             osmorphing_manager.morph_image(guest_conn_info,
             osmorphing_manager.morph_image(guest_conn_info,
                                            hypervisor_type,
                                            hypervisor_type,
                                            constants.PLATFORM_OPENSTACK,
                                            constants.PLATFORM_OPENSTACK,
                                            volume_devs,
                                            volume_devs,
-                                           nics_info)
+                                           nics_info,
+                                           self._event_manager)
         finally:
         finally:
-            self._progress_update("Removing worker instance resources")
+            self._event_manager.progress_update(
+                "Removing worker instance resources")
 
 
             migr_resources.delete()
             migr_resources.delete()
 
 
         ports = []
         ports = []
         for nic_info in nics_info:
         for nic_info in nics_info:
-            self._progress_update(
+            self._event_manager.progress_update(
                 "Creating Neutron port for migrated instance")
                 "Creating Neutron port for migrated instance")
 
 
             origin_network_name = nic_info.get("network_name")
             origin_network_name = nic_info.get("network_name")
@@ -436,7 +441,7 @@ class ImportProvider(base.BaseExportProvider):
             ports.append(self._create_neutron_port(
             ports.append(self._create_neutron_port(
                 neutron, network_name, nic_info.get("mac_address")))
                 neutron, network_name, nic_info.get("mac_address")))
 
 
-        self._progress_update(
+        self._event_manager.progress_update(
             "Creating migrated instance")
             "Creating migrated instance")
 
 
         instance = self._create_target_instance(
         instance = self._create_target_instance(

+ 5 - 5
coriolis/providers/vmware_vsphere/__init__.py

@@ -258,19 +258,19 @@ class ExportProvider(base.BaseExportProvider):
         if allow_untrusted:
         if allow_untrusted:
             context.verify_mode = ssl.CERT_NONE
             context.verify_mode = ssl.CERT_NONE
 
 
-        self._set_total_progress_steps(4)
+        self._event_manager.set_total_progress_steps(4)
 
 
-        self._progress_update("Connecting to vSphere host")
+        self._event_manager.progress_update("Connecting to vSphere host")
         si = self._connect(host, username, password, port, context)
         si = self._connect(host, username, password, port, context)
         try:
         try:
-            self._progress_update("Getting VM info")
+            self._event_manager.progress_update("Getting VM info")
             vm_info, vm = self._get_vm_info(si, instance_name)
             vm_info, vm = self._get_vm_info(si, instance_name)
-            self._progress_update("Exporting disks")
+            self._event_manager.progress_update("Exporting disks")
             disk_paths = self._export_disks(vm, export_path, context)
             disk_paths = self._export_disks(vm, export_path, context)
         finally:
         finally:
             connect.Disconnect(si)
             connect.Disconnect(si)
 
 
-        self._progress_update("Converting virtual disks format")
+        self._event_manager.progress_update("Converting virtual disks format")
         for disk_path in disk_paths:
         for disk_path in disk_paths:
             path = disk_path["path"]
             path = disk_path["path"]
             LOG.info("Converting VMDK type: %s" % path)
             LOG.info("Converting VMDK type: %s" % path)

+ 4 - 4
coriolis/worker/rpc/server.py

@@ -9,8 +9,8 @@ import psutil
 
 
 from coriolis.conductor.rpc import client as rpc_conductor_client
 from coriolis.conductor.rpc import client as rpc_conductor_client
 from coriolis import constants
 from coriolis import constants
+from coriolis import events
 from coriolis import exception
 from coriolis import exception
-from coriolis.providers import base
 from coriolis.providers import factory
 from coriolis.providers import factory
 from coriolis import utils
 from coriolis import utils
 
 
@@ -30,7 +30,7 @@ TMP_DIRS_KEY = "__tmp_dirs"
 VERSION = "1.0"
 VERSION = "1.0"
 
 
 
 
-class _ConductorProviderEventHandler(base.BaseProviderEventHandler):
+class _ConductorProviderEventHandler(events.BaseEventHandler):
     def __init__(self, ctxt, task_id):
     def __init__(self, ctxt, task_id):
         self._ctxt = ctxt
         self._ctxt = ctxt
         self._task_id = task_id
         self._task_id = task_id
@@ -167,9 +167,9 @@ def _task_process(ctxt, task_id, task_type, origin, destination, instance,
             raise exception.NotFound(
             raise exception.NotFound(
                 "Unknown task type: %s" % task_type)
                 "Unknown task type: %s" % task_type)
 
 
-        provider = factory.get_provider(data["type"], provider_type)
         event_handler = _ConductorProviderEventHandler(ctxt, task_id)
         event_handler = _ConductorProviderEventHandler(ctxt, task_id)
-        provider.set_event_handler(event_handler)
+        provider = factory.get_provider(data["type"], provider_type,
+                                        event_handler)
 
 
         connection_info = data.get("connection_info", {})
         connection_info = data.get("connection_info", {})
         target_environment = data.get("target_environment", {})
         target_environment = data.get("target_environment", {})