topology.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. // Copyright 2019 the Kilo authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mesh
  15. import (
  16. "errors"
  17. "net"
  18. "sort"
  19. "github.com/squat/kilo/pkg/encapsulation"
  20. "github.com/squat/kilo/pkg/wireguard"
  21. "github.com/vishvananda/netlink"
  22. "golang.org/x/sys/unix"
  23. )
// kiloTableIndex is the index of the routing table that holds the routes
// used to encapsulate traffic headed to the private IPs of nodes in the
// local segment; the rules generated alongside those routes point at it.
const kiloTableIndex = 1107
// Topology represents the logical structure of the overlay network.
type Topology struct {
	// key is the private key of the node creating the topology.
	key []byte
	// port is the listen port used for the local WireGuard interface.
	port uint32
	// location is the logical location of the local host.
	location string
	// segments holds one entry per location in the mesh,
	// sorted by location so the result is stable.
	segments []*segment
	// peers holds the peers of the mesh, sorted by name and
	// with duplicate allowed IPs removed.
	peers []*Peer
	// hostname is the hostname of the local host.
	hostname string
	// leader represents whether or not the local host
	// is the segment leader.
	leader bool
	// subnet is the Pod subnet of the local node.
	subnet *net.IPNet
	// privateIP is the private IP address of the local node.
	privateIP *net.IPNet
	// wireGuardCIDR is the allocated CIDR of the WireGuard
	// interface of the local node. If the local node is not
	// the leader, then it is nil.
	wireGuardCIDR *net.IPNet
}
// segment represents the group of nodes that share a location,
// reachable from other segments through the segment leader.
type segment struct {
	// allowedIPs is a slice of the Pod subnets and private IPs of all
	// nodes in the segment plus the WireGuard IP of the segment.
	allowedIPs []*net.IPNet
	// endpoint is the endpoint of the leader of the segment.
	endpoint *wireguard.Endpoint
	// key is the key of the leader of the segment; it is used as the
	// public key when the segment is rendered as a WireGuard peer.
	key []byte
	// location is the logical location of this segment.
	location string
	// cidrs is a slice of subnets of all peers in the segment.
	cidrs []*net.IPNet
	// hostnames is a slice of the hostnames of the peers in the segment.
	hostnames []string
	// leader is the index of the leader of the segment.
	leader int
	// persistentKeepalive is the interval in seconds of the emission
	// of keepalive packets to the peer.
	persistentKeepalive int
	// privateIPs is a slice of private IPs of all peers in the segment.
	privateIPs []net.IP
	// wireGuardIP is the allocated IP address of the WireGuard
	// interface on the leader of the segment.
	wireGuardIP net.IP
}
  69. // NewTopology creates a new Topology struct from a given set of nodes and peers.
  70. func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Granularity, hostname string, port uint32, key []byte, subnet *net.IPNet) (*Topology, error) {
  71. topoMap := make(map[string][]*Node)
  72. for _, node := range nodes {
  73. var location string
  74. switch granularity {
  75. case LogicalGranularity:
  76. location = node.Location
  77. case FullGranularity:
  78. location = node.Name
  79. }
  80. topoMap[location] = append(topoMap[location], node)
  81. }
  82. var localLocation string
  83. switch granularity {
  84. case LogicalGranularity:
  85. localLocation = nodes[hostname].Location
  86. case FullGranularity:
  87. localLocation = hostname
  88. }
  89. t := Topology{key: key, port: port, hostname: hostname, location: localLocation, subnet: nodes[hostname].Subnet, privateIP: nodes[hostname].InternalIP}
  90. for location := range topoMap {
  91. // Sort the location so the result is stable.
  92. sort.Slice(topoMap[location], func(i, j int) bool {
  93. return topoMap[location][i].Name < topoMap[location][j].Name
  94. })
  95. leader := findLeader(topoMap[location])
  96. if location == localLocation && topoMap[location][leader].Name == hostname {
  97. t.leader = true
  98. }
  99. var allowedIPs []*net.IPNet
  100. var cidrs []*net.IPNet
  101. var hostnames []string
  102. var privateIPs []net.IP
  103. for _, node := range topoMap[location] {
  104. // Allowed IPs should include:
  105. // - the node's allocated subnet
  106. // - the node's WireGuard IP
  107. // - the node's internal IP
  108. allowedIPs = append(allowedIPs, node.Subnet, oneAddressCIDR(node.InternalIP.IP))
  109. cidrs = append(cidrs, node.Subnet)
  110. hostnames = append(hostnames, node.Name)
  111. privateIPs = append(privateIPs, node.InternalIP.IP)
  112. }
  113. t.segments = append(t.segments, &segment{
  114. allowedIPs: allowedIPs,
  115. endpoint: topoMap[location][leader].Endpoint,
  116. key: topoMap[location][leader].Key,
  117. location: location,
  118. cidrs: cidrs,
  119. hostnames: hostnames,
  120. leader: leader,
  121. privateIPs: privateIPs,
  122. persistentKeepalive: topoMap[location][leader].PersistentKeepalive,
  123. })
  124. }
  125. // Sort the Topology segments so the result is stable.
  126. sort.Slice(t.segments, func(i, j int) bool {
  127. return t.segments[i].location < t.segments[j].location
  128. })
  129. for _, peer := range peers {
  130. t.peers = append(t.peers, peer)
  131. }
  132. // Sort the Topology peers so the result is stable.
  133. sort.Slice(t.peers, func(i, j int) bool {
  134. return t.peers[i].Name < t.peers[j].Name
  135. })
  136. // We need to defensively deduplicate peer allowed IPs. If two peers claim the same IP,
  137. // the WireGuard configuration could flap, causing the interface to churn.
  138. t.peers = deduplicatePeerIPs(t.peers)
  139. // Allocate IPs to the segment leaders in a stable, coordination-free manner.
  140. a := newAllocator(*subnet)
  141. for _, segment := range t.segments {
  142. ipNet := a.next()
  143. if ipNet == nil {
  144. return nil, errors.New("failed to allocate an IP address; ran out of IP addresses")
  145. }
  146. segment.wireGuardIP = ipNet.IP
  147. segment.allowedIPs = append(segment.allowedIPs, oneAddressCIDR(ipNet.IP))
  148. if t.leader && segment.location == t.location {
  149. t.wireGuardCIDR = &net.IPNet{IP: ipNet.IP, Mask: subnet.Mask}
  150. }
  151. }
  152. return &t, nil
  153. }
  154. // Routes generates a slice of routes for a given Topology.
  155. func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface int, local bool, enc encapsulation.Encapsulator) ([]*netlink.Route, []*netlink.Rule) {
  156. var routes []*netlink.Route
  157. var rules []*netlink.Rule
  158. if !t.leader {
  159. // Find the GW for this segment.
  160. // This will be the an IP of the leader.
  161. // In an IPIP encapsulated mesh it is the leader's private IP.
  162. var gw net.IP
  163. for _, segment := range t.segments {
  164. if segment.location == t.location {
  165. gw = enc.Gw(segment.endpoint.IP, segment.privateIPs[segment.leader], segment.cidrs[segment.leader])
  166. break
  167. }
  168. }
  169. for _, segment := range t.segments {
  170. // First, add a route to the WireGuard IP of the segment.
  171. routes = append(routes, encapsulateRoute(&netlink.Route{
  172. Dst: oneAddressCIDR(segment.wireGuardIP),
  173. Flags: int(netlink.FLAG_ONLINK),
  174. Gw: gw,
  175. LinkIndex: privIface,
  176. Protocol: unix.RTPROT_STATIC,
  177. }, enc.Strategy(), t.privateIP, tunlIface))
  178. // Add routes for the current segment if local is true.
  179. if segment.location == t.location {
  180. if local {
  181. for i := range segment.cidrs {
  182. // Don't add routes for the local node.
  183. if segment.privateIPs[i].Equal(t.privateIP.IP) {
  184. continue
  185. }
  186. routes = append(routes, encapsulateRoute(&netlink.Route{
  187. Dst: segment.cidrs[i],
  188. Flags: int(netlink.FLAG_ONLINK),
  189. Gw: segment.privateIPs[i],
  190. LinkIndex: privIface,
  191. Protocol: unix.RTPROT_STATIC,
  192. }, enc.Strategy(), t.privateIP, tunlIface))
  193. // Encapsulate packets from the host's Pod subnet headed
  194. // to private IPs.
  195. if enc.Strategy() == encapsulation.Always || (enc.Strategy() == encapsulation.CrossSubnet && !t.privateIP.Contains(segment.privateIPs[i])) {
  196. routes = append(routes, &netlink.Route{
  197. Dst: oneAddressCIDR(segment.privateIPs[i]),
  198. Flags: int(netlink.FLAG_ONLINK),
  199. Gw: segment.privateIPs[i],
  200. LinkIndex: tunlIface,
  201. Protocol: unix.RTPROT_STATIC,
  202. Table: kiloTableIndex,
  203. })
  204. rules = append(rules, defaultRule(&netlink.Rule{
  205. Src: t.subnet,
  206. Dst: oneAddressCIDR(segment.privateIPs[i]),
  207. Table: kiloTableIndex,
  208. }))
  209. }
  210. }
  211. }
  212. continue
  213. }
  214. for i := range segment.cidrs {
  215. // Add routes to the Pod CIDRs of nodes in other segments.
  216. routes = append(routes, encapsulateRoute(&netlink.Route{
  217. Dst: segment.cidrs[i],
  218. Flags: int(netlink.FLAG_ONLINK),
  219. Gw: gw,
  220. LinkIndex: privIface,
  221. Protocol: unix.RTPROT_STATIC,
  222. }, enc.Strategy(), t.privateIP, tunlIface))
  223. // Add routes to the private IPs of nodes in other segments.
  224. // Number of CIDRs and private IPs always match so
  225. // we can reuse the loop.
  226. routes = append(routes, encapsulateRoute(&netlink.Route{
  227. Dst: oneAddressCIDR(segment.privateIPs[i]),
  228. Flags: int(netlink.FLAG_ONLINK),
  229. Gw: gw,
  230. LinkIndex: privIface,
  231. Protocol: unix.RTPROT_STATIC,
  232. }, enc.Strategy(), t.privateIP, tunlIface))
  233. }
  234. }
  235. // Add routes for the allowed IPs of peers.
  236. for _, peer := range t.peers {
  237. for i := range peer.AllowedIPs {
  238. routes = append(routes, encapsulateRoute(&netlink.Route{
  239. Dst: peer.AllowedIPs[i],
  240. Flags: int(netlink.FLAG_ONLINK),
  241. Gw: gw,
  242. LinkIndex: privIface,
  243. Protocol: unix.RTPROT_STATIC,
  244. }, enc.Strategy(), t.privateIP, tunlIface))
  245. }
  246. }
  247. return routes, rules
  248. }
  249. for _, segment := range t.segments {
  250. // Add routes for the current segment if local is true.
  251. if segment.location == t.location {
  252. if local {
  253. for i := range segment.cidrs {
  254. // Don't add routes for the local node.
  255. if segment.privateIPs[i].Equal(t.privateIP.IP) {
  256. continue
  257. }
  258. routes = append(routes, encapsulateRoute(&netlink.Route{
  259. Dst: segment.cidrs[i],
  260. Flags: int(netlink.FLAG_ONLINK),
  261. Gw: segment.privateIPs[i],
  262. LinkIndex: privIface,
  263. Protocol: unix.RTPROT_STATIC,
  264. }, enc.Strategy(), t.privateIP, tunlIface))
  265. // Encapsulate packets from the host's Pod subnet headed
  266. // to private IPs.
  267. if enc.Strategy() == encapsulation.Always || (enc.Strategy() == encapsulation.CrossSubnet && !t.privateIP.Contains(segment.privateIPs[i])) {
  268. routes = append(routes, &netlink.Route{
  269. Dst: oneAddressCIDR(segment.privateIPs[i]),
  270. Flags: int(netlink.FLAG_ONLINK),
  271. Gw: segment.privateIPs[i],
  272. LinkIndex: tunlIface,
  273. Protocol: unix.RTPROT_STATIC,
  274. Table: kiloTableIndex,
  275. })
  276. rules = append(rules, defaultRule(&netlink.Rule{
  277. Src: t.subnet,
  278. Dst: oneAddressCIDR(segment.privateIPs[i]),
  279. Table: kiloTableIndex,
  280. }))
  281. // Also encapsulate packets from the Kilo interface
  282. // headed to private IPs.
  283. rules = append(rules, defaultRule(&netlink.Rule{
  284. Dst: oneAddressCIDR(segment.privateIPs[i]),
  285. Table: kiloTableIndex,
  286. IifName: kiloIfaceName,
  287. }))
  288. }
  289. }
  290. }
  291. continue
  292. }
  293. for i := range segment.cidrs {
  294. // Add routes to the Pod CIDRs of nodes in other segments.
  295. routes = append(routes, &netlink.Route{
  296. Dst: segment.cidrs[i],
  297. Flags: int(netlink.FLAG_ONLINK),
  298. Gw: segment.wireGuardIP,
  299. LinkIndex: kiloIface,
  300. Protocol: unix.RTPROT_STATIC,
  301. })
  302. // Don't add routes through Kilo if the private IP
  303. // equals the external IP. This means that the node
  304. // is only accessible through an external IP and we
  305. // cannot encapsulate traffic to an IP through the IP.
  306. if segment.privateIPs[i].Equal(segment.endpoint.IP) {
  307. continue
  308. }
  309. // Add routes to the private IPs of nodes in other segments.
  310. // Number of CIDRs and private IPs always match so
  311. // we can reuse the loop.
  312. routes = append(routes, &netlink.Route{
  313. Dst: oneAddressCIDR(segment.privateIPs[i]),
  314. Flags: int(netlink.FLAG_ONLINK),
  315. Gw: segment.wireGuardIP,
  316. LinkIndex: kiloIface,
  317. Protocol: unix.RTPROT_STATIC,
  318. })
  319. }
  320. }
  321. // Add routes for the allowed IPs of peers.
  322. for _, peer := range t.peers {
  323. for i := range peer.AllowedIPs {
  324. routes = append(routes, &netlink.Route{
  325. Dst: peer.AllowedIPs[i],
  326. LinkIndex: kiloIface,
  327. Protocol: unix.RTPROT_STATIC,
  328. })
  329. }
  330. }
  331. return routes, rules
  332. }
  333. func encapsulateRoute(route *netlink.Route, encapsulate encapsulation.Strategy, subnet *net.IPNet, tunlIface int) *netlink.Route {
  334. if encapsulate == encapsulation.Always || (encapsulate == encapsulation.CrossSubnet && !subnet.Contains(route.Gw)) {
  335. route.LinkIndex = tunlIface
  336. }
  337. return route
  338. }
  339. // Conf generates a WireGuard configuration file for a given Topology.
  340. func (t *Topology) Conf() *wireguard.Conf {
  341. c := &wireguard.Conf{
  342. Interface: &wireguard.Interface{
  343. PrivateKey: t.key,
  344. ListenPort: t.port,
  345. },
  346. }
  347. for _, s := range t.segments {
  348. if s.location == t.location {
  349. continue
  350. }
  351. peer := &wireguard.Peer{
  352. AllowedIPs: s.allowedIPs,
  353. Endpoint: s.endpoint,
  354. PublicKey: s.key,
  355. PersistentKeepalive: s.persistentKeepalive,
  356. }
  357. c.Peers = append(c.Peers, peer)
  358. }
  359. for _, p := range t.peers {
  360. peer := &wireguard.Peer{
  361. AllowedIPs: p.AllowedIPs,
  362. PersistentKeepalive: p.PersistentKeepalive,
  363. PublicKey: p.PublicKey,
  364. Endpoint: p.Endpoint,
  365. }
  366. c.Peers = append(c.Peers, peer)
  367. }
  368. return c
  369. }
  370. // AsPeer generates the WireGuard peer configuration for the local location of the given Topology.
  371. // This configuration can be used to configure this location as a peer of another WireGuard interface.
  372. func (t *Topology) AsPeer() *wireguard.Peer {
  373. for _, s := range t.segments {
  374. if s.location != t.location {
  375. continue
  376. }
  377. return &wireguard.Peer{
  378. AllowedIPs: s.allowedIPs,
  379. Endpoint: s.endpoint,
  380. PersistentKeepalive: s.persistentKeepalive,
  381. PublicKey: s.key,
  382. }
  383. }
  384. return nil
  385. }
  386. // PeerConf generates a WireGuard configuration file for a given peer in a Topology.
  387. func (t *Topology) PeerConf(name string) *wireguard.Conf {
  388. c := &wireguard.Conf{}
  389. for _, s := range t.segments {
  390. peer := &wireguard.Peer{
  391. AllowedIPs: s.allowedIPs,
  392. Endpoint: s.endpoint,
  393. PersistentKeepalive: s.persistentKeepalive,
  394. PublicKey: s.key,
  395. }
  396. c.Peers = append(c.Peers, peer)
  397. }
  398. for _, p := range t.peers {
  399. if p.Name == name {
  400. continue
  401. }
  402. peer := &wireguard.Peer{
  403. AllowedIPs: p.AllowedIPs,
  404. PersistentKeepalive: p.PersistentKeepalive,
  405. PublicKey: p.PublicKey,
  406. Endpoint: p.Endpoint,
  407. }
  408. c.Peers = append(c.Peers, peer)
  409. }
  410. return c
  411. }
  412. // oneAddressCIDR takes an IP address and returns a CIDR
  413. // that contains only that address.
  414. func oneAddressCIDR(ip net.IP) *net.IPNet {
  415. return &net.IPNet{IP: ip, Mask: net.CIDRMask(len(ip)*8, len(ip)*8)}
  416. }
  417. // findLeader selects a leader for the nodes in a segment;
  418. // it will select the first node that says it should lead
  419. // or the first node in the segment if none have volunteered,
  420. // always preferring those with a public external IP address,
  421. func findLeader(nodes []*Node) int {
  422. var leaders, public []int
  423. for i := range nodes {
  424. if nodes[i].Leader {
  425. if isPublic(nodes[i].Endpoint.IP) {
  426. return i
  427. }
  428. leaders = append(leaders, i)
  429. }
  430. if isPublic(nodes[i].Endpoint.IP) {
  431. public = append(public, i)
  432. }
  433. }
  434. if len(leaders) != 0 {
  435. return leaders[0]
  436. }
  437. if len(public) != 0 {
  438. return public[0]
  439. }
  440. return 0
  441. }
  442. func deduplicatePeerIPs(peers []*Peer) []*Peer {
  443. ps := make([]*Peer, len(peers))
  444. ips := make(map[string]struct{})
  445. for i, peer := range peers {
  446. p := Peer{
  447. Name: peer.Name,
  448. Peer: wireguard.Peer{
  449. Endpoint: peer.Endpoint,
  450. PersistentKeepalive: peer.PersistentKeepalive,
  451. PublicKey: peer.PublicKey,
  452. },
  453. }
  454. for _, ip := range peer.AllowedIPs {
  455. if _, ok := ips[ip.String()]; ok {
  456. continue
  457. }
  458. p.AllowedIPs = append(p.AllowedIPs, ip)
  459. ips[ip.String()] = struct{}{}
  460. }
  461. ps[i] = &p
  462. }
  463. return ps
  464. }
  465. func defaultRule(rule *netlink.Rule) *netlink.Rule {
  466. base := netlink.NewRule()
  467. base.Src = rule.Src
  468. base.Dst = rule.Dst
  469. base.IifName = rule.IifName
  470. base.Table = rule.Table
  471. return base
  472. }