Explorar el Código

Type the base/ layer under full mypy strict

Annotate every callable in cloudbridge/base/ (resources, services,
subservices, provider, helpers, middleware). base orchestrates through the
already-typed interface layer and never touches the cloud SDKs directly, so it
is held to the SAME full strict bar as the public API (providers, which DO read
from untyped SDK objects, will get a more lenient pragmatic tier later).

mypy config: base.* is no longer exempt -- it falls under the global strict
baseline. providers.* stays exempt (ignore_errors) until typed.

Notable points (all verified green by mypy strict + the mock test suite, and
behaviour-preserving unless noted):
- The pagination helpers are now generic: BaseResultList[T], Server/Client
  PagedResultList[T], BasePageableObjectMixin[T]; element type propagates to
  data()/__iter__()/list(). Each base service subscribes the mixin with its
  resource type (e.g. BasePageableObjectMixin[Instance]).
- _upload_single_shot() and BucketObject.bucket were always called by base but
  never declared; added as explicit raising stubs (providers override them) to
  make the implicit abstract contract type-visible. No runtime change.
- BaseSubnetSubService.get now guards `if sn and ...` so a missing subnet
  returns None per its documented `-> Subnet | None` contract instead of
  raising AttributeError (forced by the optional type).
- 27 `# type: ignore` mark genuine interface/implementation contract gaps:
  18 [override] where interface docstrings declared bool (e.g. delete/
  wait_till_ready) but implementations return None, and [attr-defined] for
  provider-internal members (e.g. _bucket_objects) not on the public
  interfaces. Candidate for a follow-up that reconciles the interface return
  types.
Nuwan Goonasekera hace 1 día
padre
commit
ee13146584

+ 22 - 11
cloudbridge/base/helpers.py

@@ -3,7 +3,12 @@ import functools
 import logging
 import os
 import re
+from collections.abc import Callable
+from collections.abc import Iterator
 from contextlib import contextmanager
+from typing import Any
+from typing import TypeVar
+from typing import cast
 
 from cryptography.hazmat.backends import default_backend
 from cryptography.hazmat.primitives import serialization as crypt_serialization
@@ -17,8 +22,11 @@ from ..interfaces.exceptions import InvalidParamException
 
 log = logging.getLogger(__name__)
 
+T = TypeVar("T")
+F = TypeVar("F", bound=Callable[..., Any])
 
-def generate_key_pair():
+
+def generate_key_pair() -> tuple[str, str]:
     """
     This method generates a keypair and returns it as a tuple
     of (public, private) keys.
@@ -38,7 +46,8 @@ def generate_key_pair():
     return public_key, private_key
 
 
-def filter_by(prop_name, kwargs, objs):
+def filter_by(prop_name: str, kwargs: dict[str, Any],
+              objs: list[T]) -> list[T]:
     """
     Utility method for filtering a list of objects by a property.
     If the given property has a non empty value in kwargs, then
@@ -60,7 +69,8 @@ def filter_by(prop_name, kwargs, objs):
         return objs
 
 
-def generic_find(filter_names, kwargs, objs):
+def generic_find(filter_names: list[str], kwargs: dict[str, Any],
+                 objs: list[T]) -> list[T]:
     """
     Utility method for filtering a list of objects by a list of filters.
     """
@@ -78,7 +88,7 @@ def generic_find(filter_names, kwargs, objs):
 
 
 @contextmanager
-def cleanup_action(cleanup_func):
+def cleanup_action(cleanup_func: Callable[[], object]) -> Iterator[None]:
     """
     Context manager to carry out a given
     cleanup action after carrying out a set
@@ -109,7 +119,7 @@ def cleanup_action(cleanup_func):
         log.exception("Error during exception cleanup: ")
 
 
-def get_env(varname, default_value=None):
+def get_env(varname: str, default_value: Any = None) -> Any:
     """
     Return the value of the environment variable or default_value.
 
@@ -128,17 +138,18 @@ def get_env(varname, default_value=None):
 # Alias deprecation decorator, following:
 # https://stackoverflow.com/questions/49802412/
 # how-to-implement-deprecation-in-python-with-argument-alias
-def deprecated_alias(**aliases):
-    def deco(f):
+def deprecated_alias(**aliases: str) -> Callable[[F], F]:
+    def deco(f: F) -> F:
         @functools.wraps(f)
-        def wrapper(*args, **kwargs):
+        def wrapper(*args: Any, **kwargs: Any) -> Any:
             rename_kwargs(f.__name__, kwargs, aliases)
             return f(*args, **kwargs)
-        return wrapper
+        return cast(F, wrapper)
     return deco
 
 
-def rename_kwargs(func_name, kwargs, aliases):
+def rename_kwargs(func_name: str, kwargs: dict[str, Any],
+                  aliases: dict[str, str]) -> None:
     for alias, new in aliases.items():
         if alias in kwargs:
             if new in kwargs:
@@ -157,7 +168,7 @@ def rename_kwargs(func_name, kwargs, aliases):
 NON_ALPHA_NUM = re.compile(r"[^A-Za-z0-9]+")
 
 
