Просмотр исходного кода

Merge pull request #44 from baizhang/gce

Add GCEBlockStoreService, GCEVolumeService, GCESnapshotService
Nuwan Goonasekera 9 лет назад
Родитель
Сommit
85984c162b

+ 3 - 2
cloudbridge/cloud/providers/gce/provider.py

@@ -17,6 +17,7 @@ import googleapiclient.http
 from oauth2client.client import GoogleCredentials
 from oauth2client.service_account import ServiceAccountCredentials
 
+from .services import GCEBlockStoreService
 from .services import GCEComputeService
 from .services import GCENetworkService
 from .services import GCESecurityService
@@ -162,6 +163,7 @@ class GCECloudProvider(BaseCloudProvider):
         self._compute = GCEComputeService(self)
         self._security = GCESecurityService(self)
         self._network = GCENetworkService(self)
+        self._block_store = GCEBlockStoreService(self)
 
         self._compute_resources = GCPResources(self.gce_compute)
         self._storage_resources = GCPResources(self.gcp_storage)
@@ -180,8 +182,7 @@ class GCECloudProvider(BaseCloudProvider):
 
     @property
     def block_store(self):
-        raise NotImplementedError(
-            "GCECloudProvider does not implement this service")
+        return self._block_store
 
     @property
     def object_store(self):

+ 287 - 4
cloudbridge/cloud/providers/gce/resources.py

@@ -1,6 +1,7 @@
 """
 DataTypes used by this provider
 """
+from cloudbridge.cloud.base.resources import BaseAttachmentInfo
 from cloudbridge.cloud.base.resources import BaseFloatingIP
 from cloudbridge.cloud.base.resources import BaseInstance
 from cloudbridge.cloud.base.resources import BaseInstanceType
@@ -11,8 +12,12 @@ from cloudbridge.cloud.base.resources import BasePlacementZone
 from cloudbridge.cloud.base.resources import BaseRegion
 from cloudbridge.cloud.base.resources import BaseSecurityGroup
 from cloudbridge.cloud.base.resources import BaseSecurityGroupRule
+from cloudbridge.cloud.base.resources import BaseSnapshot
+from cloudbridge.cloud.base.resources import BaseVolume
 from cloudbridge.cloud.interfaces.resources import InstanceState
 from cloudbridge.cloud.interfaces.resources import MachineImageState
+from cloudbridge.cloud.interfaces.resources import SnapshotState
+from cloudbridge.cloud.interfaces.resources import VolumeState
 
 import cloudbridge as cb
 
@@ -80,7 +85,7 @@ class GCEInstanceType(BaseInstanceType):
 
     @property
     def id(self):
-        return str(self._inst_dict.get('id'))
+        return self._inst_dict.get('selfLink')
 
     @property
     def name(self):
@@ -193,7 +198,7 @@ class GCERegion(BaseRegion):
 class GCEFirewallsDelegate(object):
     DEFAULT_NETWORK = 'default'
     _NETWORK_URL_PREFIX = 'global/networks/'
-  
+
     def __init__(self, provider):
         self._provider = provider
         self._list_response = None
@@ -222,7 +227,7 @@ class GCEFirewallsDelegate(object):
             if network_name is not None:
                 out.add((firewall['targetTags'][0], network_name))
         return out
-            
+
     def network_name(self, firewall):
         """
         Extract the network name of a firewall.
@@ -637,7 +642,7 @@ class GCEMachineImage(BaseMachineImage):
         :rtype: ``str``
         :return: ID for this instance as returned by the cloud middleware.
         """
-        return self._gce_image['name']
+        return self._gce_image.get('selfLink')
 
     @property
     def name(self):
@@ -1270,3 +1275,281 @@ class GCEFloatingIP(BaseFloatingIP):
                                          address=self._ip['name'])
                                  .execute())
        self._provider.wait_for_operation(response, region=self._region)
