topology.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. // Copyright 2019 the Kilo authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mesh
  15. import (
  16. "errors"
  17. "net"
  18. "sort"
  19. "github.com/go-kit/kit/log"
  20. "github.com/go-kit/kit/log/level"
  21. "github.com/kilo-io/kilo/pkg/wireguard"
  22. )
// Prefixes that namespace the location keys used to group nodes into segments.
const (
	// logicalLocationPrefix is prepended to a node's Location when nodes
	// are grouped by logical location.
	logicalLocationPrefix = "location:"
	// nodeLocationPrefix is prepended to a node's name when the node forms
	// a segment of its own.
	nodeLocationPrefix = "node:"
)
// Topology represents the logical structure of the overlay network
// as seen from the local host.
type Topology struct {
	// key is the private key of the node creating the topology.
	key []byte
	// port is the listen port used for the local WireGuard interface.
	port uint32
	// location is the logical location of the local host.
	location string
	// segments holds one entry per location, sorted by location.
	segments []*segment
	// peers holds the peers of the mesh, sorted by name and with
	// duplicate allowed IPs removed.
	peers []*Peer
	// hostname is the hostname of the local host.
	hostname string
	// leader represents whether or not the local host
	// is the segment leader.
	leader bool
	// persistentKeepalive is the interval in seconds of the emission
	// of keepalive packets by the local node to its peers.
	persistentKeepalive int
	// privateIP is the private IP address of the local node.
	privateIP *net.IPNet
	// subnet is the Pod subnet of the local node.
	subnet *net.IPNet
	// wireGuardCIDR is the allocated CIDR of the WireGuard
	// interface of the local node within the Kilo subnet.
	// If the local node is not the leader of a location, then
	// the IP is the 0th address in the subnet, i.e. the CIDR
	// is equal to the Kilo subnet.
	wireGuardCIDR *net.IPNet
	// discoveredEndpoints is the updated map of valid discovered Endpoints.
	// Lookups use the string conversion of a segment or peer key.
	discoveredEndpoints map[string]*wireguard.Endpoint
	// logger receives warnings about overlapping allowed location IPs.
	logger log.Logger
}
// segment describes one location of the topology: the nodes grouped in it
// and the WireGuard parameters of the node leading it.
type segment struct {
	// allowedIPs collects the subnets and single-address CIDRs of the
	// private IPs of the nodes in the segment, plus the single-address
	// CIDR of the segment's WireGuard IP.
	allowedIPs []*net.IPNet
	// endpoint is the endpoint of the leader of the segment.
	endpoint *wireguard.Endpoint
	// key is the key of the leader of the segment.
	key []byte
	// persistentKeepalive is the keepalive setting of the leader of the segment.
	persistentKeepalive int
	// location is the logical location of this segment.
	location string
	// cidrs is a slice of subnets of all peers in the segment.
	cidrs []*net.IPNet
	// hostnames is a slice of the hostnames of the peers in the segment.
	hostnames []string
	// leader is the index of the leader of the segment.
	leader int
	// privateIPs is a slice of private IPs of all peers in the segment.
	privateIPs []net.IP
	// wireGuardIP is the allocated IP address of the WireGuard
	// interface on the leader of the segment.
	wireGuardIP net.IP
	// allowedLocationIPs are not part of the cluster and are not peers.
	// They are directly routable from nodes within the segment.
	// A classic example is a printer that ought to be routable from other locations.
	allowedLocationIPs []*net.IPNet
}
  81. // NewTopology creates a new Topology struct from a given set of nodes and peers.
  82. func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Granularity, hostname string, port uint32, key []byte, subnet *net.IPNet, persistentKeepalive int, logger log.Logger) (*Topology, error) {
  83. if logger == nil {
  84. logger = log.NewNopLogger()
  85. }
  86. topoMap := make(map[string][]*Node)
  87. for _, node := range nodes {
  88. var location string
  89. switch granularity {
  90. case LogicalGranularity:
  91. location = logicalLocationPrefix + node.Location
  92. // Put node in a different location, if no private
  93. // IP was found.
  94. if node.InternalIP == nil {
  95. location = nodeLocationPrefix + node.Name
  96. }
  97. case FullGranularity:
  98. location = nodeLocationPrefix + node.Name
  99. }
  100. topoMap[location] = append(topoMap[location], node)
  101. }
  102. var localLocation string
  103. switch granularity {
  104. case LogicalGranularity:
  105. localLocation = logicalLocationPrefix + nodes[hostname].Location
  106. if nodes[hostname].InternalIP == nil {
  107. localLocation = nodeLocationPrefix + hostname
  108. }
  109. case FullGranularity:
  110. localLocation = nodeLocationPrefix + hostname
  111. }
  112. t := Topology{key: key, port: port, hostname: hostname, location: localLocation, persistentKeepalive: persistentKeepalive, privateIP: nodes[hostname].InternalIP, subnet: nodes[hostname].Subnet, wireGuardCIDR: subnet, discoveredEndpoints: make(map[string]*wireguard.Endpoint), logger: logger}
  113. for location := range topoMap {
  114. // Sort the location so the result is stable.
  115. sort.Slice(topoMap[location], func(i, j int) bool {
  116. return topoMap[location][i].Name < topoMap[location][j].Name
  117. })
  118. leader := findLeader(topoMap[location])
  119. if location == localLocation && topoMap[location][leader].Name == hostname {
  120. t.leader = true
  121. }
  122. var allowedIPs []*net.IPNet
  123. allowedLocationIPsMap := make(map[string]struct{})
  124. var allowedLocationIPs []*net.IPNet
  125. var cidrs []*net.IPNet
  126. var hostnames []string
  127. var privateIPs []net.IP
  128. for _, node := range topoMap[location] {
  129. // Allowed IPs should include:
  130. // - the node's allocated subnet
  131. // - the node's WireGuard IP
  132. // - the node's internal IP
  133. // - IPs that were specified by the allowed-location-ips annotation
  134. allowedIPs = append(allowedIPs, node.Subnet)
  135. for _, ip := range node.AllowedLocationIPs {
  136. if _, ok := allowedLocationIPsMap[ip.String()]; !ok {
  137. allowedLocationIPs = append(allowedLocationIPs, ip)
  138. allowedLocationIPsMap[ip.String()] = struct{}{}
  139. }
  140. }
  141. if node.InternalIP != nil {
  142. allowedIPs = append(allowedIPs, oneAddressCIDR(node.InternalIP.IP))
  143. privateIPs = append(privateIPs, node.InternalIP.IP)
  144. }
  145. cidrs = append(cidrs, node.Subnet)
  146. hostnames = append(hostnames, node.Name)
  147. }
  148. // The sorting has no function, but makes testing easier.
  149. sort.Slice(allowedLocationIPs, func(i, j int) bool {
  150. return allowedLocationIPs[i].String() < allowedLocationIPs[j].String()
  151. })
  152. t.segments = append(t.segments, &segment{
  153. allowedIPs: allowedIPs,
  154. endpoint: topoMap[location][leader].Endpoint,
  155. key: topoMap[location][leader].Key,
  156. persistentKeepalive: topoMap[location][leader].PersistentKeepalive,
  157. location: location,
  158. cidrs: cidrs,
  159. hostnames: hostnames,
  160. leader: leader,
  161. privateIPs: privateIPs,
  162. allowedLocationIPs: allowedLocationIPs,
  163. })
  164. }
  165. // Sort the Topology segments so the result is stable.
  166. sort.Slice(t.segments, func(i, j int) bool {
  167. return t.segments[i].location < t.segments[j].location
  168. })
  169. for _, peer := range peers {
  170. t.peers = append(t.peers, peer)
  171. }
  172. // Sort the Topology peers so the result is stable.
  173. sort.Slice(t.peers, func(i, j int) bool {
  174. return t.peers[i].Name < t.peers[j].Name
  175. })
  176. // We need to defensively deduplicate peer allowed IPs. If two peers claim the same IP,
  177. // the WireGuard configuration could flap, causing the interface to churn.
  178. t.peers = deduplicatePeerIPs(t.peers)
  179. // Copy the host node DiscoveredEndpoints in the topology as a starting point.
  180. for key := range nodes[hostname].DiscoveredEndpoints {
  181. t.discoveredEndpoints[key] = nodes[hostname].DiscoveredEndpoints[key]
  182. }
  183. // Allocate IPs to the segment leaders in a stable, coordination-free manner.
  184. a := newAllocator(*subnet)
  185. for _, segment := range t.segments {
  186. ipNet := a.next()
  187. if ipNet == nil {
  188. return nil, errors.New("failed to allocate an IP address; ran out of IP addresses")
  189. }
  190. segment.wireGuardIP = ipNet.IP
  191. segment.allowedIPs = append(segment.allowedIPs, oneAddressCIDR(ipNet.IP))
  192. if t.leader && segment.location == t.location {
  193. t.wireGuardCIDR = &net.IPNet{IP: ipNet.IP, Mask: subnet.Mask}
  194. }
  195. // Now that the topology is ordered, update the discoveredEndpoints map
  196. // add new ones by going through the ordered topology: segments, nodes
  197. for _, node := range topoMap[segment.location] {
  198. for key := range node.DiscoveredEndpoints {
  199. if _, ok := t.discoveredEndpoints[key]; !ok {
  200. t.discoveredEndpoints[key] = node.DiscoveredEndpoints[key]
  201. }
  202. }
  203. }
  204. // Check for intersecting IPs in allowed location IPs
  205. segment.allowedLocationIPs = t.filterAllowedLocationIPs(segment.allowedLocationIPs, segment.location)
  206. }
  207. return &t, nil
  208. }
  209. func intersect(n1, n2 *net.IPNet) bool {
  210. return n1.Contains(n2.IP) || n2.Contains(n1.IP)
  211. }
  212. func (t *Topology) filterAllowedLocationIPs(ips []*net.IPNet, location string) (ret []*net.IPNet) {
  213. CheckIPs:
  214. for _, ip := range ips {
  215. for _, s := range t.segments {
  216. // Check if allowed location IPs are also allowed in other locations.
  217. if location != s.location {
  218. for _, i := range s.allowedLocationIPs {
  219. if intersect(ip, i) {
  220. level.Warn(t.logger).Log("msg", "overlapping allowed location IPnets", "IP", ip.String(), "IP2", i.String(), "segment-location", s.location)
  221. continue CheckIPs
  222. }
  223. }
  224. }
  225. // Check if allowed location IPs intersect with the allowed IPs.
  226. for _, i := range s.allowedIPs {
  227. if intersect(ip, i) {
  228. level.Warn(t.logger).Log("msg", "overlapping allowed location IPnet with allowed IPnets", "IP", ip.String(), "IP2", i.String(), "segment-location", s.location)
  229. continue CheckIPs
  230. }
  231. }
  232. // Check if allowed location IPs intersect with the private IPs of the segment.
  233. for _, i := range s.privateIPs {
  234. if ip.Contains(i) {
  235. level.Warn(t.logger).Log("msg", "overlapping allowed location IPnet with privateIP", "IP", ip.String(), "IP2", i.String(), "segment-location", s.location)
  236. continue CheckIPs
  237. }
  238. }
  239. }
  240. // Check if allowed location IPs intersect with allowed IPs of peers.
  241. for _, p := range t.peers {
  242. for _, i := range p.AllowedIPs {
  243. if intersect(ip, i) {
  244. level.Warn(t.logger).Log("msg", "overlapping allowed location IPnet with peer IPnet", "IP", ip.String(), "IP2", i.String(), "peer", p.Name)
  245. continue CheckIPs
  246. }
  247. }
  248. }
  249. ret = append(ret, ip)
  250. }
  251. return
  252. }
  253. func (t *Topology) updateEndpoint(endpoint *wireguard.Endpoint, key []byte, persistentKeepalive int) *wireguard.Endpoint {
  254. // Do not update non-nat peers
  255. if persistentKeepalive == 0 {
  256. return endpoint
  257. }
  258. e, ok := t.discoveredEndpoints[string(key)]
  259. if ok {
  260. return e
  261. }
  262. return endpoint
  263. }
  264. // Conf generates a WireGuard configuration file for a given Topology.
  265. func (t *Topology) Conf() *wireguard.Conf {
  266. c := &wireguard.Conf{
  267. Interface: &wireguard.Interface{
  268. PrivateKey: t.key,
  269. ListenPort: t.port,
  270. },
  271. }
  272. for _, s := range t.segments {
  273. if s.location == t.location {
  274. continue
  275. }
  276. peer := &wireguard.Peer{
  277. AllowedIPs: append(s.allowedIPs, s.allowedLocationIPs...),
  278. Endpoint: t.updateEndpoint(s.endpoint, s.key, s.persistentKeepalive),
  279. PersistentKeepalive: t.persistentKeepalive,
  280. PublicKey: s.key,
  281. }
  282. c.Peers = append(c.Peers, peer)
  283. }
  284. for _, p := range t.peers {
  285. peer := &wireguard.Peer{
  286. AllowedIPs: p.AllowedIPs,
  287. Endpoint: t.updateEndpoint(p.Endpoint, p.PublicKey, p.PersistentKeepalive),
  288. PersistentKeepalive: t.persistentKeepalive,
  289. PresharedKey: p.PresharedKey,
  290. PublicKey: p.PublicKey,
  291. }
  292. c.Peers = append(c.Peers, peer)
  293. }
  294. return c
  295. }
  296. // AsPeer generates the WireGuard peer configuration for the local location of the given Topology.
  297. // This configuration can be used to configure this location as a peer of another WireGuard interface.
  298. func (t *Topology) AsPeer() *wireguard.Peer {
  299. for _, s := range t.segments {
  300. if s.location != t.location {
  301. continue
  302. }
  303. return &wireguard.Peer{
  304. AllowedIPs: s.allowedIPs,
  305. Endpoint: s.endpoint,
  306. PublicKey: s.key,
  307. }
  308. }
  309. return nil
  310. }
  311. // PeerConf generates a WireGuard configuration file for a given peer in a Topology.
  312. func (t *Topology) PeerConf(name string) *wireguard.Conf {
  313. var pka int
  314. var psk []byte
  315. for i := range t.peers {
  316. if t.peers[i].Name == name {
  317. pka = t.peers[i].PersistentKeepalive
  318. psk = t.peers[i].PresharedKey
  319. break
  320. }
  321. }
  322. c := &wireguard.Conf{}
  323. for _, s := range t.segments {
  324. peer := &wireguard.Peer{
  325. AllowedIPs: s.allowedIPs,
  326. Endpoint: s.endpoint,
  327. PersistentKeepalive: pka,
  328. PresharedKey: psk,
  329. PublicKey: s.key,
  330. }
  331. c.Peers = append(c.Peers, peer)
  332. }
  333. for i := range t.peers {
  334. if t.peers[i].Name == name {
  335. continue
  336. }
  337. peer := &wireguard.Peer{
  338. AllowedIPs: t.peers[i].AllowedIPs,
  339. PersistentKeepalive: pka,
  340. PublicKey: t.peers[i].PublicKey,
  341. Endpoint: t.peers[i].Endpoint,
  342. }
  343. c.Peers = append(c.Peers, peer)
  344. }
  345. return c
  346. }
  347. // oneAddressCIDR takes an IP address and returns a CIDR
  348. // that contains only that address.
  349. func oneAddressCIDR(ip net.IP) *net.IPNet {
  350. return &net.IPNet{IP: ip, Mask: net.CIDRMask(len(ip)*8, len(ip)*8)}
  351. }
  352. // findLeader selects a leader for the nodes in a segment;
  353. // it will select the first node that says it should lead
  354. // or the first node in the segment if none have volunteered,
  355. // always preferring those with a public external IP address,
  356. func findLeader(nodes []*Node) int {
  357. var leaders, public []int
  358. for i := range nodes {
  359. if nodes[i].Leader {
  360. if isPublic(nodes[i].Endpoint.IP) {
  361. return i
  362. }
  363. leaders = append(leaders, i)
  364. }
  365. if isPublic(nodes[i].Endpoint.IP) {
  366. public = append(public, i)
  367. }
  368. }
  369. if len(leaders) != 0 {
  370. return leaders[0]
  371. }
  372. if len(public) != 0 {
  373. return public[0]
  374. }
  375. return 0
  376. }
  377. func deduplicatePeerIPs(peers []*Peer) []*Peer {
  378. ps := make([]*Peer, len(peers))
  379. ips := make(map[string]struct{})
  380. for i, peer := range peers {
  381. p := Peer{
  382. Name: peer.Name,
  383. Peer: wireguard.Peer{
  384. Endpoint: peer.Endpoint,
  385. PersistentKeepalive: peer.PersistentKeepalive,
  386. PresharedKey: peer.PresharedKey,
  387. PublicKey: peer.PublicKey,
  388. },
  389. }
  390. for _, ip := range peer.AllowedIPs {
  391. if _, ok := ips[ip.String()]; ok {
  392. continue
  393. }
  394. p.AllowedIPs = append(p.AllowedIPs, ip)
  395. ips[ip.String()] = struct{}{}
  396. }
  397. ps[i] = &p
  398. }
  399. return ps
  400. }