Răsfoiți Sursa

Firewall dummy rule behaviour

Do not list the dummy rule and do not allow it to be deleted unless
the whole firewall is being deleted.
Ehsan Chiniforooshan 7 ani în urmă
părinte
comite
e1e493ad61
2 a modificat fișierele cu 40 adăugiri și 12 ștergeri
  1. 39 1
      cloudbridge/cloud/providers/gce/resources.py
  2. 1 11
      test/test_security_service.py

+ 39 - 1
cloudbridge/cloud/providers/gce/resources.py

@@ -375,6 +375,8 @@ class GCEFirewallsDelegate(object):
             info['network_name'] = self.network_name(firewall)
             if 'direction' in firewall:
                 info['direction'] = firewall['direction']
+            if 'priority' in firewall:
+                info['priority'] = firewall['priority']
             return info
         return info
 
@@ -508,6 +510,7 @@ class GCEVMFirewall(BaseVMFirewall):
     def delete(self):
         for rule in self._rule_container:
             rule.delete()
+        self._rule_container.dummy_rule.force_delete()
 
     def to_json(self):
         attr = inspect.getmembers(self, lambda a: not(inspect.isroutine(a)))
@@ -530,15 +533,26 @@ class GCEVMFirewallRuleContainer(BaseVMFirewallRuleContainer):
     def __init__(self, firewall):
         super(GCEVMFirewallRuleContainer, self).__init__(
                 firewall.delegate.provider, firewall)
+        self._dummy_rule = None
 
     def list(self, limit=None, marker=None):
         rules = []
         for firewall in self.firewall.delegate.iter_firewalls(
                 self.firewall.name, self.firewall.network.name):
-            rules.append(GCEVMFirewallRule(self.firewall, firewall['id']))
+            rule = GCEVMFirewallRule(self.firewall, firewall['id'])
+            if rule.is_dummy_rule():
+                self._dummy_rule = rule
+            else:
+                rules.append(rule)
         return ClientPagedResultList(self._provider, rules,
                                      limit=limit, marker=marker)
 
+    @property
+    def dummy_rule(self):
+        if not self._dummy_rule:
+            self.list()
+        return self._dummy_rule
+
     @staticmethod
     def to_port_range(from_port, to_port):
         if from_port is not None and to_port is not None:
@@ -660,7 +674,31 @@ class GCEVMFirewallRule(BaseVMFirewallRule):
                 self.firewall.delegate, info['src_dest_tag'],
                 self.firewall.network)
 
+    @property
+    def priority(self):
+        info = self.firewall.delegate.get_firewall_info(self._rule)
+        # The default firewall rule priority, when not specified, is 1000.
+        if info is None or 'priority' not in info:
+            return 1000
+        return info['priority']
+
+    def is_dummy_rule(self):
+        if self.priority != 65534:
+            return False
+        if self.direction != TrafficDirection.OUTBOUND:
+            return False
+        if self.protocol != 'tcp':
+            return False
+        if self.cidr != '0.0.0.0/0':
+            return False
+        return True
+
     def delete(self):
+        if (self.is_dummy_rule()):
+            return
+        self.force_delete()
+
+    def force_delete(self):
         self.firewall.delegate.delete_firewall_id(self._rule)
 
 

+ 1 - 11
test/test_security_service.py

@@ -183,16 +183,6 @@ class CloudSecurityServiceTestCase(ProviderTestBase):
             net, _ = helpers.create_test_network(self.provider, name)
             fw = self.provider.security.vm_firewalls.create(
                 name=name, description=name, network_id=net.id)
-            rules = list(fw.rules)
-            self.assertTrue(
-                # TODO: This should be made consistent across all providers.
-                # Currently, OpenStack creates two rules, one for IPV6 and
-                # another for IPV4
-                len(rules) >= 1, "Expected a single VM firewall rule allowing"
-                " all outbound traffic. Got {0}.".format(rules))
-            self.assertEqual(
-                rules[0].direction, TrafficDirection.OUTBOUND,
-                "Expected rule to be outbound. Got {0}.".format(rules))
             rule = fw.rules.create(
                 direction=TrafficDirection.INBOUND, src_dest_fw=fw,
                 protocol='tcp', from_port=1, to_port=65535)
@@ -204,7 +194,7 @@ class CloudSecurityServiceTestCase(ProviderTestBase):
                 r.delete()
             fw = self.provider.security.vm_firewalls.get(fw.id)  # update
             self.assertTrue(
-                fw is None or len(list(fw.rules)) == 0,
+                len(list(fw.rules)) == 0,
                 "Deleting VMFirewallRule should delete it: {0}".format(
                     fw.rules if fw else []))
         fwl = self.provider.security.vm_firewalls.list()