topology.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. // Copyright 2021 the Kilo authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mesh
  15. import (
  16. "errors"
  17. "net"
  18. "sort"
  19. "time"
  20. "github.com/go-kit/kit/log"
  21. "github.com/go-kit/kit/log/level"
  22. "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
  23. "github.com/squat/kilo/pkg/wireguard"
  24. )
// Prefixes for the location keys under which nodes are grouped into segments.
// They keep a location derived from a node's Location value distinct from a
// location derived from a node's name.
const (
	// logicalLocationPrefix is prepended to a node's Location.
	logicalLocationPrefix = "location:"
	// nodeLocationPrefix is prepended to a node's name.
	nodeLocationPrefix = "node:"
)
// Topology represents the logical structure of the overlay network
// as seen from the local node.
type Topology struct {
	// key is the private key of the node creating the topology.
	key wgtypes.Key
	// port is the listen port written into the generated WireGuard configuration.
	port int
	// location is the prefixed location key of the local host.
	location string
	// segments holds one segment per location, sorted by location.
	segments []*segment
	// peers holds the peers sorted by name, with duplicate allowed IPs removed.
	peers []*Peer
	// hostname is the hostname of the local host.
	hostname string
	// leader represents whether or not the local host
	// is the segment leader.
	leader bool
	// persistentKeepalive is the interval between keepalive packets
	// emitted by the local node to its peers.
	persistentKeepalive time.Duration
	// privateIP is the private IP address of the local node.
	privateIP *net.IPNet
	// subnet is the Pod subnet of the local node.
	subnet *net.IPNet
	// wireGuardCIDR is the allocated CIDR of the WireGuard
	// interface of the local node within the Kilo subnet.
	// If the local node is not the leader of a location, then
	// the IP is the 0th address in the subnet, i.e. the CIDR
	// is equal to the Kilo subnet.
	wireGuardCIDR *net.IPNet
	// serviceCIDRs are the known service CIDRs of the Kubernetes cluster.
	// They are not strictly needed, however if they are known,
	// then the topology can avoid masquerading packets destined to service IPs.
	serviceCIDRs []*net.IPNet
	// discoveredEndpoints is the updated map of valid discovered endpoints.
	// It is looked up by the string form of a WireGuard key.
	discoveredEndpoints map[string]*net.UDPAddr
	// logger receives the debug and warning messages of the topology.
	logger log.Logger
}
// segment represents one logical unit in the topology that is united by one common WireGuard IP.
type segment struct {
	// allowedIPs collects the subnets and the single-address CIDRs of the
	// internal IPs of the nodes in the segment, plus the single-address CIDR
	// of the segment's WireGuard IP.
	allowedIPs []net.IPNet
	// endpoint is the endpoint of the leader of the segment.
	endpoint *wireguard.Endpoint
	// key is the key of the leader of the segment; it is used as the
	// public key of the segment in generated configurations.
	key wgtypes.Key
	// persistentKeepalive is the persistent keepalive of the leader of the segment.
	persistentKeepalive time.Duration
	// location is the prefixed location key of this segment.
	location string
	// cidrs is a slice of subnets of all nodes in the segment.
	// It has one entry per node, which is nil for a node without a subnet.
	cidrs []*net.IPNet
	// hostnames is a slice of the hostnames of the nodes in the segment.
	hostnames []string
	// leader is the index of the leader of the segment.
	leader int
	// privateIPs is a slice of the private IPs of the nodes in the segment
	// that have an internal IP.
	privateIPs []net.IP
	// cniCompatibilityIPs is a slice of CNI compatibility IPs of all nodes in the segment.
	cniCompatibilityIPs []*net.IPNet
	// wireGuardIP is the allocated IP address of the WireGuard
	// interface on the leader of the segment.
	wireGuardIP net.IP
	// allowedLocationIPs are not part of the cluster and are not peers.
	// They are directly routable from nodes within the segment.
	// A classic example is a printer that ought to be routable from other locations.
	allowedLocationIPs []net.IPNet
}
  90. // NewTopology creates a new Topology struct from a given set of nodes and peers.
  91. func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Granularity, hostname string, port int, key wgtypes.Key, subnet *net.IPNet, serviceCIDRs []*net.IPNet, persistentKeepalive time.Duration, logger log.Logger) (*Topology, error) {
  92. if logger == nil {
  93. logger = log.NewNopLogger()
  94. }
  95. topoMap := make(map[string][]*Node)
  96. for _, node := range nodes {
  97. var location string
  98. switch granularity {
  99. case LogicalGranularity:
  100. location = logicalLocationPrefix + node.Location
  101. // Put node in a different location, if no private
  102. // IP was found.
  103. if node.InternalIP == nil {
  104. location = nodeLocationPrefix + node.Name
  105. }
  106. case FullGranularity:
  107. location = nodeLocationPrefix + node.Name
  108. }
  109. topoMap[location] = append(topoMap[location], node)
  110. }
  111. var localLocation string
  112. switch granularity {
  113. case LogicalGranularity:
  114. localLocation = logicalLocationPrefix + nodes[hostname].Location
  115. if nodes[hostname].InternalIP == nil {
  116. localLocation = nodeLocationPrefix + hostname
  117. }
  118. case FullGranularity:
  119. localLocation = nodeLocationPrefix + hostname
  120. }
  121. t := Topology{
  122. key: key,
  123. port: port,
  124. hostname: hostname,
  125. location: localLocation,
  126. persistentKeepalive: persistentKeepalive,
  127. privateIP: nodes[hostname].InternalIP,
  128. subnet: nodes[hostname].Subnet,
  129. wireGuardCIDR: subnet,
  130. serviceCIDRs: serviceCIDRs,
  131. discoveredEndpoints: make(map[string]*net.UDPAddr),
  132. logger: logger,
  133. }
  134. for location := range topoMap {
  135. // Sort the location so the result is stable.
  136. sort.Slice(topoMap[location], func(i, j int) bool {
  137. return topoMap[location][i].Name < topoMap[location][j].Name
  138. })
  139. leader := findLeader(topoMap[location])
  140. if location == localLocation && topoMap[location][leader].Name == hostname {
  141. t.leader = true
  142. }
  143. var allowedIPs []net.IPNet
  144. allowedLocationIPsMap := make(map[string]struct{})
  145. var allowedLocationIPs []net.IPNet
  146. var cidrs []*net.IPNet
  147. var hostnames []string
  148. var privateIPs []net.IP
  149. var cniCompatibilityIPs []*net.IPNet
  150. for _, node := range topoMap[location] {
  151. // Allowed IPs should include:
  152. // - the node's allocated subnet
  153. // - the node's WireGuard IP
  154. // - the node's internal IP
  155. // - IPs that were specified by the allowed-location-ips annotation
  156. if node.Subnet != nil {
  157. allowedIPs = append(allowedIPs, *node.Subnet)
  158. }
  159. for _, ip := range node.AllowedLocationIPs {
  160. if _, ok := allowedLocationIPsMap[ip.String()]; !ok {
  161. allowedLocationIPs = append(allowedLocationIPs, ip)
  162. allowedLocationIPsMap[ip.String()] = struct{}{}
  163. }
  164. }
  165. if node.InternalIP != nil {
  166. allowedIPs = append(allowedIPs, *oneAddressCIDR(node.InternalIP.IP))
  167. privateIPs = append(privateIPs, node.InternalIP.IP)
  168. }
  169. cniCompatibilityIPs = append(cniCompatibilityIPs, node.CNICompatibilityIP)
  170. cidrs = append(cidrs, node.Subnet)
  171. hostnames = append(hostnames, node.Name)
  172. }
  173. // The sorting has no function, but makes testing easier.
  174. sort.Slice(allowedLocationIPs, func(i, j int) bool {
  175. return allowedLocationIPs[i].String() < allowedLocationIPs[j].String()
  176. })
  177. t.segments = append(t.segments, &segment{
  178. allowedIPs: allowedIPs,
  179. endpoint: topoMap[location][leader].Endpoint,
  180. key: topoMap[location][leader].Key,
  181. persistentKeepalive: topoMap[location][leader].PersistentKeepalive,
  182. location: location,
  183. cidrs: cidrs,
  184. hostnames: hostnames,
  185. leader: leader,
  186. privateIPs: privateIPs,
  187. cniCompatibilityIPs: cniCompatibilityIPs,
  188. allowedLocationIPs: allowedLocationIPs,
  189. })
  190. _ = level.Debug(t.logger).Log("msg", "generated segment", "location", location, "allowedIPs", allowedIPs, "endpoint", topoMap[location][leader].Endpoint, "cidrs", cidrs, "hostnames", hostnames, "leader", leader, "privateIPs", privateIPs, "allowedLocationIPs", allowedLocationIPs)
  191. }
  192. // Sort the Topology segments so the result is stable.
  193. sort.Slice(t.segments, func(i, j int) bool {
  194. return t.segments[i].location < t.segments[j].location
  195. })
  196. for _, peer := range peers {
  197. t.peers = append(t.peers, peer)
  198. }
  199. // Sort the Topology peers so the result is stable.
  200. sort.Slice(t.peers, func(i, j int) bool {
  201. return t.peers[i].Name < t.peers[j].Name
  202. })
  203. // We need to defensively deduplicate peer allowed IPs. If two peers claim the same IP,
  204. // the WireGuard configuration could flap, causing the interface to churn.
  205. t.peers = deduplicatePeerIPs(t.peers)
  206. // Copy the host node DiscoveredEndpoints in the topology as a starting point.
  207. for key := range nodes[hostname].DiscoveredEndpoints {
  208. t.discoveredEndpoints[key] = nodes[hostname].DiscoveredEndpoints[key]
  209. }
  210. // Allocate IPs to the segment leaders in a stable, coordination-free manner.
  211. a := newAllocator(*subnet)
  212. for _, segment := range t.segments {
  213. ipNet := a.next()
  214. if ipNet == nil {
  215. return nil, errors.New("failed to allocate an IP address; ran out of IP addresses")
  216. }
  217. segment.wireGuardIP = ipNet.IP
  218. segment.allowedIPs = append(segment.allowedIPs, *oneAddressCIDR(ipNet.IP))
  219. if t.leader && segment.location == t.location {
  220. t.wireGuardCIDR = &net.IPNet{IP: ipNet.IP, Mask: subnet.Mask}
  221. }
  222. // Now that the topology is ordered, update the discoveredEndpoints map
  223. // add new ones by going through the ordered topology: segments, nodes
  224. for _, node := range topoMap[segment.location] {
  225. for key := range node.DiscoveredEndpoints {
  226. if _, ok := t.discoveredEndpoints[key]; !ok {
  227. t.discoveredEndpoints[key] = node.DiscoveredEndpoints[key]
  228. }
  229. }
  230. }
  231. // Check for intersecting IPs in allowed location IPs
  232. segment.allowedLocationIPs = t.filterAllowedLocationIPs(segment.allowedLocationIPs, segment.location)
  233. }
  234. _ = level.Debug(t.logger).Log("msg", "generated topology", "location", t.location, "hostname", t.hostname, "wireGuardIP", t.wireGuardCIDR, "privateIP", t.privateIP, "subnet", t.subnet, "leader", t.leader)
  235. return &t, nil
  236. }
  237. func intersect(n1, n2 net.IPNet) bool {
  238. return n1.Contains(n2.IP) || n2.Contains(n1.IP)
  239. }
  240. func (t *Topology) filterAllowedLocationIPs(ips []net.IPNet, location string) (ret []net.IPNet) {
  241. CheckIPs:
  242. for _, ip := range ips {
  243. for _, s := range t.segments {
  244. // Check if allowed location IPs are also allowed in other locations.
  245. if location != s.location {
  246. for _, i := range s.allowedLocationIPs {
  247. if intersect(ip, i) {
  248. _ = level.Warn(t.logger).Log("msg", "overlapping allowed location IPnets", "IP", ip.String(), "IP2", i.String(), "segment-location", s.location)
  249. continue CheckIPs
  250. }
  251. }
  252. }
  253. // Check if allowed location IPs intersect with the allowed IPs.
  254. // If the allowed location IP strictly contains an allowed IP, that's
  255. // fine - the more specific route will be used. Reject if the allowed
  256. // IP contains or equals the allowed location IP.
  257. for _, i := range s.allowedIPs {
  258. if i.Contains(ip.IP) {
  259. _ = level.Warn(t.logger).Log("msg", "overlapping allowed location IPnet with allowed IPnets", "IP", ip.String(), "IP2", i.String(), "segment-location", s.location)
  260. continue CheckIPs
  261. }
  262. }
  263. // Check if allowed location IPs intersect with the private IPs of the segment.
  264. // If the allowed location IP fully contains a private IP, that's fine.
  265. for _, i := range s.privateIPs {
  266. if ip.Contains(i) {
  267. // This is OK - the allowed location IP contains the private IP,
  268. // so the more specific route to the private IP will still work.
  269. _ = level.Debug(t.logger).Log("msg", "allowed location IPnet contains privateIP", "IP", ip.String(), "IP2", i.String(), "segment-location", s.location)
  270. }
  271. }
  272. }
  273. // Check if allowed location IPs intersect with allowed IPs of peers.
  274. for _, p := range t.peers {
  275. for _, i := range p.AllowedIPs {
  276. if intersect(ip, i) {
  277. _ = level.Warn(t.logger).Log("msg", "overlapping allowed location IPnet with peer IPnet", "IP", ip.String(), "IP2", i.String(), "peer", p.Name)
  278. continue CheckIPs
  279. }
  280. }
  281. }
  282. ret = append(ret, ip)
  283. }
  284. return
  285. }
  286. func (t *Topology) updateEndpoint(endpoint *wireguard.Endpoint, key wgtypes.Key, persistentKeepalive *time.Duration) *wireguard.Endpoint {
  287. // Do not update non-nat peers
  288. if persistentKeepalive == nil || *persistentKeepalive == time.Duration(0) {
  289. return endpoint
  290. }
  291. e, ok := t.discoveredEndpoints[key.String()]
  292. if ok {
  293. return wireguard.NewEndpointFromUDPAddr(e)
  294. }
  295. return endpoint
  296. }
  297. // Conf generates a WireGuard configuration file for a given Topology.
  298. func (t *Topology) Conf() *wireguard.Conf {
  299. c := &wireguard.Conf{
  300. Config: wgtypes.Config{
  301. PrivateKey: &t.key,
  302. ListenPort: &t.port,
  303. ReplacePeers: true,
  304. },
  305. }
  306. for _, s := range t.segments {
  307. if s.location == t.location {
  308. continue
  309. }
  310. peer := wireguard.Peer{
  311. PeerConfig: wgtypes.PeerConfig{
  312. AllowedIPs: append(s.allowedIPs, s.allowedLocationIPs...),
  313. PersistentKeepaliveInterval: &t.persistentKeepalive,
  314. PublicKey: s.key,
  315. ReplaceAllowedIPs: true,
  316. },
  317. Endpoint: t.updateEndpoint(s.endpoint, s.key, &s.persistentKeepalive),
  318. }
  319. c.Peers = append(c.Peers, peer)
  320. }
  321. for _, p := range t.peers {
  322. peer := wireguard.Peer{
  323. PeerConfig: wgtypes.PeerConfig{
  324. AllowedIPs: p.AllowedIPs,
  325. PersistentKeepaliveInterval: &t.persistentKeepalive,
  326. PresharedKey: p.PresharedKey,
  327. PublicKey: p.PublicKey,
  328. ReplaceAllowedIPs: true,
  329. },
  330. Endpoint: t.updateEndpoint(p.Endpoint, p.PublicKey, p.PersistentKeepaliveInterval),
  331. }
  332. c.Peers = append(c.Peers, peer)
  333. }
  334. return c
  335. }
  336. // AsPeer generates the WireGuard peer configuration for the local location of the given Topology.
  337. // This configuration can be used to configure this location as a peer of another WireGuard interface.
  338. func (t *Topology) AsPeer() *wireguard.Peer {
  339. for _, s := range t.segments {
  340. if s.location != t.location {
  341. continue
  342. }
  343. p := &wireguard.Peer{
  344. PeerConfig: wgtypes.PeerConfig{
  345. AllowedIPs: s.allowedIPs,
  346. PublicKey: s.key,
  347. },
  348. Endpoint: s.endpoint,
  349. }
  350. return p
  351. }
  352. return nil
  353. }
  354. // PeerConf generates a WireGuard configuration file for a given peer in a Topology.
  355. func (t *Topology) PeerConf(name string) *wireguard.Conf {
  356. var pka *time.Duration
  357. var psk *wgtypes.Key
  358. for i := range t.peers {
  359. if t.peers[i].Name == name {
  360. pka = t.peers[i].PersistentKeepaliveInterval
  361. psk = t.peers[i].PresharedKey
  362. break
  363. }
  364. }
  365. c := &wireguard.Conf{}
  366. for _, s := range t.segments {
  367. peer := wireguard.Peer{
  368. PeerConfig: wgtypes.PeerConfig{
  369. AllowedIPs: append(s.allowedIPs, s.allowedLocationIPs...),
  370. PersistentKeepaliveInterval: pka,
  371. PresharedKey: psk,
  372. PublicKey: s.key,
  373. },
  374. Endpoint: t.updateEndpoint(s.endpoint, s.key, &s.persistentKeepalive),
  375. }
  376. c.Peers = append(c.Peers, peer)
  377. }
  378. for i := range t.peers {
  379. if t.peers[i].Name == name {
  380. continue
  381. }
  382. peer := wireguard.Peer{
  383. PeerConfig: wgtypes.PeerConfig{
  384. AllowedIPs: t.peers[i].AllowedIPs,
  385. PersistentKeepaliveInterval: pka,
  386. PublicKey: t.peers[i].PublicKey,
  387. },
  388. Endpoint: t.updateEndpoint(t.peers[i].Endpoint, t.peers[i].PublicKey, t.peers[i].PersistentKeepaliveInterval),
  389. }
  390. c.Peers = append(c.Peers, peer)
  391. }
  392. return c
  393. }
  394. // oneAddressCIDR takes an IP address and returns a CIDR
  395. // that contains only that address.
  396. func oneAddressCIDR(ip net.IP) *net.IPNet {
  397. return &net.IPNet{IP: ip, Mask: net.CIDRMask(len(ip)*8, len(ip)*8)}
  398. }
  399. // ipFromIPNet extracts the IP from a *net.IPNet, returning nil if the IPNet is nil.
  400. func ipFromIPNet(n *net.IPNet) net.IP {
  401. if n == nil {
  402. return nil
  403. }
  404. return n.IP
  405. }
  406. // findLeader selects a leader for the nodes in a segment;
  407. // it will select the first node that says it should lead
  408. // or the first node in the segment if none have volunteered,
  409. // always preferring those with a public external IP address,
  410. func findLeader(nodes []*Node) int {
  411. var leaders, public []int
  412. for i := range nodes {
  413. if nodes[i].Leader {
  414. if isPublic(nodes[i].Endpoint.IP()) {
  415. return i
  416. }
  417. leaders = append(leaders, i)
  418. }
  419. if nodes[i].Endpoint.IP() != nil && isPublic(nodes[i].Endpoint.IP()) {
  420. public = append(public, i)
  421. }
  422. }
  423. if len(leaders) != 0 {
  424. return leaders[0]
  425. }
  426. if len(public) != 0 {
  427. return public[0]
  428. }
  429. return 0
  430. }
  431. func deduplicatePeerIPs(peers []*Peer) []*Peer {
  432. ps := make([]*Peer, len(peers))
  433. ips := make(map[string]struct{})
  434. for i, peer := range peers {
  435. p := Peer{
  436. Name: peer.Name,
  437. Peer: wireguard.Peer{
  438. PeerConfig: wgtypes.PeerConfig{
  439. PersistentKeepaliveInterval: peer.PersistentKeepaliveInterval,
  440. PresharedKey: peer.PresharedKey,
  441. PublicKey: peer.PublicKey,
  442. },
  443. Endpoint: peer.Endpoint,
  444. },
  445. }
  446. for _, ip := range peer.AllowedIPs {
  447. if _, ok := ips[ip.String()]; ok {
  448. continue
  449. }
  450. p.AllowedIPs = append(p.AllowedIPs, ip)
  451. ips[ip.String()] = struct{}{}
  452. }
  453. ps[i] = &p
  454. }
  455. return ps
  456. }