Przeglądaj źródła

fix(encapsulation): introduce Cilium support (#409)

* fix(encapsulation): route Cilium IPIP traffic through VxLAN overlay

Rewrite Cilium encapsulator to create IPIP tunnels instead of using
cilium_host interface directly. Each node autodiscovers its cilium_host
IP and advertises it via kilo.squat.ai/cilium-internal-ip annotation,
allowing other nodes to route IPIP outer packets through Cilium's VxLAN
overlay and preventing routing loops.

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>

* fix(tests): fix lint, unit tests and review feedback

Align constant block formatting for gofmt, add ciliumInternalIPs
to expected topology test segments, use bytes.Equal for nil-safe
CiliumInternalIP comparison, and return error from CleanUp.

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>

* fix(lint): use net.IP.Equal instead of bytes.Equal

staticcheck SA1021 requires net.IP.Equal for IP comparison.

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>

* fix(routes): add IPIP return path on leader when local=false

When running with --local=false and --compatibility=cilium, the leader
node did not create IPIP return routes for non-leader nodes in the same
location. This caused asymmetric routing: non-leaders encapsulated
traffic to the leader via IPIP (through Cilium's VxLAN overlay), but
the leader sent replies directly via the physical interface, which could
be dropped by cloud networks blocking IP protocol 4 or by reverse path
filtering on the non-leaders.

Add a new routing block for the !local case that creates:
- Routes in table 1107 using the overlay gateway (Cilium internal IP)
  so IPIP outer packets traverse the VxLAN tunnel
- Policy rules matching traffic arriving on the WireGuard interface
  (iif kilo0) destined for non-leader private IPs

This only activates when the encapsulator returns a gateway different
from the node's private IP, i.e. when an overlay like Cilium is in use.

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>

* fix(encapsulation): use cilium_tunl for IPIP tunnel in Cilium mode

When Cilium's enable-ipip-termination is active, it renames the
kernel's tunl0 to cilium_tunl and creates a receive-only cilium_ipip4
device for DSR. The cilium_ipip4 interface cannot transmit packets
(TX errors), so use cilium_tunl which supports both TX and RX.

If cilium_tunl already exists (Cilium manages it), reuse it. Otherwise,
create it so the interface name is consistent regardless of whether
enable-ipip-termination is active.

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>

* fix(encapsulation): fall back to tunl0 when cilium_tunl is absent

Instead of always creating cilium_tunl, reuse it only when Cilium has
already created it (enable-ipip-termination). Otherwise create the
standard tunl0 — Cilium will rename it later if needed.

Also ensure the interface is brought UP when reusing cilium_tunl,
as Cilium may leave it in DOWN state.

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>

* fix(lint): simplify ObjectMeta.Annotations to Annotations

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>

* refactor(encapsulation): rename LocalIP to CNICompatibilityIP

Rename the encapsulation interface method and all related struct
fields and annotation keys from Cilium-specific names to generic
CNI compatibility names, as requested in review.

Changes:
- LocalIP() -> CNICompatibilityIP() returning *net.IPNet
- CiliumInternalIP -> CNICompatibilityIP in Node struct
- ciliumInternalIPs -> cniCompatibilityIPs in segment struct
- kilo.squat.ai/cilium-internal-ip -> kilo.squat.ai/cni-compatibility-ip
- Add comments for ignored errors and IPv4 filter in cilium.go

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>

---------

Signed-off-by: Andrei Kvapil <kvapss@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
Andrei Kvapil 2 miesięcy temu
rodzic
commit
2805127186

+ 101 - 56
pkg/encapsulation/cilium.go

@@ -17,95 +17,140 @@ package encapsulation
 import (
 	"fmt"
 	"net"
-	"sync"
 
 	"github.com/vishvananda/netlink"
 
+	"github.com/squat/kilo/pkg/iproute"
 	"github.com/squat/kilo/pkg/iptables"
 )
 
-const ciliumDeviceName = "cilium_host"
+const (
+	ciliumHostIface = "cilium_host"
+	// ciliumTunlIface is the kernel's default IPIP tunnel (tunl0) renamed
+	// by Cilium when enable-ipip-termination is enabled.
+	ciliumTunlIface = "cilium_tunl"
+)
 
 type cilium struct {
-	iface    int
-	strategy Strategy
-	ch       chan netlink.LinkUpdate
-	done     chan struct{}
-	// mu guards updates to the iface field.
-	mu sync.Mutex
+	iface      int
+	strategy   Strategy
+	ownsTunnel bool
 }
 
-// NewCilium returns an encapsulator that uses Cilium.
+// NewCilium returns an encapsulator that uses IPIP tunnels
+// routed through Cilium's VxLAN overlay.
 func NewCilium(strategy Strategy) Encapsulator {
-	return &cilium{
-		ch:       make(chan netlink.LinkUpdate),
-		done:     make(chan struct{}),
-		strategy: strategy,
-	}
+	return &cilium{strategy: strategy}
 }
 
-// CleanUp close done channel
-func (f *cilium) CleanUp() error {
-	close(f.done)
-	return nil
+// CleanUp will remove any created IPIP devices.
+// If the tunnel is owned by Cilium, skip removal.
+func (c *cilium) CleanUp() error {
+	if !c.ownsTunnel {
+		return nil
+	}
+	if err := iproute.DeleteAddresses(c.iface); err != nil {
+		return err
+	}
+	return iproute.RemoveInterface(c.iface)
 }
 
 // Gw returns the correct gateway IP associated with the given node.
-func (f *cilium) Gw(_, _ net.IP, subnet *net.IPNet) net.IP {
+// It returns the Cilium internal IP so that the IPIP outer packets are routed
+// through Cilium's VxLAN overlay rather than the host network.
+func (c *cilium) Gw(_, _, cniIP net.IP, subnet *net.IPNet) net.IP {
+	if cniIP != nil {
+		return cniIP
+	}
 	return subnet.IP
 }
 
-// Index returns the index of the Cilium interface.
-func (f *cilium) Index() int {
-	f.mu.Lock()
-	defer f.mu.Unlock()
-	return f.iface
+// CNICompatibilityIP returns the IP address of the cilium_host interface.
+// This IP is advertised to other nodes so they can route IPIP outer
+// packets through Cilium's overlay.
+func (c *cilium) CNICompatibilityIP() *net.IPNet {
+	iface, err := net.InterfaceByName(ciliumHostIface)
+	if err != nil {
+		// cilium_host does not exist; Cilium may not be running.
+		return nil
+	}
+	addrs, err := iface.Addrs()
+	if err != nil {
+		// Unable to list addresses; safe to skip since the
+		// CNI compatibility IP is only used for optimization.
+		return nil
+	}
+	for _, a := range addrs {
+		// IPIP tunnels use IPv4 encapsulation, so only IPv4
+		// addresses are usable as the outer header source/destination.
+		if ipNet, ok := a.(*net.IPNet); ok && ipNet.IP.To4() != nil {
+			return ipNet
+		}
+	}
+	return nil
 }
 
-// Init finds the Cilium interface index.
-func (f *cilium) Init(_ int) error {
-	if err := netlink.LinkSubscribe(f.ch, f.done); err != nil {
-		return fmt.Errorf("failed to subscribe to updates to %s: %v", ciliumDeviceName, err)
-	}
-	go func() {
-		var lu netlink.LinkUpdate
-		for {
-			select {
-			case lu = <-f.ch:
-				if lu.Attrs().Name == ciliumDeviceName {
-					f.mu.Lock()
-					f.iface = lu.Attrs().Index
-					f.mu.Unlock()
-				}
-			case <-f.done:
-				return
+// Index returns the index of the IPIP tunnel interface.
+func (c *cilium) Index() int {
+	return c.iface
+}
+
+// Init initializes the IPIP tunnel interface.
+// If Cilium is running with enable-ipip-termination, it renames the kernel's
+// tunl0 to cilium_tunl. In that case we reuse the existing cilium_tunl.
+// Otherwise we create the standard tunl0 ourselves.
+func (c *cilium) Init(base int) error {
+	// If Cilium created cilium_tunl (enable-ipip-termination), reuse it.
+	if link, err := netlink.LinkByName(ciliumTunlIface); err == nil {
+		c.iface = link.Attrs().Index
+		c.ownsTunnel = false
+		// Ensure the interface is UP — Cilium may leave it DOWN.
+		if link.Attrs().Flags&net.FlagUp == 0 {
+			if err := iproute.Set(c.iface, true); err != nil {
+				return fmt.Errorf("failed to set %s up: %v", ciliumTunlIface, err)
 			}
 		}
-	}()
-	i, err := netlink.LinkByName(ciliumDeviceName)
-	if _, ok := err.(netlink.LinkNotFoundError); ok {
 		return nil
 	}
+	// No cilium_tunl — create standard tunl0.
+	iface, err := iproute.NewIPIP(base)
 	if err != nil {
-		return fmt.Errorf("failed to query for Cilium interface: %v", err)
+		return fmt.Errorf("failed to create tunnel interface: %v", err)
+	}
+	if err := iproute.Set(iface, true); err != nil {
+		return fmt.Errorf("failed to set tunnel interface up: %v", err)
 	}
-	f.mu.Lock()
-	f.iface = i.Attrs().Index
-	f.mu.Unlock()
+	c.iface = iface
+	c.ownsTunnel = true
 	return nil
 }
 
-// Rules is a no-op.
-func (f *cilium) Rules(_ []*net.IPNet) iptables.RuleSet {
-	return iptables.RuleSet{}
+// Rules returns a set of iptables rules that are necessary
+// when traffic between nodes must be encapsulated.
+func (c *cilium) Rules(nodes []*net.IPNet) iptables.RuleSet {
+	rules := iptables.RuleSet{}
+	proto := ipipProtocolName()
+	rules.AddToAppend(iptables.NewIPv4Chain("filter", "KILO-IPIP"))
+	rules.AddToAppend(iptables.NewIPv6Chain("filter", "KILO-IPIP"))
+	rules.AddToAppend(iptables.NewIPv4Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: jump to IPIP chain", "-j", "KILO-IPIP"))
+	rules.AddToAppend(iptables.NewIPv6Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: jump to IPIP chain", "-j", "KILO-IPIP"))
+	for _, n := range nodes {
+		// Accept encapsulated traffic from peers.
+		rules.AddToPrepend(iptables.NewRule(iptables.GetProtocol(n.IP), "filter", "KILO-IPIP", "-s", n.String(), "-m", "comment", "--comment", "Kilo: allow IPIP traffic", "-j", "ACCEPT"))
+	}
+	// Drop all other IPIP traffic.
+	rules.AddToAppend(iptables.NewIPv4Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: reject other IPIP traffic", "-j", "DROP"))
+	rules.AddToAppend(iptables.NewIPv6Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: reject other IPIP traffic", "-j", "DROP"))
+
+	return rules
 }
 
-// Set is a no-op.
-func (f *cilium) Set(_ *net.IPNet) error {
-	return nil
+// Set sets the IP address of the IPIP tunnel interface.
+func (c *cilium) Set(cidr *net.IPNet) error {
+	return iproute.SetAddress(c.iface, cidr)
 }
 
 // Strategy returns the configured strategy for encapsulation.
-func (f *cilium) Strategy() Strategy {
-	return f.strategy
+func (c *cilium) Strategy() Strategy {
+	return c.strategy
 }

+ 4 - 1
pkg/encapsulation/encapsulation.go

@@ -46,9 +46,12 @@ const (
 // * clean up any changes applied to the backend.
 type Encapsulator interface {
 	CleanUp() error
-	Gw(net.IP, net.IP, *net.IPNet) net.IP
+	Gw(net.IP, net.IP, net.IP, *net.IPNet) net.IP
 	Index() int
 	Init(int) error
+	// CNICompatibilityIP returns the local overlay IP that should be
+	// advertised to other nodes for CNI-compatible routing.
+	CNICompatibilityIP() *net.IPNet
 	Rules([]*net.IPNet) iptables.RuleSet
 	Set(*net.IPNet) error
 	Strategy() Strategy

+ 6 - 1
pkg/encapsulation/flannel.go

@@ -50,10 +50,15 @@ func (f *flannel) CleanUp() error {
 }
 
 // Gw returns the correct gateway IP associated with the given node.
-func (f *flannel) Gw(_, _ net.IP, subnet *net.IPNet) net.IP {
+func (f *flannel) Gw(_, _, _ net.IP, subnet *net.IPNet) net.IP {
 	return subnet.IP
 }
 
+// CNICompatibilityIP is a no-op for Flannel.
+func (f *flannel) CNICompatibilityIP() *net.IPNet {
+	return nil
+}
+
 // Index returns the index of the Flannel interface.
 func (f *flannel) Index() int {
 	f.mu.Lock()

+ 6 - 1
pkg/encapsulation/ipip.go

@@ -41,10 +41,15 @@ func (i *ipip) CleanUp() error {
 }
 
 // Gw returns the correct gateway IP associated with the given node.
-func (i *ipip) Gw(_, internal net.IP, _ *net.IPNet) net.IP {
+func (i *ipip) Gw(_, internal, _ net.IP, _ *net.IPNet) net.IP {
 	return internal
 }
 
+// CNICompatibilityIP is a no-op for IPIP.
+func (i *ipip) CNICompatibilityIP() *net.IPNet {
+	return nil
+}
+
 // Index returns the index of the IPIP interface.
 func (i *ipip) Index() int {
 	return i.iface

+ 6 - 1
pkg/encapsulation/noop.go

@@ -29,7 +29,12 @@ func (n Noop) CleanUp() error {
 }
 
 // Gw will also do nothing.
-func (n Noop) Gw(_ net.IP, _ net.IP, _ *net.IPNet) net.IP {
+func (n Noop) Gw(_, _, _ net.IP, _ *net.IPNet) net.IP {
+	return nil
+}
+
+// CNICompatibilityIP will also do nothing.
+func (n Noop) CNICompatibilityIP() *net.IPNet {
 	return nil
 }
 

+ 12 - 6
pkg/iproute/ipip.go

@@ -24,17 +24,23 @@ import (
 )
 
 const (
-	ipipHeaderSize = 20
-	tunnelName     = "tunl0"
+	ipipHeaderSize    = 20
+	DefaultTunnelName = "tunl0"
 )
 
-// NewIPIP creates an IPIP interface using the base interface
+// NewIPIP creates an IPIP interface named tunl0 using the base interface
 // to derive the tunnel's MTU.
 func NewIPIP(baseIndex int) (int, error) {
-	link, err := netlink.LinkByName(tunnelName)
+	return NewIPIPWithName(baseIndex, DefaultTunnelName)
+}
+
+// NewIPIPWithName creates a named IPIP interface using the base interface
+// to derive the tunnel's MTU.
+func NewIPIPWithName(baseIndex int, name string) (int, error) {
+	link, err := netlink.LinkByName(name)
 	if err != nil {
 		// If we failed to find the tunnel, then it probably simply does not exist.
-		cmd := exec.Command("ip", "tunnel", "add", tunnelName, "mode", "ipip")
+		cmd := exec.Command("ip", "tunnel", "add", name, "mode", "ipip")
 		var stderr bytes.Buffer
 		cmd.Stderr = &stderr
 		// Sometimes creating a tunnel returns the error "File exists,"
@@ -42,7 +48,7 @@ func NewIPIP(baseIndex int) (int, error) {
 		if err := cmd.Run(); err != nil && !strings.Contains(stderr.String(), "File exists") {
 			return 0, fmt.Errorf("failed to create IPIP tunnel: %s", stderr.String())
 		}
-		link, err = netlink.LinkByName(tunnelName)
+		link, err = netlink.LinkByName(name)
 		if err != nil {
 			return 0, fmt.Errorf("failed to get tunnel device: %v", err)
 		}

+ 27 - 14
pkg/k8s/backend.go

@@ -49,20 +49,21 @@ import (
 
 const (
 	// Backend is the name of this mesh backend.
-	Backend                      = "kubernetes"
-	endpointAnnotationKey        = "kilo.squat.ai/endpoint"
-	forceEndpointAnnotationKey   = "kilo.squat.ai/force-endpoint"
-	forceInternalIPAnnotationKey = "kilo.squat.ai/force-internal-ip"
-	internalIPAnnotationKey      = "kilo.squat.ai/internal-ip"
-	keyAnnotationKey             = "kilo.squat.ai/key"
-	lastSeenAnnotationKey        = "kilo.squat.ai/last-seen"
-	leaderAnnotationKey          = "kilo.squat.ai/leader"
-	locationAnnotationKey        = "kilo.squat.ai/location"
-	persistentKeepaliveKey       = "kilo.squat.ai/persistent-keepalive"
-	wireGuardIPAnnotationKey     = "kilo.squat.ai/wireguard-ip"
-	discoveredEndpointsKey       = "kilo.squat.ai/discovered-endpoints"
-	allowedLocationIPsKey        = "kilo.squat.ai/allowed-location-ips"
-	granularityKey               = "kilo.squat.ai/granularity"
+	Backend                         = "kubernetes"
+	endpointAnnotationKey           = "kilo.squat.ai/endpoint"
+	forceEndpointAnnotationKey      = "kilo.squat.ai/force-endpoint"
+	forceInternalIPAnnotationKey    = "kilo.squat.ai/force-internal-ip"
+	internalIPAnnotationKey         = "kilo.squat.ai/internal-ip"
+	keyAnnotationKey                = "kilo.squat.ai/key"
+	lastSeenAnnotationKey           = "kilo.squat.ai/last-seen"
+	leaderAnnotationKey             = "kilo.squat.ai/leader"
+	locationAnnotationKey           = "kilo.squat.ai/location"
+	persistentKeepaliveKey          = "kilo.squat.ai/persistent-keepalive"
+	wireGuardIPAnnotationKey        = "kilo.squat.ai/wireguard-ip"
+	discoveredEndpointsKey          = "kilo.squat.ai/discovered-endpoints"
+	allowedLocationIPsKey           = "kilo.squat.ai/allowed-location-ips"
+	granularityKey                  = "kilo.squat.ai/granularity"
+	cniCompatibilityIPAnnotationKey = "kilo.squat.ai/cni-compatibility-ip"
 	// RegionLabelKey is the key for the well-known Kubernetes topology region label.
 	RegionLabelKey  = "topology.kubernetes.io/region"
 	jsonPatchSlash  = "~1"
@@ -241,6 +242,11 @@ func (nb *nodeBackend) Set(ctx context.Context, name string, node *mesh.Node) er
 		n.Annotations[discoveredEndpointsKey] = string(discoveredEndpoints)
 	}
 	n.Annotations[granularityKey] = string(node.Granularity)
+	if node.CNICompatibilityIP != nil {
+		n.Annotations[cniCompatibilityIPAnnotationKey] = node.CNICompatibilityIP.String()
+	} else {
+		n.Annotations[cniCompatibilityIPAnnotationKey] = ""
+	}
 	oldData, err := json.Marshal(old)
 	if err != nil {
 		return err
@@ -342,6 +348,12 @@ func translateNode(node *v1.Node, topologyLabel string) *mesh.Node {
 	// TODO log some error or warning.
 	key, _ := wgtypes.ParseKey(node.Annotations[keyAnnotationKey])
 
+	// Parse the CNI compatibility IP if present.
+	var cniCompatibilityIP *net.IPNet
+	if cipStr, ok := node.Annotations[cniCompatibilityIPAnnotationKey]; ok && cipStr != "" {
+		cniCompatibilityIP = normalizeIP(cipStr)
+	}
+
 	return &mesh.Node{
 		// Endpoint and InternalIP should only ever fail to parse if the
 		// remote node's agent has not yet set its IP address;
@@ -352,6 +364,7 @@ func translateNode(node *v1.Node, topologyLabel string) *mesh.Node {
 		Endpoint:            endpoint,
 		NoInternalIP:        noInternalIP,
 		InternalIP:          internalIP,
+		CNICompatibilityIP:  cniCompatibilityIP,
 		Key:                 key,
 		LastSeen:            lastSeen,
 		Leader:              leader,

+ 5 - 4
pkg/mesh/backend.go

@@ -57,10 +57,11 @@ const (
 
 // Node represents a node in the network.
 type Node struct {
-	Endpoint     *wireguard.Endpoint
-	Key          wgtypes.Key
-	NoInternalIP bool
-	InternalIP   *net.IPNet
+	Endpoint           *wireguard.Endpoint
+	Key                wgtypes.Key
+	NoInternalIP       bool
+	InternalIP         *net.IPNet
+	CNICompatibilityIP *net.IPNet
 	// LastSeen is a Unix time for the last time
 	// the node confirmed it was live.
 	LastSeen int64

+ 2 - 0
pkg/mesh/mesh.go

@@ -412,6 +412,7 @@ func (m *Mesh) handleLocal(ctx context.Context, n *Node) {
 		Key:                 m.pub,
 		NoInternalIP:        n.NoInternalIP,
 		InternalIP:          n.InternalIP,
+		CNICompatibilityIP:  m.enc.CNICompatibilityIP(),
 		LastSeen:            time.Now().Unix(),
 		Leader:              n.Leader,
 		Location:            n.Location,
@@ -699,6 +700,7 @@ func nodesAreEqual(a, b *Node) bool {
 	return a.Key.String() == b.Key.String() &&
 		ipNetsEqual(a.WireGuardIP, b.WireGuardIP) &&
 		ipNetsEqual(a.InternalIP, b.InternalIP) &&
+		ipNetsEqual(a.CNICompatibilityIP, b.CNICompatibilityIP) &&
 		a.Leader == b.Leader &&
 		a.Location == b.Location &&
 		a.Name == b.Name &&

+ 36 - 5
pkg/mesh/routes.go

@@ -40,7 +40,7 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
 		var gw net.IP
 		for _, segment := range t.segments {
 			if segment.location == t.location {
-				gw = enc.Gw(t.updateEndpoint(segment.endpoint, segment.key, &segment.persistentKeepalive).IP(), segment.privateIPs[segment.leader], segment.cidrs[segment.leader])
+				gw = enc.Gw(t.updateEndpoint(segment.endpoint, segment.key, &segment.persistentKeepalive).IP(), segment.privateIPs[segment.leader], ipFromIPNet(segment.cniCompatibilityIPs[segment.leader]), segment.cidrs[segment.leader])
 				break
 			}
 		}
@@ -61,10 +61,11 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
 						if segment.privateIPs[i].Equal(t.privateIP.IP) {
 							continue
 						}
+						nodeGw := enc.Gw(nil, segment.privateIPs[i], ipFromIPNet(segment.cniCompatibilityIPs[i]), segment.cidrs[i])
 						routes = append(routes, encapsulateRoute(&netlink.Route{
 							Dst:       segment.cidrs[i],
 							Flags:     int(netlink.FLAG_ONLINK),
-							Gw:        segment.privateIPs[i],
+							Gw:        nodeGw,
 							LinkIndex: privIface,
 							Protocol:  unix.RTPROT_STATIC,
 						}, enc.Strategy(), t.privateIP, tunlIface))
@@ -74,7 +75,7 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
 							routes = append(routes, &netlink.Route{
 								Dst:       oneAddressCIDR(segment.privateIPs[i]),
 								Flags:     int(netlink.FLAG_ONLINK),
-								Gw:        segment.privateIPs[i],
+								Gw:        nodeGw,
 								LinkIndex: tunlIface,
 								Src:       t.privateIP.IP,
 								Protocol:  unix.RTPROT_STATIC,
@@ -155,10 +156,11 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
 					if segment.privateIPs[i].Equal(t.privateIP.IP) {
 						continue
 					}
+					nodeGw := enc.Gw(nil, segment.privateIPs[i], ipFromIPNet(segment.cniCompatibilityIPs[i]), segment.cidrs[i])
 					routes = append(routes, encapsulateRoute(&netlink.Route{
 						Dst:       segment.cidrs[i],
 						Flags:     int(netlink.FLAG_ONLINK),
-						Gw:        segment.privateIPs[i],
+						Gw:        nodeGw,
 						LinkIndex: privIface,
 						Protocol:  unix.RTPROT_STATIC,
 					}, enc.Strategy(), t.privateIP, tunlIface))
@@ -168,7 +170,7 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
 						routes = append(routes, &netlink.Route{
 							Dst:       oneAddressCIDR(segment.privateIPs[i]),
 							Flags:     int(netlink.FLAG_ONLINK),
-							Gw:        segment.privateIPs[i],
+							Gw:        nodeGw,
 							LinkIndex: tunlIface,
 							Src:       t.privateIP.IP,
 							Protocol:  unix.RTPROT_STATIC,
@@ -189,6 +191,35 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
 					}
 				}
 			}
+			// When not managing local routes, the leader still needs to
+			// route return WireGuard traffic through IPIP when non-leaders
+			// use overlay routing (e.g. Cilium) to reach the leader.
+			// Use the overlay gateway (e.g. Cilium internal IP) so the
+			// IPIP outer packet is routed through the overlay tunnel,
+			// since direct IPIP may be blocked by the cloud network.
+			if !local && t.privateIP != nil && enc.Strategy() != encapsulation.Never {
+				for i := range segment.cidrs {
+					if segment.privateIPs[i].Equal(t.privateIP.IP) {
+						continue
+					}
+					nodeGw := enc.Gw(nil, segment.privateIPs[i], ipFromIPNet(segment.cniCompatibilityIPs[i]), segment.cidrs[i])
+					if nodeGw != nil && !nodeGw.Equal(segment.privateIPs[i]) {
+						routes = append(routes, &netlink.Route{
+							Dst:       oneAddressCIDR(segment.privateIPs[i]),
+							Flags:     int(netlink.FLAG_ONLINK),
+							Gw:        nodeGw,
+							LinkIndex: tunlIface,
+							Protocol:  unix.RTPROT_STATIC,
+							Table:     kiloTableIndex,
+						})
+						rules = append(rules, defaultRule(&netlink.Rule{
+							Dst:     oneAddressCIDR(segment.privateIPs[i]),
+							Table:   kiloTableIndex,
+							IifName: kiloIfaceName,
+						}))
+					}
+				}
+			}
 			// Continuing here prevents leaders form adding routes via WireGuard to
 			// nodes in their own location.
 			continue

+ 13 - 0
pkg/mesh/topology.go

@@ -86,6 +86,8 @@ type segment struct {
 	leader int
 	// privateIPs is a slice of private IPs of all peers in the segment.
 	privateIPs []net.IP
+	// cniCompatibilityIPs is a slice of CNI compatibility IPs of all peers in the segment.
+	cniCompatibilityIPs []*net.IPNet
 	// wireGuardIP is the allocated IP address of the WireGuard
 	// interface on the leader of the segment.
 	wireGuardIP net.IP
@@ -155,6 +157,7 @@ func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Gra
 		var cidrs []*net.IPNet
 		var hostnames []string
 		var privateIPs []net.IP
+		var cniCompatibilityIPs []*net.IPNet
 		for _, node := range topoMap[location] {
 			// Allowed IPs should include:
 			// - the node's allocated subnet
@@ -174,6 +177,7 @@ func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Gra
 				allowedIPs = append(allowedIPs, *oneAddressCIDR(node.InternalIP.IP))
 				privateIPs = append(privateIPs, node.InternalIP.IP)
 			}
+			cniCompatibilityIPs = append(cniCompatibilityIPs, node.CNICompatibilityIP)
 			cidrs = append(cidrs, node.Subnet)
 			hostnames = append(hostnames, node.Name)
 		}
@@ -191,6 +195,7 @@ func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Gra
 			hostnames:           hostnames,
 			leader:              leader,
 			privateIPs:          privateIPs,
+			cniCompatibilityIPs: cniCompatibilityIPs,
 			allowedLocationIPs:  allowedLocationIPs,
 		})
 		_ = level.Debug(t.logger).Log("msg", "generated segment", "location", location, "allowedIPs", allowedIPs, "endpoint", topoMap[location][leader].Endpoint, "cidrs", cidrs, "hostnames", hostnames, "leader", leader, "privateIPs", privateIPs, "allowedLocationIPs", allowedLocationIPs)
@@ -414,6 +419,14 @@ func oneAddressCIDR(ip net.IP) *net.IPNet {
 	return &net.IPNet{IP: ip, Mask: net.CIDRMask(len(ip)*8, len(ip)*8)}
 }
 
+// ipFromIPNet extracts the IP from a *net.IPNet, returning nil if the IPNet is nil.
+func ipFromIPNet(n *net.IPNet) net.IP {
+	if n == nil {
+		return nil
+	}
+	return n.IP
+}
+
 // findLeader selects a leader for the nodes in a segment;
 // it will select the first node that says it should lead
 // or the first node in the segment if none have volunteered,

+ 25 - 0
pkg/mesh/topology_test.go

@@ -152,6 +152,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["a"].Subnet},
 						hostnames:           []string{"a"},
 						privateIPs:          []net.IP{nodes["a"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w1,
 					},
 					{
@@ -163,6 +164,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["b"].Subnet, nodes["c"].Subnet},
 						hostnames:           []string{"b", "c"},
 						privateIPs:          []net.IP{nodes["b"].InternalIP.IP, nodes["c"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil, nil},
 						wireGuardIP:         w2,
 						allowedLocationIPs:  nodes["b"].AllowedLocationIPs,
 					},
@@ -175,6 +177,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["d"].Subnet},
 						hostnames:           []string{"d"},
 						privateIPs:          nil,
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w3,
 					},
 				},
@@ -203,6 +206,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["a"].Subnet},
 						hostnames:           []string{"a"},
 						privateIPs:          []net.IP{nodes["a"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w1,
 					},
 					{
@@ -214,6 +218,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["b"].Subnet, nodes["c"].Subnet},
 						hostnames:           []string{"b", "c"},
 						privateIPs:          []net.IP{nodes["b"].InternalIP.IP, nodes["c"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil, nil},
 						wireGuardIP:         w2,
 						allowedLocationIPs:  nodes["b"].AllowedLocationIPs,
 					},
@@ -226,6 +231,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["d"].Subnet},
 						hostnames:           []string{"d"},
 						privateIPs:          nil,
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w3,
 					},
 				},
@@ -254,6 +260,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["a"].Subnet},
 						hostnames:           []string{"a"},
 						privateIPs:          []net.IP{nodes["a"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w1,
 					},
 					{
@@ -265,6 +272,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["b"].Subnet, nodes["c"].Subnet},
 						hostnames:           []string{"b", "c"},
 						privateIPs:          []net.IP{nodes["b"].InternalIP.IP, nodes["c"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil, nil},
 						wireGuardIP:         w2,
 						allowedLocationIPs:  nodes["b"].AllowedLocationIPs,
 					},
@@ -277,6 +285,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["d"].Subnet},
 						hostnames:           []string{"d"},
 						privateIPs:          nil,
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w3,
 					},
 				},
@@ -305,6 +314,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["a"].Subnet},
 						hostnames:           []string{"a"},
 						privateIPs:          []net.IP{nodes["a"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w1,
 					},
 					{
@@ -316,6 +326,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["b"].Subnet},
 						hostnames:           []string{"b"},
 						privateIPs:          []net.IP{nodes["b"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w2,
 						allowedLocationIPs:  nodes["b"].AllowedLocationIPs,
 					},
@@ -328,6 +339,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["c"].Subnet},
 						hostnames:           []string{"c"},
 						privateIPs:          []net.IP{nodes["c"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w3,
 					},
 					{
@@ -339,6 +351,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["d"].Subnet},
 						hostnames:           []string{"d"},
 						privateIPs:          nil,
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w4,
 					},
 				},
@@ -367,6 +380,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["a"].Subnet},
 						hostnames:           []string{"a"},
 						privateIPs:          []net.IP{nodes["a"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w1,
 					},
 					{
@@ -378,6 +392,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["b"].Subnet},
 						hostnames:           []string{"b"},
 						privateIPs:          []net.IP{nodes["b"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w2,
 						allowedLocationIPs:  nodes["b"].AllowedLocationIPs,
 					},
@@ -390,6 +405,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["c"].Subnet},
 						hostnames:           []string{"c"},
 						privateIPs:          []net.IP{nodes["c"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w3,
 					},
 					{
@@ -401,6 +417,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["d"].Subnet},
 						hostnames:           []string{"d"},
 						privateIPs:          nil,
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w4,
 					},
 				},
@@ -429,6 +446,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["a"].Subnet},
 						hostnames:           []string{"a"},
 						privateIPs:          []net.IP{nodes["a"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w1,
 					},
 					{
@@ -440,6 +458,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["b"].Subnet},
 						hostnames:           []string{"b"},
 						privateIPs:          []net.IP{nodes["b"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w2,
 						allowedLocationIPs:  nodes["b"].AllowedLocationIPs,
 					},
@@ -452,6 +471,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["c"].Subnet},
 						hostnames:           []string{"c"},
 						privateIPs:          []net.IP{nodes["c"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w3,
 					},
 					{
@@ -463,6 +483,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["d"].Subnet},
 						hostnames:           []string{"d"},
 						privateIPs:          nil,
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w4,
 					},
 				},
@@ -491,6 +512,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["a"].Subnet},
 						hostnames:           []string{"a"},
 						privateIPs:          []net.IP{nodes["a"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w1,
 					},
 					{
@@ -502,6 +524,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["b"].Subnet},
 						hostnames:           []string{"b"},
 						privateIPs:          []net.IP{nodes["b"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w2,
 						allowedLocationIPs:  nodes["b"].AllowedLocationIPs,
 					},
@@ -514,6 +537,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["c"].Subnet},
 						hostnames:           []string{"c"},
 						privateIPs:          []net.IP{nodes["c"].InternalIP.IP},
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w3,
 					},
 					{
@@ -525,6 +549,7 @@ func TestNewTopology(t *testing.T) {
 						cidrs:               []*net.IPNet{nodes["d"].Subnet},
 						hostnames:           []string{"d"},
 						privateIPs:          nil,
+						cniCompatibilityIPs: []*net.IPNet{nil},
 						wireGuardIP:         w4,
 					},
 				},