Просмотр исходного кода

Fix list limit issues

This commit fixes two bugs:
1- In some places, when we use ServerPagedResultList, we set the is_truncated
   parameter incorrectly. Practically, we are always setting it to False, even
   if the API call to list returns an impartial result.

2- In some places, we are ignoring the fact that the maxResult's default value
   is 500, even if we do not explicitly set it. So, if there are more than 500
   items, we just process the first 500 and ignore the rest.
Ehsan Chiniforooshan 8 лет назад
Родитель
Сommit
ff9da1e85d
2 измененных файлов с 182 добавлено и 98 удалено
  1. 80 44
      cloudbridge/cloud/providers/gce/resources.py
  2. 102 54
      cloudbridge/cloud/providers/gce/services.py

+ 80 - 44
cloudbridge/cloud/providers/gce/resources.py

@@ -26,7 +26,7 @@ from cloudbridge.cloud.base.resources import BaseSecurityGroupRule
 from cloudbridge.cloud.base.resources import BaseSnapshot
 from cloudbridge.cloud.base.resources import BaseSubnet
 from cloudbridge.cloud.base.resources import BaseVolume
-from cloudbridge.cloud.base.resources import ClientPagedResultList
+from cloudbridge.cloud.base.resources import ServerPagedResultList
 from cloudbridge.cloud.interfaces.resources import InstanceState
 from cloudbridge.cloud.interfaces.resources import MachineImageState
 from cloudbridge.cloud.interfaces.resources import RouterState
@@ -411,11 +411,24 @@ class GCEFirewallsDelegate(object):
         """
         Sync the local cache of all firewalls with the server.
         """
-        self._list_response = (self._provider
-                                   .gce_compute
-                                   .firewalls()
-                                   .list(project=self._provider.project_name)
-                                   .execute())
+        self._list_response = None
+        token = None
+        while True:
+            list_response = (self._provider
+                                 .gce_compute
+                                 .firewalls()
+                                 .list(project=self._provider.project_name,
+                                       pageToken=token)
+                                 .execute())
+            if self._list_response is None:
+                self._list_response = list_response
+                if 'items' not in self._list_response:
+                    self._list_response['items'] = []
+            else:
+                self._list_response['items'] += list_response.get('items', [])
+            if 'nextPageToken' not in list_response:
+                return
+            token = list_response['nextPageToken']
 
     def _check_list_in_dict(self, dictionary, field_name, value):
         """
@@ -948,18 +961,24 @@ class GCEInstance(BaseInstance):
         """
         self_url = self._provider.parse_url(self._gce_instance['selfLink'])
         try:
-            response = (self._provider
-                            .gce_compute
-                            .targetInstances()
-                            .list(project=self_url.parameters['project'],
-                                  zone=self_url.parameters['zone'])
-                            .execute())
-            if 'items' not in response:
-                return None
-            for target_instance in response['items']:
-                url = self._provider.parse_url(target_instance['instance'])
-                if url.parameters['instance'] == self.name:
-                    return target_instance
+            token = None
+            while True:
+                response = (self._provider
+                                .gce_compute
+                                .targetInstances()
+                                .list(project=self_url.parameters['project'],
+                                      zone=self_url.parameters['zone'],
+                                      pageToken=token)
+                                .execute())
+                if 'items' not in response:
+                    break
+                for target_instance in response['items']:
+                    url = self._provider.parse_url(target_instance['instance'])
+                    if url.parameters['instance'] == self.name:
+                        return target_instance
+                if 'nextPageToken' not in response:
+                    break
+                token = response['nextPageToken']
         except Exception as e:
             cb.log.warning('Exception while listing target instances: %s', e)
         return None
@@ -1005,17 +1024,21 @@ class GCEInstance(BaseInstance):
                                   .parameters['zone'])
         new_name = target_instance['name']
         new_url = target_instance['selfLink']
+        token = None
         try:
-            response = (self._provider
-                            .gce_compute
-                            .forwardingRules()
-                            .list(project=self._provider.project_name,
-                                  region=ip.region)
-                            .execute())
-            if 'items' not in response:
-                return False
-            for rule in response['items']:
-                if rule['IPAddress'] == ip.public_ip:
+            while True:
+                response = (self._provider
+                                .gce_compute
+                                .forwardingRules()
+                                .list(project=self._provider.project_name,
+                                      region=ip.region,
+                                      pageToken=token)
+                                .execute())
+                if 'items' not in response:
+                    break
+                for rule in response['items']:
+                    if rule['IPAddress'] != ip.public_ip:
+                        continue
                     parsed_target_url = self._provider.parse_url(
                             rule['target'])
                     old_zone = parsed_target_url.parameters['zone']