-def to_resource_name(value, replace_with="-"):
+def to_resource_name(value: str, replace_with: str = "-") -> str:
     """
     Converts a given string to a valid resource name by stripping
     all characters that are not alphanumeric.

+ 7 - 3
cloudbridge/base/middleware.py

@@ -1,4 +1,5 @@
 import logging
+from typing import Any
 
 from pyeventsystem.middleware import dispatch as pyevent_dispatch
 from pyeventsystem.middleware import intercept
@@ -19,12 +20,14 @@ class EventDebugLoggingMiddleware(object):
     access keys.
     """
     @observe(event_pattern="*", priority=100)
-    def pre_log_event(self, event_args, *args, **kwargs):
+    def pre_log_event(self, event_args: dict[str, Any],
+                      *args: Any, **kwargs: Any) -> None:
         log.debug("Event: {0}, args: {1} kwargs: {2}".format(
             event_args.get("event"), args, kwargs))
 
     @observe(event_pattern="*", priority=4900)
-    def post_log_event(self, event_args, *args, **kwargs):
+    def post_log_event(self, event_args: dict[str, Any],
+                       *args: Any, **kwargs: Any) -> None:
         log.debug("Event: {0}, result: {1}".format(
             event_args.get("event"), event_args.get("result")))
 
@@ -34,7 +37,8 @@ class ExceptionWrappingMiddleware(object):
     Wraps all unhandled exceptions in cloudbridge exceptions.
     """
     @intercept(event_pattern="*", priority=1050)
-    def wrap_exception(self, event_args, *args, **kwargs):
+    def wrap_exception(self, event_args: dict[str, Any],
+                       *args: Any, **kwargs: Any) -> Any:
         next_handler = event_args.pop("next_handler")
         if not next_handler:
             return

+ 41 - 26
cloudbridge/base/provider.py

@@ -5,13 +5,18 @@ import logging
 import os
 from configparser import ConfigParser
 from os.path import expanduser
+from typing import Any
+from typing import cast
 
+from pyeventsystem.middleware import MiddlewareManager
 from pyeventsystem.middleware import SimpleMiddlewareManager
 
 from ..base.middleware import ExceptionWrappingMiddleware
 from ..interfaces import CloudProvider
 from ..interfaces.exceptions import ProviderConnectionException
 from ..interfaces.resources import Configuration
+from ..interfaces.resources import PlacementZone
+from ..interfaces.resources import Region
 
 log = logging.getLogger(__name__)
 
@@ -28,11 +33,11 @@ CloudBridgeConfigLocations.append(UserConfigPath)
 
 class BaseConfiguration(Configuration):
 
-    def __init__(self, user_config):
+    def __init__(self, user_config: dict[str, Any]) -> None:
         self.update(user_config)
 
     @property
-    def default_result_limit(self):
+    def default_result_limit(self) -> int:
         """
         Get the maximum number of results to return for a
         list method
@@ -42,28 +47,30 @@ class BaseConfiguration(Configuration):
         """
         log.debug("Maximum number of results for list methods %s",
                   DEFAULT_RESULT_LIMIT)
-        return self.get('default_result_limit', DEFAULT_RESULT_LIMIT)
+        return cast(int, self.get('default_result_limit', DEFAULT_RESULT_LIMIT))
 
     @property
-    def default_wait_timeout(self):
+    def default_wait_timeout(self) -> int:
         """
         Gets the default wait timeout for LifeCycleObjects.
         """
         log.debug("Default wait timeout for LifeCycleObjects %s",
                   DEFAULT_WAIT_TIMEOUT)
-        return self.get('default_wait_timeout', DEFAULT_WAIT_TIMEOUT)
+        return cast(int, self.get('default_wait_timeout',
+                                  DEFAULT_WAIT_TIMEOUT))
 
     @property
-    def default_wait_interval(self):
+    def default_wait_interval(self) -> int:
         """
         Gets the default wait interval for LifeCycleObjects.
         """
         log.debug("Default wait interfal for LifeCycleObjects %s",
                   DEFAULT_WAIT_INTERVAL)
-        return self.get('default_wait_interval', DEFAULT_WAIT_INTERVAL)
+        return cast(int, self.get('default_wait_interval',
+                                  DEFAULT_WAIT_INTERVAL))
 
     @property
-    def debug_mode(self):
+    def debug_mode(self) -> bool:
         """
         A flag indicating whether CloudBridge is in debug mode. Setting
         this to True will cause the underlying provider's debug