+
+
+class GCEVolume(BaseVolume):
+
+    VOLUME_STATE_MAP = {
+        'RESTORING': VolumeState.CONFIGURING,
+        'PENDING': VolumeState.CONFIGURING,
+        'READY': VolumeState.AVAILABLE,
+        'DONE': VolumeState.AVAILABLE,
+        'RUNNING': VolumeState.IN_USE,
+    }
+
+    def __init__(self, provider, volume):
+        super(GCEVolume, self).__init__(provider)
+        self._volume = volume
+
+    @property
+    def id(self):
+        return self._volume.get('selfLink')
+
+    @property
+    def name(self):
+        """
+        Get the volume name.
+        """
+        return self._volume.get('name')
+
+    @name.setter
+    def name(self, value):
+        """
+        Set the volume name.
+        """
+        raise NotImplementedError('Not supported by this provider.')
+
+    @property
+    def description(self):
+        labels = self._volume.get('labels')
+        if not labels or 'description' not in labels:
+            return ''
+        return labels.get('description', '')
+
+    @description.setter
+    def description(self, value):
+        request_body = {
+            'labels': {'description': value.replace(' ', '_').lower(),},
+            'labelFingerprint': self._volume.get('labelFingerprint'),
+        }
+        try:
+            response = (self._provider.gce_compute
+                        .disks()
+                        .setLabels(
+                            project=self._provider.project_name,
+                            zone=self._provider.default_zone,
+                            resource=self.name,
+                            body=request_body).execute())
+        except Exception as e:
+            cb.log.warning('Exception while setting volume description: %s', e)
+        self.refresh()
+
+    @property
+    def size(self):
+        return self._volume.get('sizeGb')
+
+    @property
+    def create_time(self):
+        return self._volume.get('creationTimestamp')
+
+    @property
+    def zone_id(self):
+        return self._volume.get('zone')
+
+    @property
+    def source(self):
+        return self._volume.get('sourceSnapshot')
+
+    @property
+    def attachments(self):
+        # GCE Persistent Disk supports multiple instances attaching a READ-ONLY
+        # disk. In cloudbridge, volume usage pattern is that a disk is attached
+        # to a single instance in a read-write mode. Therefore, we only check
+        # the first user of a disk.
+        if 'users' in self._volume and len(self._volume) > 0:
+            if len(self._volume) > 1:
+                cb.log.warning("This volume is attached to multiple instances.")
+            return BaseAttachmentInfo(self,
+                                      self._volume.get('users')[0],
+                                      None)
+        else:
+            return None
+
+    def attach(self, instance, device):
+        """
+        Attach this volume to an instance.
+
+        instance: The ID of an instance or an ``Instance`` object to
+                  which this volume will be attached.
+
+        To use the disk, the user needs to mount the disk so that the operating
+        system can use the available storage space.
+        https://cloud.google.com/compute/docs/disks/add-persistent-disk
+        """
+        attach_disk_body = {
+            "source": self.id,
+            "deviceName": device,
+        }
+        instance_name = instance.name if isinstance(
+            instance,
+            GCEInstance) else instance
+        response = (self._provider.gce_compute
+                        .instances()
+                        .attachDisk(
+                            project=self._provider.project_name,
+                            zone=self._provider.default_zone,
+                            instance=instance_name,
+                            body=attach_disk_body).execute())
+
+    def detach(self, force=False):
+        """
+        Detach this volume from an instance.
+        """
+        # Check whether this volume is attached to an instance.
+        if not self.attachments:
+            return
+        instance_data = self._provider.get_gce_resource_data(
+            self.attachments.instance_id)
+        # Check whether the instance has this volume attached.
+        if 'disks' not in instance_data:
+            return
+        device_name = None
+        for disk in instance_data['disks']:
+            if ('source' in disk and 'deviceName' in disk and
+                disk['source'] == self.id):
+                device_name = disk['deviceName']
+        if not device_name:
+            return
+        response = (self._provider.gce_compute
+                        .instances()
+                        .detachDisk(
+                            project=self._provider.project_name,
+                            zone=self._provider.default_zone,
+                            instance=instance_data.get('name'),
+                            deviceName=device_name).execute())
+
+    def create_snapshot(self, name, description=None):
+        """
+        Create a snapshot of this Volume.
+        """
+        return self._provider.block_store.snapshots.create(
+            name, self, description)
+
+    def delete(self):
+        """
+        Delete this volume.
+        """
+        response = (self._provider.gce_compute
+                        .disks()
+                        .delete(
+                            project=self._provider.project_name,
+                            zone=self._provider.default_zone,
+                            disk=self.name).execute())
+
+    @property
+    def state(self):
+        return GCEVolume.VOLUME_STATE_MAP.get(
+            self._volume.get('status'), VolumeState.UNKNOWN)
+
+    def refresh(self):
+        """
+        Refreshes the state of this volume by re-querying the cloud provider
+        for its latest state.
+        """
+        self._volume = self._provider.get_gce_resource_data(
+            self._volume.get('selfLink'))
+
+
+class GCESnapshot(BaseSnapshot):
+
+    SNAPSHOT_STATE_MAP = {
+        'PENDING': SnapshotState.PENDING,
+        'READY': SnapshotState.AVAILABLE,
+    }
+
+    def __init__(self, provider, snapshot):
+        super(GCESnapshot, self).__init__(provider)
+        self._snapshot = snapshot
+
+    @property
+    def id(self):
+        return self._snapshot.get('selfLink')
+
+    @property
+    def name(self):
+        """
+        Get the snapshot name.
+         """
+        return self._snapshot.get('name')
+
+    @name.setter
+    # pylint:disable=arguments-differ
+    def name(self, value):
+        """
+        Set the snapshot name.
+        """
+        raise NotImplementedError('Not supported by this provider.')
+
+    @property
+    def description(self):
+        return self._snapshot.get('description')
+
+    @description.setter
+    def description(self, value):
+        raise NotImplementedError('Not supported by this provider.')
+
+    @property
+    def size(self):
+        return self._snapshot.get('diskSizeGb')
+
+    @property
+    def volume_id(self):
+        return self._snapshot.get('sourceDisk')
+
+    @property
+    def create_time(self):
+        return self._snapshot.get('creationTimestamp')
+
+    @property
+    def state(self):
+        return GCESnapshot.SNAPSHOT_STATE_MAP.get(
+            self._snapshot.get('status'), SnapshotState.UNKNOWN)
+
+    def refresh(self):
+        """
+        Refreshes the state of this snapshot by re-querying the cloud provider
+        for its latest state.
+        """
+        self._snapshot = self._provider.get_gce_resource_data(
+            self._snapshot.get('selfLink'))
+
+    def delete(self):
+        """
+        Delete this snapshot.
+        """
+        response = (self._provider.gce_compute
+                        .snapshots()
+                        .delete(
+                            project=self._provider.project_name,
+                            snapshot=self.name).execute())
+
+    def create_volume(self, placement, size=None, volume_type=None, iops=None):
+        """
+        Create a new Volume from this Snapshot.
+
+        Args:
+            placement: GCE zone name, e.g. 'us-central1-f'.
+            size: The size of the new volume, in GiB (optional). Defaults to
+                the size of the snapshot.
+            volume_type: Type of persistent disk. Either 'pd-standard' or
+                'pd-ssd'.
+            iops: Not supported by GCE.
+        """
+        vol_type = 'zones/{0}/diskTypes/{1}'.format(
+            placement,
+            'pd-standard' if (volume_type != 'pd-standard' or
+                              volume_type != 'pd-ssd') else volume_type)
+        disk_body = {
+            'name': 'created-from-{0}'.format(self.name),
+            'sizeGb': size if size is not None else self.size,
+            'type': vol_type,
+            'sourceSnapshot': self.id
+        }
+        operation = (self._provider.gce_compute
+                         .disks()
+                         .insert(
+                             project=self._provider.project_name,
+                             zone=placement,
+                             body=disk_body).execute())
+        return self._provider.block_store.volumes.get(
+            operation.get('targetLink'))