@@ -1034,6 +1057,9 @@ class GCEInstance(BaseInstance):
                     self._provider.wait_for_operation(response,
                                                       region=ip.region)
                     return True
+                if 'nextPageToken' not in response:
+                    break
+                token = response['nextPageToken']
         except Exception as e:
             cb.log.warning(
                 'Exception while listing/changing forwarding rules: %s', e)
@@ -1073,17 +1099,21 @@ class GCEInstance(BaseInstance):
         zone = (self._provider.parse_url(target_instance['zone'])
                               .parameters['zone'])
         name = target_instance['name']
+        token = None
         try:
-            response = (self._provider
-                            .gce_compute
-                            .forwardingRules()
-                            .list(project=self._provider.project_name,
-                                  region=ip.region)
-                            .execute())
-            if 'items' not in response:
-                return False
-            for rule in response['items']:
-                if rule['IPAddress'] == ip.public_ip:
+            while True:
+                response = (self._provider
+                                .gce_compute
+                                .forwardingRules()
+                                .list(project=self._provider.project_name,
+                                      region=ip.region,
+                                      pageToken=token)
+                                .execute())
+                if 'items' not in response:
+                    return False
+                for rule in response['items']:
+                    if rule['IPAddress'] != ip.public_ip:
+                        continue
                     parsed_target_url = self._provider.parse_url(
                             rule['target'])
                     temp_zone = parsed_target_url.parameters['zone']
@@ -1103,6 +1133,10 @@ class GCEInstance(BaseInstance):
                                     .execute())
                     self._provider.wait_for_operation(response,
                                                       region=ip.region)
+                    return True
+                if 'nextPageToken' not in response:
+                    break
+                token = response['nextPageToken']
         except Exception as e:
             cb.log.warning(
                 'Exception while listing/deleting forwarding rules: %s', e)
@@ -1722,7 +1756,8 @@ class GCSObject(BaseBucketObject):
         return io.BytesIO(self._provider
                               .gcp_storage
                               .objects()
-                              .get_media(bucket=self._obj['bucket'], object=self.name)
+                              .get_media(bucket=self._obj['bucket'],
+                                         object=self.name)
                               .execute())
 
     def upload(self, data):
@@ -1806,10 +1841,11 @@ class GCSBucket(BaseBucket):
                                   maxResults=max_result,
                                   pageToken=marker)
                             .execute())
-            if 'error' in response or 'items' not in response:
-                return []
-            objects = [GCSObject(self._provider, self, obj)
-                       for obj in response['items']]
+            if 'error' in response:
+                return ServerPagedResultList(False, None, False, data=[])
+            objects = []
+            for obj in response.get('items', []):
+                objects.append(GCSObject(self._provider, self, obj))
             if len(objects) > max_result:
                 cb.log.warning('Expected at most %d results; got %d',
                                max_result, len(objects))
@@ -1817,7 +1853,7 @@ class GCSBucket(BaseBucket):
                                          response.get('nextPageToken'),
                                          False, data=objects)
         except:
-            return ServerPagedResults(False, None, False, data=[])
+            return ServerPagedResultList(False, None, False, data=[])
 
     def delete(self, delete_contents=False):
         """

+ 102 - 54
cloudbridge/cloud/providers/gce/services.py

@@ -338,15 +338,22 @@ class GCERegionService(BaseRegionService):
             return None
 
     def list(self, limit=None, marker=None):
+        max_result = limit if limit is not None and limit < 500 else 500
         regions_response = (self.provider
                                 .gce_compute
                                 .regions()
-                                .list(project=self.provider.project_name)
+                                .list(project=self.provider.project_name,
+                                      maxResults=max_result,
+                                      pageToken=marker)
                                 .execute())
         regions = [GCERegion(self.provider, region)
                    for region in regions_response['items']]
-        return ClientPagedResultList(self.provider, regions,
-                                     limit=limit, marker=marker)
+        if len(regions) > max_result:
+            cb.log.warning('Expected at most %d results; got %d',
+                           max_result, len(regions))
+        return ServerPagedResultList('nextPageToken' in regions_response,
+                                     regions_response.get('nextPageToken'),
+                                     False, data=regions)
 
     @property
     def current(self):
@@ -368,18 +375,23 @@ class GCEImageService(BaseImageService):
         self._public_images = []
         for project in GCEImageService._PUBLIC_IMAGE_PROJECTS:
             try:
-                response = (self.provider
-                                .gce_compute
-                                .images()
-                                .list(project=project)
-                                .execute())
+                token = None
+                while True:
+                    response = (self.provider
+                                    .gce_compute
+                                    .images()
+                                    .list(project=project, pageToken=token)
+                                    .execute())
+                    if 'items' in response:
+                        self._public_images.extend(
+                            [GCEMachineImage(self.provider, image) for image
+                             in response['items']])
+                    if 'nextPageToken' not in response:
+                        break
+                    token = response['nextPageToken']
             except googleapiclient.errors.HttpError as http_error:
                 cb.log.warning("googleapiclient.errors.HttpError: {0}".format(
                     http_error))
-            if 'items' in response:
-                self._public_images.extend(
-                    [GCEMachineImage(self.provider, image) for image
-                     in response['items']])
 
     def get(self, image_id):
         """
@@ -428,14 +440,19 @@ class GCEImageService(BaseImageService):
         if (self.provider.project_name not in
                 GCEImageService._PUBLIC_IMAGE_PROJECTS):
             try:
-                response = (self.provider
-                                .gce_compute
-                                .images()
-                                .list(project=self.provider.project_name)
-                                .execute())
-                if 'items' in response:
-                    images = [GCEMachineImage(self.provider, image) for image
-                              in response['items']]
+                token = None
+                while True:
+                    response = (self.provider
+                                    .gce_compute
+                                    .images()
+                                    .list(project=self.provider.project_name,
+                                          pageToken=token)
+                                    .execute())
+                    for image in response.get('items', []):
+                        images.append(GCEMachineImage(self.provider, image))
+                    if 'nextPageToken' not in response:
+                        break
+                    token = response['nextPageToken']
             except googleapiclient.errors.HttpError as http_error:
                 cb.log.warning(
                     "googleapiclient.errors.HttpError: {0}".format(http_error))
@@ -552,7 +569,10 @@ class GCEInstanceService(BaseInstanceService):
                         .execute())
         instances = [GCEInstance(self.provider, inst)
                      for inst in response['items']]