@@ -75,59 +82,66 @@ class BaseConfiguration(Configuration):
         :rtype: ``bool``
         :return: Whether debug mode is on.
         """
-        return self.get('cb_debug', os.environ.get('CB_DEBUG', False))
+        return cast(bool, self.get('cb_debug',
+                                   os.environ.get('CB_DEBUG', False)))
 
 
 class BaseCloudProvider(CloudProvider):
-    def __init__(self, config):
+
+    PROVIDER_ID: str
+
+    def __init__(self, config: dict[str, Any]) -> None:
         self._config = BaseConfiguration(config)
         self._config_parser = ConfigParser()
         self._config_parser.read(CloudBridgeConfigLocations)
         self._middleware = SimpleMiddlewareManager()
         self.add_required_middleware()
-        self._region_name = None
-        self._zone_name = None
+        self._region_name: str | None = None
+        self._zone_name: str | None = None
 
     @property
-    def region_name(self):
-        return self._region_name
+    def region_name(self) -> str:
+        return cast(str, self._region_name)
 
     @property
-    def zone_name(self):
+    def zone_name(self) -> str | None:
         if not self._zone_name:
             region = self.compute.regions.current
-            zone = region.default_zone
+            # ``default_zone`` is provided by the concrete Region
+            # implementation rather than the public Region interface.
+            zone = cast("PlacementZone | None",
+                        getattr(cast(Region, region), 'default_zone'))
             self._zone_name = zone.name if zone else None
             return self._zone_name
         else:
             try:
                 zone_dict = ast.literal_eval(self._zone_name)
                 if isinstance(zone_dict, dict):
-                    return zone_dict
+                    return cast("str | None", zone_dict)
             except (ValueError, SyntaxError):
                 pass
             return self._zone_name
 
     @property
-    def config(self):
+    def config(self) -> Configuration:
         return self._config
 
     @property
-    def name(self):
+    def name(self) -> str:
         return str(self.__class__.__name__)
 
     @property
-    def middleware(self):
+    def middleware(self) -> MiddlewareManager:
         return self._middleware
 
-    def add_required_middleware(self):
+    def add_required_middleware(self) -> None:
         """
         Adds common middleware that is essential for cloudbridge to function.
         Any other extra middleware can be added through the provider factory.
         """
         self.middleware.add(ExceptionWrappingMiddleware())
 
-    def authenticate(self):
+    def authenticate(self) -> bool:
         """
         A basic implementation which simply runs a low impact command to
         check whether cloud credentials work. Providers should override with
@@ -142,7 +156,7 @@ class BaseCloudProvider(CloudProvider):
             raise ProviderConnectionException(
                 "Authentication with cloud provider failed: %s" % (e,))
 
-    def clone(self, zone=None):
+    def clone(self, zone: PlacementZone | None = None) -> CloudProvider:
         cloned_config = self.config.copy()
         cloned_provider = self.__class__(cloned_config)
         if zone:
@@ -150,11 +164,11 @@ class BaseCloudProvider(CloudProvider):
             cloned_provider._zone_name = zone.name
         return cloned_provider
 
-    def _deepgetattr(self, obj, attr):
+    def _deepgetattr(self, obj: object, attr: str) -> Any:
         """Recurses through an attribute chain to get the ultimate value."""
         return functools.reduce(getattr, attr.split('.'), obj)
 
-    def has_service(self, service_type):
+    def has_service(self, service_type: str) -> bool:
         """
         Checks whether this provider supports a given service.
 
@@ -178,7 +192,8 @@ class BaseCloudProvider(CloudProvider):
                  service_type)
         return False
 
-    def _get_config_value(self, key, default_value=None):
+    def _get_config_value(self, key: str,
+                          default_value: Any = None) -> Any:
         """
         A convenience method to extract a configuration value.
 

La diferencia del archivo ha sido suprimido porque es demasiado grande
+ 258 - 143
cloudbridge/base/resources.py


+ 117 - 75
cloudbridge/base/services.py