+ 215 - 7
cloudbridge/cloud/providers/gce/services.py

@@ -1,5 +1,6 @@
 from cloudbridge.cloud.base.resources import ClientPagedResultList
 from cloudbridge.cloud.base.resources import ServerPagedResultList
+from cloudbridge.cloud.base.services import BaseBlockStoreService
 from cloudbridge.cloud.base.services import BaseComputeService
 from cloudbridge.cloud.base.services import BaseImageService
 from cloudbridge.cloud.base.services import BaseInstanceService
@@ -9,6 +10,9 @@ from cloudbridge.cloud.base.services import BaseNetworkService
 from cloudbridge.cloud.base.services import BaseRegionService
 from cloudbridge.cloud.base.services import BaseSecurityGroupService
 from cloudbridge.cloud.base.services import BaseSecurityService
+from cloudbridge.cloud.base.services import BaseSnapshotService
+from cloudbridge.cloud.base.services import BaseVolumeService
+from cloudbridge.cloud.interfaces.resources import PlacementZone
 from cloudbridge.cloud.interfaces.resources import SecurityGroup
 from cloudbridge.cloud.providers.gce import helpers
 import cloudbridge as cb
@@ -32,6 +36,8 @@ from .resources import GCENetwork
 from .resources import GCERegion
 from .resources import GCESecurityGroup
 from .resources import GCESecurityGroupRule