-        return ServerPagedResultList(len(instances) > max_result,
+        if len(instances) > max_result:
+            cb.log.warning('Expected at most %d results; got %d',
+                           max_result, len(instances))
+        return ServerPagedResultList('nextPageToken' in response,
                                      response.get('nextPageToken'),
                                      False, data=instances)
 
@@ -662,16 +682,21 @@ class GCENetworkService(BaseNetworkService):
         if not region:
             region = self.provider.region_name
         try:
-            response = (self.provider
-                            .gce_compute
-                            .addresses()
-                            .list(project=self.provider.project_name,
-                                  region=region)
-                            .execute())
             ips = []
-            if 'items' in response:
-                for ip in response['items']:
+            token = None
+            while True:
+                response = (self.provider
+                                .gce_compute
+                                .addresses()
+                                .list(project=self.provider.project_name,
+                                      region=region,
+                                      pageToken=token)
+                                .execute())
+                for ip in response.get('items', []):
                     ips.append(GCEFloatingIP(self.provider, ip))
+                if 'nextPageToken' not in response:
+                    break
+                token = response['nextPageToken']
             # TODO: if network_id is given, filter out IPs that are assigned to
             # resources in a different network.
             return ips
@@ -704,16 +729,21 @@ class GCENetworkService(BaseNetworkService):
         if not region:
             region = self.provider.region_name
         try:
-            response = (self.provider
-                            .gce_compute
-                            .routers()
-                            .list(project=self.provider.project_name,
-                                  region=region)
-                            .execute())
             routers = []
-            if 'items' in response:
-                for router in response['items']:
+            token = None
+            while True:
+                response = (self.provider
+                                .gce_compute
+                                .routers()
+                                .list(project=self.provider.project_name,
+                                      region=region,
+                                      pageToken=token)
+                                .execute())
+                for router in response.get('items', []):
                     routers.append(GCERouter(self.provider, router))
+                if 'nextPageToken' not in response:
+                    break
+                token = response['nextPageToken']
             return routers
         except:
             return []
@@ -759,16 +789,21 @@ class GCESubnetService(BaseSubnetService):
         if not region:
             region = self.provider.region_name
         try:
-            response = (self.provider
-                            .gce_compute
-                            .subnetworks()
-                            .list(project=self.provider.project_name,
-                                  region=region)
-                            .execute())
             subnets = []
-            if 'items' in response:
-                for subnet in response['items']:
+            token = None
+            while True:
+                response = (self.provider
+                                .gce_compute
+                                .subnetworks()
+                                .list(project=self.provider.project_name,
+                                      region=region,
+                                      pageToken=token)
+                                .execute())
+                for subnet in response.get('items', []):
                     subnets.append(GCESubnet(self.provider, subnet))
+                if 'nextPageToken' not in response:
+                    break
+                token = response['nextPageToken']
             return subnets
         except:
             return []
@@ -873,7 +908,10 @@ class GCEVolumeService(BaseVolumeService):
             return []
         gce_vols = [GCEVolume(self.provider, vol)
                     for vol in response['items']]
-        return ServerPagedResultList(len(gce_vols) > max_result,
+        if len(gce_vols) > max_result:
+            cb.log.warning('Expected at most %d results; got %d',
+                           max_result, len(gce_vols))
+        return ServerPagedResultList('nextPageToken' in response,
                                      response.get('nextPageToken'),
                                      False, data=gce_vols)
 
@@ -900,7 +938,10 @@ class GCEVolumeService(BaseVolumeService):
             return []
         gce_vols = [GCEVolume(self.provider, vol)
                     for vol in response['items']]
-        return ServerPagedResultList(len(gce_vols) > max_result,
+        if len(gce_vols) > max_result:
+            cb.log.warning('Expected at most %d results; got %d',
+                           max_result, len(gce_vols))
+        return ServerPagedResultList('nextPageToken' in response,
                                      response.get('nextPageToken'),
                                      False, data=gce_vols)
 
@@ -974,7 +1015,10 @@ class GCESnapshotService(BaseSnapshotService):
             return []
         snapshots = [GCESnapshot(self.provider, snapshot)
                      for snapshot in response['items']]
-        return ServerPagedResultList(len(snapshots) > max_result,
+        if len(snapshots) > max_result:
+            cb.log.warning('Expected at most %d results; got %d',
+                           max_result, len(snapshots))
+        return ServerPagedResultList('nextPageToken' in response,
                                      response.get('nextPageToken'),
                                      False, data=snapshots)
 
@@ -994,7 +1038,10 @@ class GCESnapshotService(BaseSnapshotService):
             return []
         snapshots = [GCESnapshot(self.provider, snapshot)
                      for snapshot in response['items']]
-        return ServerPagedResultList(len(snapshots) > max_result,
+        if len(snapshots) > max_result:
+            cb.log.warning('Expected at most %d results; got %d',
+                           max_result, len(snapshots))
+        return ServerPagedResultList('nextPageToken' in response,
                                      response.get('nextPageToken'),
                                      False, data=snapshots)
 
@@ -1085,10 +1132,11 @@ class GCSObjectStoreService(BaseObjectStoreService):
                                   maxResults=max_result,
                                   pageToken=marker)
                             .execute())
-            if 'error' in response or 'items' not in response:
-                return []
-            buckets = [GCSBucket(self.provider, bucket)
-                       for bucket in response['items']]
+            if 'error' in response:
+                return ServerPagedResultList(False, None, False, data=[])
+            buckets = []
+            for bucket in response.get('items', []):
+                buckets.append(GCSBucket(self.provider, bucket))
             if len(buckets) > max_result:
                 cb.log.warning('Expected at most %d results; got %d',
                                max_result, len(buckets))
@@ -1096,7 +1144,7 @@ class GCSObjectStoreService(BaseObjectStoreService):
                                          response.get('nextPageToken'),
                                          False, data=buckets)
         except:
-            return ServerPagedResults(False, None, False, data=[])
+            return ServerPagedResultList(False, None, False, data=[])
 
     def create(self, name, location=None):
         """