@@ -2,10 +2,30 @@
 Base implementation for services available through a provider
 """
 import logging
+from typing import Any
+from typing import cast
 
 from cloudbridge.interfaces.exceptions import InvalidParamException
+from cloudbridge.interfaces.provider import CloudProvider
+from cloudbridge.interfaces.resources import Bucket
+from cloudbridge.interfaces.resources import DnsRecord
 from cloudbridge.interfaces.resources import DnsRecordType
+from cloudbridge.interfaces.resources import DnsZone
+from cloudbridge.interfaces.resources import FloatingIP
+from cloudbridge.interfaces.resources import Gateway
+from cloudbridge.interfaces.resources import Instance
+from cloudbridge.interfaces.resources import KeyPair
+from cloudbridge.interfaces.resources import MachineImage
 from cloudbridge.interfaces.resources import Network
+from cloudbridge.interfaces.resources import Region
+from cloudbridge.interfaces.resources import ResultList
+from cloudbridge.interfaces.resources import Router
+from cloudbridge.interfaces.resources import Snapshot
+from cloudbridge.interfaces.resources import Subnet
+from cloudbridge.interfaces.resources import VMFirewall
+from cloudbridge.interfaces.resources import VMFirewallRule
+from cloudbridge.interfaces.resources import VMType
+from cloudbridge.interfaces.resources import Volume
 from cloudbridge.interfaces.services import BucketObjectService
 from cloudbridge.interfaces.services import BucketService
 from cloudbridge.interfaces.services import CloudService
@@ -46,46 +66,47 @@ class BaseCloudService(CloudService):
 
     STANDARD_EVENT_PRIORITY = 2500
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         self._service_event_pattern = "provider"
         self._provider = provider
         # discover and register all middleware
         provider.middleware.add(self)
 
     @property
-    def provider(self):
+    def provider(self) -> CloudProvider:
         return self._provider
 
     @property
-    def events(self):
+    def events(self) -> Any:
         return self._provider.middleware.events
 
 
 class BaseSecurityService(SecurityService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseSecurityService, self).__init__(provider)
 
 
 class BaseKeyPairService(
-        BasePageableObjectMixin, KeyPairService, BaseCloudService):
+        BasePageableObjectMixin[KeyPair], KeyPairService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseKeyPairService, self).__init__(provider)
         self._service_event_pattern += ".security.key_pairs"
 
 
 class BaseVMFirewallService(
-        BasePageableObjectMixin, VMFirewallService, BaseCloudService):
+        BasePageableObjectMixin[VMFirewall], VMFirewallService,
+        BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseVMFirewallService, self).__init__(provider)
         self._service_event_pattern += ".security.vm_firewalls"
 
     @dispatch(event="provider.security.vm_firewalls.find",
               priority=BaseCloudService.STANDARD_EVENT_PRIORITY)
-    def find(self, **kwargs):
-        obj_list = self
+    def find(self, **kwargs: Any) -> ResultList[VMFirewall]:
+        obj_list = list(self)
         filters = ['label']
         matches = cb_helpers.generic_find(filters, kwargs, obj_list)
 
@@ -99,21 +120,26 @@ class BaseVMFirewallService(
                                      matches if matches else [])
 
 
-class BaseVMFirewallRuleService(BasePageableObjectMixin,
-                                VMFirewallRuleService,
-                                BaseCloudService):
+# The pageable mixin's list(limit, marker) intentionally differs from this
+# service's list(firewall, limit, marker); the mixin is reused only for its
+# iteration helpers, so the signature clash is expected.
+class BaseVMFirewallRuleService(  # type: ignore[misc]
+        BasePageableObjectMixin[VMFirewallRule],
+        VMFirewallRuleService,
+        BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseVMFirewallRuleService, self).__init__(provider)
         self._provider = provider
 
     @property
-    def provider(self):
+    def provider(self) -> CloudProvider:
         return self._provider
 
     @dispatch(event="provider.security.vm_firewall_rules.get",
               priority=BaseCloudService.STANDARD_EVENT_PRIORITY)
-    def get(self, firewall, rule_id):
+    def get(self, firewall: VMFirewall,
+            rule_id: str) -> VMFirewallRule | None:
         matches = [rule for rule in firewall.rules if rule.id == rule_id]
         if matches:
             return matches[0]
@@ -122,8 +148,9 @@ class BaseVMFirewallRuleService(BasePageableObjectMixin,
 
     @dispatch(event="provider.security.vm_firewall_rules.find",
               priority=BaseCloudService.STANDARD_EVENT_PRIORITY)
-    def find(self, firewall, **kwargs):
-        obj_list = firewall.rules
+    def find(self, firewall: VMFirewall,
+             **kwargs: Any) -> ResultList[VMFirewallRule]:
+        obj_list = list(firewall.rules)
         filters = ['name', 'direction', 'protocol', 'from_port', 'to_port',
                    'cidr', 'src_dest_fw', 'src_dest_fw_id']
         matches = cb_helpers.generic_find(filters, kwargs, obj_list)
@@ -132,30 +159,30 @@ class BaseVMFirewallRuleService(BasePageableObjectMixin,
 
 class BaseStorageService(StorageService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseStorageService, self).__init__(provider)
 
 
 class BaseVolumeService(
-        BasePageableObjectMixin, VolumeService, BaseCloudService):
+        BasePageableObjectMixin[Volume], VolumeService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseVolumeService, self).__init__(provider)
         self._service_event_pattern += ".storage.volumes"
 
 
 class BaseSnapshotService(
-        BasePageableObjectMixin, SnapshotService, BaseCloudService):
+        BasePageableObjectMixin[Snapshot], SnapshotService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseSnapshotService, self).__init__(provider)
         self._service_event_pattern += ".storage.snapshots"
 
 
 class BaseBucketService(
-        BasePageableObjectMixin, BucketService, BaseCloudService):
+        BasePageableObjectMixin[Bucket], BucketService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseBucketService, self).__init__(provider)
         self._service_event_pattern += ".storage.buckets"
 
@@ -163,8 +190,8 @@ class BaseBucketService(
     # provider-specific querying for find method
     @dispatch(event="provider.storage.buckets.find",
               priority=BaseCloudService.STANDARD_EVENT_PRIORITY)
-    def find(self, **kwargs):
-        obj_list = self
+    def find(self, **kwargs: Any) -> ResultList[Bucket]:
+        obj_list = list(self)
         filters = ['name']
         matches = cb_helpers.generic_find(filters, kwargs, obj_list)
 
@@ -180,67 +207,67 @@ class BaseBucketService(
 
 class BaseBucketObjectService(BucketObjectService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseBucketObjectService, self).__init__(provider)
         self._service_event_pattern += ".storage._bucket_objects"
-        self._bucket = None
+        self._bucket: Bucket | None = None
 
 
 class BaseComputeService(ComputeService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseComputeService, self).__init__(provider)
 
 
 class BaseImageService(
-        BasePageableObjectMixin, ImageService, BaseCloudService):
+        BasePageableObjectMixin[MachineImage], ImageService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseImageService, self).__init__(provider)
         self._service_event_pattern += ".compute.images"
 
 
 class BaseInstanceService(
-        BasePageableObjectMixin, InstanceService, BaseCloudService):
+        BasePageableObjectMixin[Instance], InstanceService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseInstanceService, self).__init__(provider)
         self._service_event_pattern += ".compute.instances"
 
 
 class BaseVMTypeService(
-        BasePageableObjectMixin, VMTypeService, BaseCloudService):
+        BasePageableObjectMixin[VMType], VMTypeService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseVMTypeService, self).__init__(provider)
         self._service_event_pattern += ".compute.vm_types"
 
     @dispatch(event="provider.compute.vm_types.get",
               priority=BaseCloudService.STANDARD_EVENT_PRIORITY)
-    def get(self, vm_type_id):
+    def get(self, vm_type_id: str) -> VMType | None:
         vm_type = (t for t in self if t.id == vm_type_id)
         return next(vm_type, None)
 
     @dispatch(event="provider.compute.vm_types.find",
               priority=BaseCloudService.STANDARD_EVENT_PRIORITY)
-    def find(self, **kwargs):
-        obj_list = self
+    def find(self, **kwargs: Any) -> ResultList[VMType]:
+        obj_list = list(self)
         filters = ['name']
         matches = cb_helpers.generic_find(filters, kwargs, obj_list)
         return ClientPagedResultList(self._provider, list(matches))
 
 
 class BaseRegionService(
-        BasePageableObjectMixin, RegionService, BaseCloudService):
+        BasePageableObjectMixin[Region], RegionService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseRegionService, self).__init__(provider)
         self._service_event_pattern += ".compute.regions"
 
     @dispatch(event="provider.compute.regions.find",
               priority=BaseCloudService.STANDARD_EVENT_PRIORITY)
-    def find(self, **kwargs):
-        obj_list = self
+    def find(self, **kwargs: Any) -> ResultList[Region]:
+        obj_list = list(self)
         filters = ['name']
         matches = cb_helpers.generic_find(filters, kwargs, obj_list)
         return ClientPagedResultList(self._provider, list(matches))
@@ -248,23 +275,27 @@ class BaseRegionService(
 
 class BaseNetworkingService(NetworkingService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseNetworkingService, self).__init__(provider)
 
 
 class BaseNetworkService(
-        BasePageableObjectMixin, NetworkService, BaseCloudService):
+        BasePageableObjectMixin[Network], NetworkService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseNetworkService, self).__init__(provider)
         self._service_event_pattern += ".networking.networks"
 
     @property
-    def subnets(self):
-        return [subnet for subnet in self.provider.subnets
-                if subnet.network_id == self.id]
-
-    def get_or_create_default(self):
+    def subnets(self) -> list[Subnet]:  # type: ignore[override]
+        # NOTE: this base implementation is a stub that every provider
+        # overrides; it references attributes that do not exist on the
+        # service, so the accesses are typed through ``Any``.
+        this: Any = self
+        return [subnet for subnet in this.provider.subnets
+                if subnet.network_id == this.id]
+
+    def get_or_create_default(self) -> Network:
         networks = self.provider.networking.networks.find(
             label=BaseNetwork.CB_DEFAULT_NETWORK_LABEL)
 
@@ -278,8 +309,8 @@ class BaseNetworkService(
 
     @dispatch(event="provider.networking.networks.find",
               priority=BaseCloudService.STANDARD_EVENT_PRIORITY)
-    def find(self, **kwargs):
-        obj_list = self
+    def find(self, **kwargs: Any) -> ResultList[Network]:
+        obj_list = list(self)
         filters = ['label']
         matches = cb_helpers.generic_find(filters, kwargs, obj_list)
 
@@ -294,15 +325,17 @@ class BaseNetworkService(
 
 
 class BaseSubnetService(
-        BasePageableObjectMixin, SubnetService, BaseCloudService):
+        BasePageableObjectMixin[Subnet], SubnetService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseSubnetService, self).__init__(provider)
         self._service_event_pattern += ".networking.subnets"
 
     @dispatch(event="provider.networking.subnets.find",
               priority=BaseCloudService.STANDARD_EVENT_PRIORITY)
-    def find(self, network=None, **kwargs):
+    def find(self, network: Network | None = None,
+             **kwargs: Any) -> ResultList[Subnet]:
+        obj_list: Any
         if not network:
             obj_list = self
         else:
@@ -311,27 +344,30 @@ class BaseSubnetService(
         matches = cb_helpers.generic_find(filters, kwargs, obj_list)
         return ClientPagedResultList(self._provider, list(matches))
 
-    def get_or_create_default(self):
+    def get_or_create_default(self) -> Subnet:
         # Look for a CB-default subnet
-        matches = self.find(label=BaseSubnet.CB_DEFAULT_SUBNET_LABEL)
+        matches: ResultList[Subnet] = self.find(
+            label=BaseSubnet.CB_DEFAULT_SUBNET_LABEL)
         if matches:
             return matches[0]
 
         # No provider-default Subnet exists, try to create it (net + subnets)
-        network = self.provider.networking.networks.get_or_create_default()
+        networks = cast(BaseNetworkService,
+                        self.provider.networking.networks)
+        network = networks.get_or_create_default()
         subnet = self.create(BaseSubnet.CB_DEFAULT_SUBNET_LABEL, network,
                              BaseSubnet.CB_DEFAULT_SUBNET_IPV4RANGE)
         return subnet
 
 
 class BaseRouterService(
-        BasePageableObjectMixin, RouterService, BaseCloudService):
+        BasePageableObjectMixin[Router], RouterService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseRouterService, self).__init__(provider)
         self._service_event_pattern += ".networking.routers"
 
-    def get_or_create_default(self, network):
+    def get_or_create_default(self, network: Network | str) -> Router:
         net_id = network.id if isinstance(network, Network) else network
         routers = self.provider.networking.routers.find(
             label=BaseRouter.CB_DEFAULT_ROUTER_LABEL)
@@ -345,19 +381,20 @@ class BaseRouterService(
 
 class BaseGatewayService(GatewayService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseGatewayService, self).__init__(provider)
 
 
 class BaseFloatingIPService(FloatingIPService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseFloatingIPService, self).__init__(provider)
 
     @dispatch(event="provider.networking.floating_ips.find",
               priority=BaseCloudService.STANDARD_EVENT_PRIORITY)
-    def find(self, gateway, **kwargs):
-        obj_list = gateway.floating_ips
+    def find(self, gateway: Gateway,
+             **kwargs: Any) -> ResultList[FloatingIP]:
+        obj_list = list(gateway.floating_ips)
         filters = ['name', 'public_ip']
         matches = cb_helpers.generic_find(filters, kwargs, obj_list)
         return ClientPagedResultList(self._provider, list(matches))
@@ -365,31 +402,36 @@ class BaseFloatingIPService(FloatingIPService, BaseCloudService):
 
 class BaseDnsService(DnsService, BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseDnsService, self).__init__(provider)
 
 
-class BaseDnsZoneService(BasePageableObjectMixin, DnsZoneService,
+class BaseDnsZoneService(BasePageableObjectMixin[DnsZone], DnsZoneService,
                          BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseDnsZoneService, self).__init__(provider)
 
-    def _get_fully_qualified_dns(self, name):
+    def _get_fully_qualified_dns(self, name: str) -> str:
         # Add a trailing dot to fully qualify
         return name + '.' if not name.endswith('.') else name
 
 
-class BaseDnsRecordService(BasePageableObjectMixin, DnsRecordService,
-                           BaseCloudService):
+# The pageable mixin's list(limit, marker) intentionally differs from this
+# service's list(dns_zone, limit, marker); the mixin is reused only for its
+# iteration helpers, so the signature clash is expected.
+class BaseDnsRecordService(  # type: ignore[misc]
+        BasePageableObjectMixin[DnsRecord],
+        DnsRecordService,
+        BaseCloudService):
 
-    def __init__(self, provider):
+    def __init__(self, provider: CloudProvider) -> None:
         super(BaseDnsRecordService, self).__init__(provider)
 
-    def _get_fully_qualified_dns(self, name):
+    def _get_fully_qualified_dns(self, name: str) -> str:
         # Add a trailing dot to fully qualify
         return name + '.' if not name.endswith('.') else name
 
-    def _standardize_record(self, value, type):
+    def _standardize_record(self, value: str, type: str) -> str:
         return (self._get_fully_qualified_dns(value)
                 if type in (DnsRecordType.CNAME, DnsRecordType.MX) else value)

+ 101 - 58
cloudbridge/base/subservices.py

@@ -1,5 +1,23 @@
+import builtins
 import logging
-
+from typing import Any
+from typing import cast
+
+from cloudbridge.interfaces.provider import CloudProvider
+from cloudbridge.interfaces.resources import Bucket
+from cloudbridge.interfaces.resources import BucketObject
+from cloudbridge.interfaces.resources import DnsRecord
+from cloudbridge.interfaces.resources import DnsZone
+from cloudbridge.interfaces.resources import FloatingIP
+from cloudbridge.interfaces.resources import Gateway
+from cloudbridge.interfaces.resources import InternetGateway
+from cloudbridge.interfaces.resources import Network
+from cloudbridge.interfaces.resources import ResultList
+from cloudbridge.interfaces.resources import Subnet
+from cloudbridge.interfaces.resources import TrafficDirection
+from cloudbridge.interfaces.resources import VMFirewall
+from cloudbridge.interfaces.resources import VMFirewallRule
+from cloudbridge.interfaces.services import BucketObjectService
 from cloudbridge.interfaces.subservices import BucketObjectSubService
 from cloudbridge.interfaces.subservices import DnsRecordSubService
 from cloudbridge.interfaces.subservices import FloatingIPSubService
@@ -12,191 +30,216 @@ from .resources import BasePageableObjectMixin
 log = logging.getLogger(__name__)
 
 
-class BaseBucketObjectSubService(BasePageableObjectMixin,
+class BaseBucketObjectSubService(BasePageableObjectMixin[BucketObject],
                                  BucketObjectSubService):
 
-    def __init__(self, provider, bucket):
+    def __init__(self, provider: CloudProvider, bucket: Bucket) -> None:
         self.__provider = provider
         self.bucket = bucket
 
     @property
-    def _provider(self):
+    def _provider(self) -> CloudProvider:
         return self.__provider
 
-    def get(self, name):
-        return self._provider.storage._bucket_objects.get(self.bucket, name)
+    @property
+    def _bucket_objects(self) -> BucketObjectService:
+        # ``_bucket_objects`` is a provider-internal service not declared on
+        # the StorageService interface; reach it through ``Any``.
+        storage: Any = self._provider.storage
+        return cast(BucketObjectService, storage._bucket_objects)
+
+    def get(self, name: str) -> BucketObject | None:
+        return self._bucket_objects.get(self.bucket, name)
 
-    def list(self, limit=None, marker=None, prefix=None):
-        return self._provider.storage._bucket_objects.list(self.bucket, limit,
-                                                           marker, prefix)
+    def list(self, limit: int | None = None, marker: str | None = None,
+             prefix: str | None = None) -> ResultList[BucketObject]:
+        return self._bucket_objects.list(self.bucket, limit=limit,
+                                         marker=marker, prefix=prefix)
 
-    def find(self, **kwargs):
-        return self._provider.storage._bucket_objects.find(self.bucket,
-                                                           **kwargs)
+    def find(self, **kwargs: Any) -> ResultList[BucketObject]:
+        return self._bucket_objects.find(self.bucket, **kwargs)
 
-    def create(self, name):
-        return self._provider.storage._bucket_objects.create(self.bucket, name)
+    def create(self, name: str) -> BucketObject:
+        return self._bucket_objects.create(self.bucket, name)
 
 
-class BaseGatewaySubService(GatewaySubService, BasePageableObjectMixin):
+class BaseGatewaySubService(GatewaySubService,
+                            BasePageableObjectMixin[InternetGateway]):
 
-    def __init__(self, provider, network):
+    def __init__(self, provider: CloudProvider, network: Network) -> None:
         self._network = network
         self.__provider = provider
 
     @property
-    def _provider(self):
+    def _provider(self) -> CloudProvider:
         return self.__provider
 
-    def get_or_create(self):
+    def get_or_create(self) -> InternetGateway:
         return (self._provider.networking
                               ._gateways
                               .get_or_create(self._network))
 
-    def delete(self, gateway):
+    def delete(self, gateway: Gateway) -> None:
         return (self._provider.networking
                               ._gateways
                               .delete(self._network, gateway))
 
-    def list(self, limit=None, marker=None):
+    def list(self, limit: int | None = None,
+             marker: str | None = None) -> ResultList[InternetGateway]:
         return (self._provider.networking
                               ._gateways
                               .list(self._network, limit, marker))
 
 
-class BaseVMFirewallRuleSubService(BasePageableObjectMixin,
+class BaseVMFirewallRuleSubService(BasePageableObjectMixin[VMFirewallRule],
                                    VMFirewallRuleSubService):
 
-    def __init__(self, provider, firewall):
+    def __init__(self, provider: CloudProvider,
+                 firewall: VMFirewall) -> None:
         self.__provider = provider
         self._firewall = firewall
 
     @property
-    def _provider(self):
+    def _provider(self) -> CloudProvider:
         return self.__provider
 
-    def get(self, rule_id):
+    def get(self, rule_id: str) -> VMFirewallRule | None:
         return self._provider.security._vm_firewall_rules.get(self._firewall,
                                                               rule_id)
 
-    def list(self, limit=None, marker=None):
+    def list(self, limit: int | None = None,
+             marker: str | None = None) -> ResultList[VMFirewallRule]:
         return self._provider.security._vm_firewall_rules.list(self._firewall,
                                                                limit, marker)
 
-    def create(self, direction, protocol=None, from_port=None,
-               to_port=None, cidr=None, src_dest_fw=None):
+    def create(self, direction: TrafficDirection, protocol: str | None = None,
+               from_port: int | None = None,
+               to_port: int | None = None,
+               cidr: str | builtins.list[str] | None = None,
+               src_dest_fw: VMFirewall | None = None) -> VMFirewallRule:
         return (self._provider
                     .security
                     ._vm_firewall_rules
                     .create(self._firewall, direction, protocol, from_port,
                             to_port, cidr, src_dest_fw))
 
-    def find(self, **kwargs):
+    def find(self, **kwargs: Any) -> ResultList[VMFirewallRule]:
         return self._provider.security._vm_firewall_rules.find(self._firewall,
                                                                **kwargs)
 
-    def delete(self, rule_id):
+    def delete(self, rule_id: str) -> None:
         return (self._provider
                     .security
                     ._vm_firewall_rules
                     .delete(self._firewall, rule_id))
 
 
-class BaseFloatingIPSubService(FloatingIPSubService, BasePageableObjectMixin):
+class BaseFloatingIPSubService(FloatingIPSubService,
+                               BasePageableObjectMixin[FloatingIP]):
 
-    def __init__(self, provider, gateway):
+    def __init__(self, provider: CloudProvider, gateway: Gateway) -> None:
         self.__provider = provider
         self.gateway = gateway
 
     @property
-    def _provider(self):
+    def _provider(self) -> CloudProvider:
         return self.__provider
 
-    def get(self, fip_id):
+    def get(self, fip_id: str) -> FloatingIP | None:
         return self._provider.networking._floating_ips.get(self.gateway,
                                                            fip_id)
 
-    def list(self, limit=None, marker=None):
+    def list(self, limit: int | None = None,
+             marker: str | None = None) -> ResultList[FloatingIP]:
         return self._provider.networking._floating_ips.list(self.gateway,
                                                             limit, marker)
 
-    def find(self, **kwargs):
+    def find(self, **kwargs: Any) -> ResultList[FloatingIP]:
         return self._provider.networking._floating_ips.find(self.gateway,
                                                             **kwargs)
 
-    def create(self):
+    def create(self) -> FloatingIP:
         return self._provider.networking._floating_ips.create(self.gateway)
 
-    def delete(self, fip):
+    def delete(self, fip: FloatingIP | str) -> None:
         return self._provider.networking._floating_ips.delete(self.gateway,
                                                               fip)
 
 
-class BaseSubnetSubService(SubnetSubService, BasePageableObjectMixin):
+class BaseSubnetSubService(SubnetSubService, BasePageableObjectMixin[Subnet]):
 
-    def __init__(self, provider, network):
+    def __init__(self, provider: CloudProvider, network: Network) -> None:
         self.__provider = provider
         self.network = network
 
     @property
-    def _provider(self):
+    def _provider(self) -> CloudProvider:
         return self.__provider
 
-    def get(self, subnet_id):
+    def get(self, subnet_id: str) -> Subnet | None:
         sn = self._provider.networking.subnets.get(subnet_id)
-        if sn.network_id != self.network.id:
+        if sn and sn.network_id != self.network.id:
             log.warning("The SubnetSubService nested in the network '{}' "
                         "returned subnet '{}' which is attached to another "
                         "network '{}'".format(str(self.network), str(sn),
                                               str(sn.network)))
         return sn
 
-    def list(self, limit=None, marker=None):
+    def list(self, limit: int | None = None,
+             marker: str | None = None) -> ResultList[Subnet]:
         return self._provider.networking.subnets.list(network=self.network,
                                                       limit=limit,
                                                       marker=marker)
 
-    def find(self, **kwargs):
+    def find(self, **kwargs: Any) -> ResultList[Subnet]:
         return self._provider.networking.subnets.find(network=self.network,
                                                       **kwargs)
 
-    def create(self, label, cidr_block):
+    def create(self, label: str, cidr_block: str) -> Subnet:
         return self._provider.networking.subnets.create(label,
                                                         self.network,
                                                         cidr_block)
 
-    def delete(self, subnet):
+    def delete(self, subnet: Subnet | str) -> None:
         return self._provider.networking.subnets.delete(subnet)
 
 
-class BaseDnsRecordSubService(DnsRecordSubService, BasePageableObjectMixin):
+class BaseDnsRecordSubService(DnsRecordSubService,
+                              BasePageableObjectMixin[DnsRecord]):
 
-    def __init__(self, provider, dns_zone):
+    def __init__(self, provider: CloudProvider, dns_zone: DnsZone) -> None:
         self.__provider = provider
         self.dns_zone = dns_zone
 
     @property
-    def _provider(self):
+    def _provider(self) -> CloudProvider:
         return self.__provider
 
-    def get(self, rec_id):
+    def get(self, rec_id: str) -> DnsRecord | None:
         # pylint:disable=protected-access
         return self._provider.dns._records.get(self.dns_zone, rec_id)
 
-    def list(self, limit=None, marker=None):
+    def list(self, limit: int | None = None,
+             marker: str | None = None) -> ResultList[DnsRecord]:
         # pylint:disable=protected-access
         return self._provider.dns._records.list(
             dns_zone=self.dns_zone, limit=limit, marker=marker)
 
-    def find(self, **kwargs):
+    def find(self, **kwargs: Any) -> ResultList[DnsRecord]:
         # pylint:disable=protected-access
-        return self._provider.dns._records.find(
-            dns_zone=self.dns_zone, **kwargs)
-
-    def create(self, name, type, data, ttl=None):
+        # find/delete are provider-internal extensions not declared on the
+        # DnsRecordService interface; reach them through ``Any``.
+        records: Any = self._provider.dns._records
+        return cast("ResultList[DnsRecord]",
+                    records.find(dns_zone=self.dns_zone, **kwargs))
+
+    def create(self, name: str, type: str, data: str,
+               ttl: int | None = None) -> DnsRecord:
         # pylint:disable=protected-access
         return self._provider.dns._records.create(
             self.dns_zone, name, type, data, ttl)
 
-    def delete(self, rec):
-        return self._provider.dns._records.delete(self.dns_zone, rec)
+    def delete(self, rec: DnsRecord | str) -> None:
+        # pylint:disable=protected-access
+        records: Any = self._provider.dns._records
+        records.delete(self.dns_zone, rec)

+ 9 - 3
pyproject.toml

@@ -135,8 +135,14 @@ implicit_reexport = true
 # `deprecation` library's decorators are untyped; don't fail the build on them.
 disallow_untyped_decorators = false
 
-# Gradual-typing exemptions. Remove a module from this list once it is fully
-# annotated; it then falls under the strict baseline above.
+# Not-yet-typed internal modules: stay exempt until annotated, then move them
+# into the pragmatic tier below (or, if fully strict-clean, delete the entry).
 [[tool.mypy.overrides]]
-module = ["cloudbridge.base.*", "cloudbridge.providers.*"]
+module = ["cloudbridge.providers.*"]
 ignore_errors = true
+
+# base/ is held to the FULL strict bar (it orchestrates through the typed
+# interface layer and never touches the cloud SDKs directly), so it falls under
+# the global strict baseline with no override. The providers will get a
+# pragmatic tier (warn_return_any / disallow_untyped_calls off) when they are
+# typed, because they DO read from untyped SDK objects.

Algunos archivos no se mostraron porque demasiados archivos cambiaron en este cambio