+from .resources import GCESnapshot
+from .resources import GCEVolume
 
 
 class GCESecurityService(BaseSecurityService):
@@ -475,13 +481,18 @@ class GCEInstanceService(BaseInstanceService):
                     config['tags']['items'] = sg_names
         else:
             config = launch_config
-        response = self.provider.gce_compute \
-                                .instances() \
-                                .insert(
-                                    project=self.provider.project_name,
-                                    zone=self.provider.default_zone,
-                                    body=config) \
-                                .execute()
+        operation = (self.provider.gce_compute.instances()
+                         .insert(
+                             project=self.provider.project_name,
+                             zone=self.provider.default_zone,
+                             body=config)
+                         .execute())
+        if 'zone' not in operation:
+            return None
+        gce_zone = self.provider.get_gce_resource_data(operation['zone'])
+        instance_id = operation.get('targetLink')
+        self.provider.wait_for_operation(operation, zone=gce_zone.get('name'))
+        return self.get(instance_id)
 
     def get(self, instance_id):
         """
@@ -664,3 +675,200 @@ class GCENetworkService(BaseNetworkService):
 
     def create_router(self, name=None):
         raise NotImplementedError('To be implemented')
+
+
+class GCEBlockStoreService(BaseBlockStoreService):
+
+    def __init__(self, provider):
+        super(GCEBlockStoreService, self).__init__(provider)
+
+        # Initialize provider services
+        self._volume_svc = GCEVolumeService(self.provider)
+        self._snapshot_svc = GCESnapshotService(self.provider)
+
+    @property
+    def volumes(self):
+        return self._volume_svc
+
+    @property
+    def snapshots(self):
+        return self._snapshot_svc
+
+
+class GCEVolumeService(BaseVolumeService):
+
+    def __init__(self, provider):
+        super(GCEVolumeService, self).__init__(provider)
+
+    def get(self, volume_id):
+        """
+        Returns a volume given its id.
+        """
+        try:
+            response = self.provider.get_gce_resource_data(volume_id)
+            if response:
+                return GCEVolume(self.provider, response)
+        except googleapiclient.errors.HttpError as http_error:
+            # If the volume is not found, the API will raise
+            # googleapiclient.errors.HttpError.
+            cb.log.warning(
+                "googleapiclient.errors.HttpError: {0}".format(http_error))
+        return None
+
+    def find(self, name, limit=None, marker=None):
+        """
+        Searches for a volume by a given list of attributes.
+        """
+        filtr = 'name eq ' + name
+        max_result = limit if limit is not None and limit < 500 else 500
+        response = (self.provider
+                        .gce_compute.disks()
+                        .list(project=self.provider.project_name,
+                              zone=self.provider.default_zone,
+                              filter=filtr,
+                              maxResults=max_result,
+                              pageToken=marker).execute())
+        if 'items' not in response:
+            return []
+        gce_vols = [GCEVolume(self.provider, vol)
+                    for vol in response['items']]
+        return ServerPagedResultList(len(gce_vols) > max_result,
+                                     response.get('nextPageToken'),
+                                     False, data=gce_vols)
+
+    def list(self, limit=None, marker=None):
+        """
+        List all volumes.
+
+        limit: The maximum number of volumes to return. The returned
+               ResultList's is_truncated property can be used to determine
+               whether more records are available.
+        """
+        # For GCE API, Acceptable values are 0 to 500, inclusive.
+        # (Default: 500).
+        max_result = limit if limit is not None and limit < 500 else 500
+        response = (self.provider
+                        .gce_compute.disks()
+                        .list(project=self.provider.project_name,
+                              zone=self.provider.default_zone,
+                              maxResults=max_result,
+                              pageToken=marker).execute())
+        if 'items' not in response:
+            return []
+        gce_vols = [GCEVolume(self.provider, vol)
+                    for vol in response['items']]
+        return ServerPagedResultList(len(gce_vols) > max_result,
+                                     response.get('nextPageToken'),
+                                     False, data=gce_vols)
+
+    def create(self, name, size, zone, snapshot=None, description=None):
+        """
+        Creates a new volume.
+
+        Argument `name` must be 1-63 characters long, and comply with RFC1035.
+        Specifically, the name must be 1-63 characters long and match the
+        regular expression [a-z]([-a-z0-9]*[a-z0-9])? which means the first
+        character must be a lowercase letter, and all following characters must
+        be a dash, lowercase letter, or digit, except the last character, which
+        cannot be a dash.
+        """
+        zone_name = zone.name if isinstance(zone, PlacementZone) else zone
+        snapshot_id = snapshot.id if isinstance(
+            snapshot, GCESnapshot) and snapshot else snapshot
+        disk_body = {
+            'name': name,
+            'sizeGb': size,
+            'type': 'zones/{0}/diskTypes/{1}'.format(zone_name, 'pd-standard'),
+            'sourceSnapshot': snapshot_id,
+            'description': description,
+        }
+        operation = (self.provider.gce_compute.disks()
+                         .insert(
+                             project=self._provider.project_name,
+                             zone=zone_name,
+                             body=disk_body).execute())
+        return self.get(operation.get('targetLink'))
+
+
+class GCESnapshotService(BaseSnapshotService):
+
+    def __init__(self, provider):
+        super(GCESnapshotService, self).__init__(provider)
+
+    def get(self, snapshot_id):
+        """
+        Returns a snapshot given its id.
+        """
+        try:
+            response = self.provider.get_gce_resource_data(snapshot_id)
+            if response:
+                return GCESnapshot(self.provider, response)
+        except googleapiclient.errors.HttpError as http_error:
+            # If the volume is not found, the API will raise
+            # googleapiclient.errors.HttpError.
+            cb.log.warning(
+                "googleapiclient.errors.HttpError: {0}".format(http_error))
+        return None
+
+    def find(self, name, limit=None, marker=None):
+        """
+        Searches for a snapshot by a given list of attributes.
+        """
+        filtr = 'name eq ' + name
+        max_result = limit if limit is not None and limit < 500 else 500
+        response = (self.provider
+                        .gce_compute.snapshots()
+                        .list(project=self.provider.project_name,
+                              filter=filtr,
+                              maxResults=max_result,
+                              pageToken=marker).execute())
+        if 'items' not in response:
+            return []
+        snapshots = [GCESnapshot(self.provider, snapshot)
+                    for snapshot in response['items']]
+        return ServerPagedResultList(len(snapshots) > max_result,
+                                     response.get('nextPageToken'),
+                                     False, data=snapshots)
+
+    def list(self, limit=None, marker=None):
+        """
+        List all snapshots.
+        """
+        max_result = limit if limit is not None and limit < 500 else 500
+        response = (self.provider
+                        .gce_compute.snapshots()
+                        .list(project=self.provider.project_name,
+                              maxResults=max_result,
+                              pageToken=marker).execute())
+        if 'items' not in response:
+            return []
+        snapshots = [GCESnapshot(self.provider, snapshot)
+                    for snapshot in response['items']]
+        return ServerPagedResultList(len(snapshots) > max_result,
+                                     response.get('nextPageToken'),
+                                     False, data=snapshots)
+
+    def create(self, name, volume, description=None):
+        """
+        Creates a new snapshot of a given volume.
+        """
+        volume_name = volume.name if isinstance(volume, GCEVolume) else volume
+        snapshot_body = {
+            "name": name,
+            "description": description
+        }
+        operation = (self.provider
+                         .gce_compute.disks()
+                         .createSnapshot(
+                             project=self.provider.project_name,
+                             zone=self.provider.default_zone,
+                             disk=volume_name, body=snapshot_body).execute())
+        if 'zone' not in operation:
+            return None
+        gce_zone = self.provider.get_gce_resource_data(operation['zone'])
+        self.provider.wait_for_operation(operation, zone=gce_zone.get('name'))
+        snapshots = self.provider.block_store.snapshots.find(name=name)
+        if snapshots:
+            return snapshots[0]
+        else:
+            return None