topology.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. // Copyright 2019 the Kilo authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mesh
  15. import (
  16. "errors"
  17. "net"
  18. "sort"
  19. "github.com/squat/kilo/pkg/encapsulation"
  20. "github.com/squat/kilo/pkg/wireguard"
  21. "github.com/vishvananda/netlink"
  22. "golang.org/x/sys/unix"
  23. )
// Topology represents the logical structure of the overlay network
// as seen from the local host.
type Topology struct {
	// key is the private key of the node creating the topology.
	key []byte
	// port is the WireGuard listen port of the local interface; it is
	// also used as the endpoint port of every segment in the generated
	// configurations.
	port uint32
	// location is the logical location of the local host.
	location string
	// segments holds one segment per location, sorted by location.
	segments []*segment
	// peers holds the peers given to NewTopology, sorted by name,
	// with duplicated allowed IPs removed.
	peers []*Peer
	// hostname is the hostname of the local host.
	hostname string
	// leader represents whether or not the local host
	// is the segment leader.
	leader bool
	// subnet is the entire subnet from which IPs
	// for the WireGuard interfaces will be allocated.
	subnet *net.IPNet
	// privateIP is the private IP address of the local node.
	privateIP *net.IPNet
	// wireGuardCIDR is the allocated CIDR of the WireGuard
	// interface of the local node. If the local node is not
	// the leader, then it is nil.
	wireGuardCIDR *net.IPNet
}
// segment describes all of the nodes that share one location.
// The per-node slices (cidrs, hostnames, privateIPs) are filled in
// the same order, so index i refers to the same node in each of them.
type segment struct {
	// allowedIPs is the set of IPs that should be routed to this segment:
	// the subnet and internal IP of every node in the segment plus the
	// WireGuard IP allocated to the segment.
	allowedIPs []*net.IPNet
	// endpoint is the external IP of the leader of the segment.
	endpoint net.IP
	// key is the key of the leader of the segment; it is used as the
	// public key of the WireGuard peer generated for the segment.
	key []byte
	// location is the logical location of this segment.
	location string
	// cidrs is a slice of subnets of all peers in the segment.
	cidrs []*net.IPNet
	// hostnames is a slice of the hostnames of the peers in the segment.
	hostnames []string
	// leader is the index of the leader of the segment.
	leader int
	// privateIPs is a slice of private IPs of all peers in the segment.
	privateIPs []net.IP
	// wireGuardIP is the allocated IP address of the WireGuard
	// interface on the leader of the segment.
	wireGuardIP net.IP
}
  66. // NewTopology creates a new Topology struct from a given set of nodes and peers.
  67. func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Granularity, hostname string, port uint32, key []byte, subnet *net.IPNet) (*Topology, error) {
  68. topoMap := make(map[string][]*Node)
  69. for _, node := range nodes {
  70. var location string
  71. switch granularity {
  72. case LogicalGranularity:
  73. location = node.Location
  74. case FullGranularity:
  75. location = node.Name
  76. }
  77. topoMap[location] = append(topoMap[location], node)
  78. }
  79. var localLocation string
  80. switch granularity {
  81. case LogicalGranularity:
  82. localLocation = nodes[hostname].Location
  83. case FullGranularity:
  84. localLocation = hostname
  85. }
  86. t := Topology{key: key, port: port, hostname: hostname, location: localLocation, subnet: subnet, privateIP: nodes[hostname].InternalIP}
  87. for location := range topoMap {
  88. // Sort the location so the result is stable.
  89. sort.Slice(topoMap[location], func(i, j int) bool {
  90. return topoMap[location][i].Name < topoMap[location][j].Name
  91. })
  92. leader := findLeader(topoMap[location])
  93. if location == localLocation && topoMap[location][leader].Name == hostname {
  94. t.leader = true
  95. }
  96. var allowedIPs []*net.IPNet
  97. var cidrs []*net.IPNet
  98. var hostnames []string
  99. var privateIPs []net.IP
  100. for _, node := range topoMap[location] {
  101. // Allowed IPs should include:
  102. // - the node's allocated subnet
  103. // - the node's WireGuard IP
  104. // - the node's internal IP
  105. allowedIPs = append(allowedIPs, node.Subnet, oneAddressCIDR(node.InternalIP.IP))
  106. cidrs = append(cidrs, node.Subnet)
  107. hostnames = append(hostnames, node.Name)
  108. privateIPs = append(privateIPs, node.InternalIP.IP)
  109. }
  110. t.segments = append(t.segments, &segment{
  111. allowedIPs: allowedIPs,
  112. endpoint: topoMap[location][leader].ExternalIP.IP,
  113. key: topoMap[location][leader].Key,
  114. location: location,
  115. cidrs: cidrs,
  116. hostnames: hostnames,
  117. leader: leader,
  118. privateIPs: privateIPs,
  119. })
  120. }
  121. // Sort the Topology segments so the result is stable.
  122. sort.Slice(t.segments, func(i, j int) bool {
  123. return t.segments[i].location < t.segments[j].location
  124. })
  125. for _, peer := range peers {
  126. t.peers = append(t.peers, peer)
  127. }
  128. // Sort the Topology peers so the result is stable.
  129. sort.Slice(t.peers, func(i, j int) bool {
  130. return t.peers[i].Name < t.peers[j].Name
  131. })
  132. // We need to defensively deduplicate peer allowed IPs. If two peers claim the same IP,
  133. // the WireGuard configuration could flap, causing the interface to churn.
  134. t.peers = deduplicatePeerIPs(t.peers)
  135. // Allocate IPs to the segment leaders in a stable, coordination-free manner.
  136. a := newAllocator(*subnet)
  137. for _, segment := range t.segments {
  138. ipNet := a.next()
  139. if ipNet == nil {
  140. return nil, errors.New("failed to allocate an IP address; ran out of IP addresses")
  141. }
  142. segment.wireGuardIP = ipNet.IP
  143. segment.allowedIPs = append(segment.allowedIPs, ipNet)
  144. if t.leader && segment.location == t.location {
  145. t.wireGuardCIDR = &net.IPNet{IP: ipNet.IP, Mask: t.subnet.Mask}
  146. }
  147. }
  148. return &t, nil
  149. }
  150. // RemoteSubnets identifies the subnets of the hosts in segments different than the host's.
  151. func (t *Topology) RemoteSubnets() []*net.IPNet {
  152. var remote []*net.IPNet
  153. for _, s := range t.segments {
  154. if s == nil || s.location == t.location {
  155. continue
  156. }
  157. remote = append(remote, s.cidrs...)
  158. }
  159. return remote
  160. }
  161. // Routes generates a slice of routes for a given Topology.
  162. func (t *Topology) Routes(kiloIface, privIface, tunlIface int, local bool, enc encapsulation.Encapsulator) []*netlink.Route {
  163. var routes []*netlink.Route
  164. if !t.leader {
  165. // Find the GW for this segment.
  166. // This will be the an IP of the leader.
  167. // In an IPIP encapsulated mesh it is the leader's private IP.
  168. var gw net.IP
  169. for _, segment := range t.segments {
  170. if segment.location == t.location {
  171. gw = enc.Gw(segment.endpoint, segment.privateIPs[segment.leader], segment.cidrs[segment.leader])
  172. break
  173. }
  174. }
  175. for _, segment := range t.segments {
  176. // First, add a route to the WireGuard IP of the segment.
  177. routes = append(routes, encapsulateRoute(&netlink.Route{
  178. Dst: oneAddressCIDR(segment.wireGuardIP),
  179. Flags: int(netlink.FLAG_ONLINK),
  180. Gw: gw,
  181. LinkIndex: privIface,
  182. Protocol: unix.RTPROT_STATIC,
  183. }, enc.Strategy(), t.privateIP, tunlIface))
  184. // Add routes for the current segment if local is true.
  185. if segment.location == t.location {
  186. if local {
  187. for i := range segment.cidrs {
  188. // Don't add routes for the local node.
  189. if segment.privateIPs[i].Equal(t.privateIP.IP) {
  190. continue
  191. }
  192. routes = append(routes, encapsulateRoute(&netlink.Route{
  193. Dst: segment.cidrs[i],
  194. Flags: int(netlink.FLAG_ONLINK),
  195. Gw: segment.privateIPs[i],
  196. LinkIndex: privIface,
  197. Protocol: unix.RTPROT_STATIC,
  198. }, enc.Strategy(), t.privateIP, tunlIface))
  199. }
  200. }
  201. continue
  202. }
  203. for i := range segment.cidrs {
  204. // Add routes to the Pod CIDRs of nodes in other segments.
  205. routes = append(routes, encapsulateRoute(&netlink.Route{
  206. Dst: segment.cidrs[i],
  207. Flags: int(netlink.FLAG_ONLINK),
  208. Gw: gw,
  209. LinkIndex: privIface,
  210. Protocol: unix.RTPROT_STATIC,
  211. }, enc.Strategy(), t.privateIP, tunlIface))
  212. // Add routes to the private IPs of nodes in other segments.
  213. // Number of CIDRs and private IPs always match so
  214. // we can reuse the loop.
  215. routes = append(routes, encapsulateRoute(&netlink.Route{
  216. Dst: oneAddressCIDR(segment.privateIPs[i]),
  217. Flags: int(netlink.FLAG_ONLINK),
  218. Gw: gw,
  219. LinkIndex: privIface,
  220. Protocol: unix.RTPROT_STATIC,
  221. }, enc.Strategy(), t.privateIP, tunlIface))
  222. }
  223. }
  224. // Add routes for the allowed IPs of peers.
  225. for _, peer := range t.peers {
  226. for i := range peer.AllowedIPs {
  227. routes = append(routes, encapsulateRoute(&netlink.Route{
  228. Dst: peer.AllowedIPs[i],
  229. Flags: int(netlink.FLAG_ONLINK),
  230. Gw: gw,
  231. LinkIndex: privIface,
  232. Protocol: unix.RTPROT_STATIC,
  233. }, enc.Strategy(), t.privateIP, tunlIface))
  234. }
  235. }
  236. return routes
  237. }
  238. for _, segment := range t.segments {
  239. // Add routes for the current segment if local is true.
  240. if segment.location == t.location {
  241. if local {
  242. for i := range segment.cidrs {
  243. // Don't add routes for the local node.
  244. if segment.privateIPs[i].Equal(t.privateIP.IP) {
  245. continue
  246. }
  247. routes = append(routes, encapsulateRoute(&netlink.Route{
  248. Dst: segment.cidrs[i],
  249. Flags: int(netlink.FLAG_ONLINK),
  250. Gw: segment.privateIPs[i],
  251. LinkIndex: privIface,
  252. Protocol: unix.RTPROT_STATIC,
  253. }, enc.Strategy(), t.privateIP, tunlIface))
  254. }
  255. }
  256. continue
  257. }
  258. for i := range segment.cidrs {
  259. // Add routes to the Pod CIDRs of nodes in other segments.
  260. routes = append(routes, &netlink.Route{
  261. Dst: segment.cidrs[i],
  262. Flags: int(netlink.FLAG_ONLINK),
  263. Gw: segment.wireGuardIP,
  264. LinkIndex: kiloIface,
  265. Protocol: unix.RTPROT_STATIC,
  266. })
  267. // Don't add routes through Kilo if the private IP
  268. // equals the external IP. This means that the node
  269. // is only accessible through an external IP and we
  270. // cannot encapsulate traffic to an IP through the IP.
  271. if segment.privateIPs[i].Equal(segment.endpoint) {
  272. continue
  273. }
  274. // Add routes to the private IPs of nodes in other segments.
  275. // Number of CIDRs and private IPs always match so
  276. // we can reuse the loop.
  277. routes = append(routes, &netlink.Route{
  278. Dst: oneAddressCIDR(segment.privateIPs[i]),
  279. Flags: int(netlink.FLAG_ONLINK),
  280. Gw: segment.wireGuardIP,
  281. LinkIndex: kiloIface,
  282. Protocol: unix.RTPROT_STATIC,
  283. })
  284. }
  285. }
  286. // Add routes for the allowed IPs of peers.
  287. for _, peer := range t.peers {
  288. for i := range peer.AllowedIPs {
  289. routes = append(routes, &netlink.Route{
  290. Dst: peer.AllowedIPs[i],
  291. LinkIndex: kiloIface,
  292. Protocol: unix.RTPROT_STATIC,
  293. })
  294. }
  295. }
  296. return routes
  297. }
  298. func encapsulateRoute(route *netlink.Route, encapsulate encapsulation.Strategy, subnet *net.IPNet, tunlIface int) *netlink.Route {
  299. if encapsulate == encapsulation.Always || (encapsulate == encapsulation.CrossSubnet && !subnet.Contains(route.Gw)) {
  300. route.LinkIndex = tunlIface
  301. }
  302. return route
  303. }
  304. // Conf generates a WireGuard configuration file for a given Topology.
  305. func (t *Topology) Conf() *wireguard.Conf {
  306. c := &wireguard.Conf{
  307. Interface: &wireguard.Interface{
  308. PrivateKey: t.key,
  309. ListenPort: t.port,
  310. },
  311. }
  312. for _, s := range t.segments {
  313. if s.location == t.location {
  314. continue
  315. }
  316. peer := &wireguard.Peer{
  317. AllowedIPs: s.allowedIPs,
  318. Endpoint: &wireguard.Endpoint{
  319. IP: s.endpoint,
  320. Port: uint32(t.port),
  321. },
  322. PublicKey: s.key,
  323. }
  324. c.Peers = append(c.Peers, peer)
  325. }
  326. for _, p := range t.peers {
  327. peer := &wireguard.Peer{
  328. AllowedIPs: p.AllowedIPs,
  329. PersistentKeepalive: p.PersistentKeepalive,
  330. PublicKey: p.PublicKey,
  331. Endpoint: p.Endpoint,
  332. }
  333. c.Peers = append(c.Peers, peer)
  334. }
  335. return c
  336. }
  337. // AsPeer generates the WireGuard peer configuration for the local location of the given Topology.
  338. // This configuration can be used to configure this location as a peer of another WireGuard interface.
  339. func (t *Topology) AsPeer() *wireguard.Peer {
  340. for _, s := range t.segments {
  341. if s.location != t.location {
  342. continue
  343. }
  344. return &wireguard.Peer{
  345. AllowedIPs: s.allowedIPs,
  346. Endpoint: &wireguard.Endpoint{
  347. IP: s.endpoint,
  348. Port: uint32(t.port),
  349. },
  350. PublicKey: s.key,
  351. }
  352. }
  353. return nil
  354. }
  355. // PeerConf generates a WireGuard configuration file for a given peer in a Topology.
  356. func (t *Topology) PeerConf(name string) *wireguard.Conf {
  357. c := &wireguard.Conf{}
  358. for _, s := range t.segments {
  359. peer := &wireguard.Peer{
  360. AllowedIPs: s.allowedIPs,
  361. Endpoint: &wireguard.Endpoint{
  362. IP: s.endpoint,
  363. Port: uint32(t.port),
  364. },
  365. PublicKey: s.key,
  366. }
  367. c.Peers = append(c.Peers, peer)
  368. }
  369. for _, p := range t.peers {
  370. if p.Name == name {
  371. continue
  372. }
  373. peer := &wireguard.Peer{
  374. AllowedIPs: p.AllowedIPs,
  375. PersistentKeepalive: p.PersistentKeepalive,
  376. PublicKey: p.PublicKey,
  377. Endpoint: p.Endpoint,
  378. }
  379. c.Peers = append(c.Peers, peer)
  380. }
  381. return c
  382. }
  383. // oneAddressCIDR takes an IP address and returns a CIDR
  384. // that contains only that address.
  385. func oneAddressCIDR(ip net.IP) *net.IPNet {
  386. return &net.IPNet{IP: ip, Mask: net.CIDRMask(len(ip)*8, len(ip)*8)}
  387. }
  388. // findLeader selects a leader for the nodes in a segment;
  389. // it will select the first node that says it should lead
  390. // or the first node in the segment if none have volunteered,
  391. // always preferring those with a public external IP address,
  392. func findLeader(nodes []*Node) int {
  393. var leaders, public []int
  394. for i := range nodes {
  395. if nodes[i].Leader {
  396. if isPublic(nodes[i].ExternalIP) {
  397. return i
  398. }
  399. leaders = append(leaders, i)
  400. }
  401. if isPublic(nodes[i].ExternalIP) {
  402. public = append(public, i)
  403. }
  404. }
  405. if len(leaders) != 0 {
  406. return leaders[0]
  407. }
  408. if len(public) != 0 {
  409. return public[0]
  410. }
  411. return 0
  412. }
  413. func deduplicatePeerIPs(peers []*Peer) []*Peer {
  414. ps := make([]*Peer, len(peers))
  415. ips := make(map[string]struct{})
  416. for i, peer := range peers {
  417. p := Peer{
  418. Name: peer.Name,
  419. Peer: wireguard.Peer{
  420. Endpoint: peer.Endpoint,
  421. PersistentKeepalive: peer.PersistentKeepalive,
  422. PublicKey: peer.PublicKey,
  423. },
  424. }
  425. for _, ip := range peer.AllowedIPs {
  426. if _, ok := ips[ip.String()]; ok {
  427. continue
  428. }
  429. p.AllowedIPs = append(p.AllowedIPs, ip)
  430. ips[ip.String()] = struct{}{}
  431. }
  432. ps[i] = &p
  433. }
  434. return ps
  435. }