mesh.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. // Copyright 2019 the Kilo authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mesh
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net"
  19. "os"
  20. "sync"
  21. "time"
  22. "github.com/go-kit/kit/log"
  23. "github.com/go-kit/kit/log/level"
  24. "github.com/prometheus/client_golang/prometheus"
  25. "github.com/vishvananda/netlink"
  26. "github.com/squat/kilo/pkg/iproute"
  27. "github.com/squat/kilo/pkg/ipset"
  28. "github.com/squat/kilo/pkg/iptables"
  29. "github.com/squat/kilo/pkg/route"
  30. "github.com/squat/kilo/pkg/wireguard"
  31. )
  const (
	// resyncPeriod is the interval at which Run re-applies the full
	// topology, independently of any events received from the backend.
	resyncPeriod = 30 * time.Second

	// KiloPath is the directory where Kilo stores its configuration.
	KiloPath = "/var/lib/kilo"
	// PrivateKeyPath is the filepath where the WireGuard private key is stored.
	PrivateKeyPath = KiloPath + "/key"
	// ConfPath is the filepath where the WireGuard configuration is stored.
	ConfPath = KiloPath + "/conf"
)
  // Granularity represents the abstraction level at which the network
// should be meshed.
type Granularity string

// Supported mesh granularities.
const (
	// DataCenterGranularity indicates that the network should create
	// a mesh between data-centers but not between nodes within a
	// single data-center.
	DataCenterGranularity Granularity = "data-center"
	// NodeGranularity indicates that the network should create
	// a mesh between every node.
	NodeGranularity Granularity = "node"
)

// Encapsulate identifies what packets within a location should
// be encapsulated.
type Encapsulate string

// Supported encapsulation policies.
const (
	// NeverEncapsulate indicates that no packets within a location
	// should be encapsulated.
	NeverEncapsulate Encapsulate = "never"
	// CrossSubnetEncapsulate indicates that only packets that
	// traverse subnets within a location should be encapsulated.
	CrossSubnetEncapsulate Encapsulate = "crosssubnet"
	// AlwaysEncapsulate indicates that all packets within a location
	// should be encapsulated.
	AlwaysEncapsulate Encapsulate = "always"
)
  // Node represents a node in the network.
type Node struct {
	// ExternalIP is the node's public address. For the local node, the
	// mesh fills it with the address discovered at start-up if the backend
	// does not provide one.
	ExternalIP *net.IPNet
	// Key is the node's WireGuard public key.
	Key []byte
	// InternalIP is the node's private address.
	InternalIP *net.IPNet
	// Leader is a suggestion to Kilo that
	// the node wants to lead its segment.
	Leader bool
	// Location groups nodes; it is compared against segment locations
	// when the topology is applied.
	Location string
	// Name identifies the node; the local node's Name is its hostname.
	Name string
	// Subnet is the subnet assigned to the node. It is not determined by
	// Kilo and may still be unset (nil).
	Subnet *net.IPNet
}

// Ready indicates whether or not the node is ready, i.e. whether it is
// non-nil and has an external IP, key, internal IP and subnet set.
// An empty but non-nil Key counts as set.
func (n *Node) Ready() bool {
	if n == nil {
		return false
	}
	hasAddresses := n.ExternalIP != nil && n.InternalIP != nil
	return hasAddresses && n.Key != nil && n.Subnet != nil
}
  // EventType describes what kind of an action an event represents.
type EventType string

// The kinds of change a Backend can report about a node.
const (
	// AddEvent represents an action where an item was added.
	AddEvent EventType = "add"
	// UpdateEvent represents an action where an item was updated.
	UpdateEvent EventType = "update"
	// DeleteEvent represents an action where an item was removed.
	DeleteEvent EventType = "delete"
)
  91. // Event represents an update event concerning a node in the cluster.
  92. type Event struct {
  93. Type EventType
  94. Node *Node
  95. }
  96. // Backend can get nodes by name, init itself,
  97. // list the nodes that should be meshed,
  98. // set Kilo properties for a node,
  99. // clean up any changes applied to the backend,
  100. // and watch for changes to nodes.
  101. type Backend interface {
  102. CleanUp(string) error
  103. Get(string) (*Node, error)
  104. Init(<-chan struct{}) error
  105. List() ([]*Node, error)
  106. Set(string, *Node) error
  107. Watch() <-chan *Event
  108. }
  109. // Mesh is able to create Kilo network meshes.
  110. type Mesh struct {
  111. Backend
  112. encapsulate Encapsulate
  113. externalIP *net.IPNet
  114. granularity Granularity
  115. hostname string
  116. internalIP *net.IPNet
  117. ipset *ipset.Set
  118. ipTables *iptables.Controller
  119. kiloIface int
  120. key []byte
  121. local bool
  122. port int
  123. priv []byte
  124. privIface int
  125. pub []byte
  126. pubIface int
  127. stop chan struct{}
  128. subnet *net.IPNet
  129. table *route.Table
  130. tunlIface int
  131. // nodes is a mutable field in the struct
  132. // and needs to be guarded.
  133. nodes map[string]*Node
  134. mu sync.Mutex
  135. errorCounter *prometheus.CounterVec
  136. nodesGuage prometheus.Gauge
  137. logger log.Logger
  138. }
  139. // New returns a new Mesh instance.
  140. func New(backend Backend, encapsulate Encapsulate, granularity Granularity, hostname string, port int, subnet *net.IPNet, local bool, logger log.Logger) (*Mesh, error) {
  141. if err := os.MkdirAll(KiloPath, 0700); err != nil {
  142. return nil, fmt.Errorf("failed to create directory to store configuration: %v", err)
  143. }
  144. private, err := ioutil.ReadFile(PrivateKeyPath)
  145. if err != nil {
  146. level.Warn(logger).Log("msg", "no private key found on disk; generating one now")
  147. if private, err = wireguard.GenKey(); err != nil {
  148. return nil, err
  149. }
  150. }
  151. public, err := wireguard.PubKey(private)
  152. if err != nil {
  153. return nil, err
  154. }
  155. if err := ioutil.WriteFile(PrivateKeyPath, private, 0600); err != nil {
  156. return nil, fmt.Errorf("failed to write private key to disk: %v", err)
  157. }
  158. privateIP, publicIP, err := getIP(hostname)
  159. if err != nil {
  160. return nil, fmt.Errorf("failed to find public IP: %v", err)
  161. }
  162. ifaces, err := interfacesForIP(privateIP)
  163. if err != nil {
  164. return nil, fmt.Errorf("failed to find interface for private IP: %v", err)
  165. }
  166. privIface := ifaces[0].Index
  167. ifaces, err = interfacesForIP(publicIP)
  168. if err != nil {
  169. return nil, fmt.Errorf("failed to find interface for public IP: %v", err)
  170. }
  171. pubIface := ifaces[0].Index
  172. kiloIface, err := wireguard.New("kilo")
  173. if err != nil {
  174. return nil, fmt.Errorf("failed to create WireGuard interface: %v", err)
  175. }
  176. var tunlIface int
  177. if encapsulate != NeverEncapsulate {
  178. if tunlIface, err = iproute.NewIPIP(privIface); err != nil {
  179. return nil, fmt.Errorf("failed to create tunnel interface: %v", err)
  180. }
  181. if err := iproute.Set(tunlIface, true); err != nil {
  182. return nil, fmt.Errorf("failed to set tunnel interface up: %v", err)
  183. }
  184. }
  185. level.Debug(logger).Log("msg", fmt.Sprintf("using %s as the private IP address", privateIP.String()))
  186. level.Debug(logger).Log("msg", fmt.Sprintf("using %s as the public IP address", publicIP.String()))
  187. ipTables, err := iptables.New(len(subnet.IP))
  188. if err != nil {
  189. return nil, fmt.Errorf("failed to IP tables controller: %v", err)
  190. }
  191. return &Mesh{
  192. Backend: backend,
  193. encapsulate: encapsulate,
  194. externalIP: publicIP,
  195. granularity: granularity,
  196. hostname: hostname,
  197. internalIP: privateIP,
  198. // This is a patch until Calico supports
  199. // other hosts adding IPIP iptables rules.
  200. ipset: ipset.New("cali40all-hosts-net"),
  201. ipTables: ipTables,
  202. kiloIface: kiloIface,
  203. nodes: make(map[string]*Node),
  204. port: port,
  205. priv: private,
  206. privIface: privIface,
  207. pub: public,
  208. pubIface: pubIface,
  209. local: local,
  210. stop: make(chan struct{}),
  211. subnet: subnet,
  212. table: route.NewTable(),
  213. tunlIface: tunlIface,
  214. errorCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
  215. Name: "kilo_errors_total",
  216. Help: "Number of errors that occurred while administering the mesh.",
  217. }, []string{"event"}),
  218. nodesGuage: prometheus.NewGauge(prometheus.GaugeOpts{
  219. Name: "kilo_nodes",
  220. Help: "Number of in the mesh.",
  221. }),
  222. logger: logger,
  223. }, nil
  224. }
  225. // Run starts the mesh.
  226. func (m *Mesh) Run() error {
  227. if err := m.Init(m.stop); err != nil {
  228. return fmt.Errorf("failed to initialize backend: %v", err)
  229. }
  230. ipsetErrors, err := m.ipset.Run(m.stop)
  231. if err != nil {
  232. return fmt.Errorf("failed to watch for ipset updates: %v", err)
  233. }
  234. ipTablesErrors, err := m.ipTables.Run(m.stop)
  235. if err != nil {
  236. return fmt.Errorf("failed to watch for IP tables updates: %v", err)
  237. }
  238. routeErrors, err := m.table.Run(m.stop)
  239. if err != nil {
  240. return fmt.Errorf("failed to watch for route table updates: %v", err)
  241. }
  242. go func() {
  243. for {
  244. var err error
  245. select {
  246. case err = <-ipsetErrors:
  247. case err = <-ipTablesErrors:
  248. case err = <-routeErrors:
  249. case <-m.stop:
  250. return
  251. }
  252. if err != nil {
  253. level.Error(m.logger).Log("error", err)
  254. m.errorCounter.WithLabelValues("run").Inc()
  255. }
  256. }
  257. }()
  258. defer m.cleanUp()
  259. t := time.NewTimer(resyncPeriod)
  260. w := m.Watch()
  261. for {
  262. var e *Event
  263. select {
  264. case e = <-w:
  265. m.sync(e)
  266. case <-t.C:
  267. m.applyTopology()
  268. t.Reset(resyncPeriod)
  269. case <-m.stop:
  270. return nil
  271. }
  272. }
  273. }
  274. func (m *Mesh) sync(e *Event) {
  275. logger := log.With(m.logger, "event", e.Type)
  276. level.Debug(logger).Log("msg", "syncing", "event", e.Type)
  277. if isSelf(m.hostname, e.Node) {
  278. level.Debug(logger).Log("msg", "processing local node", "node", e.Node)
  279. m.handleLocal(e.Node)
  280. return
  281. }
  282. var diff bool
  283. m.mu.Lock()
  284. if !e.Node.Ready() {
  285. level.Debug(logger).Log("msg", "received incomplete node", "node", e.Node)
  286. // An existing node is no longer valid
  287. // so remove it from the mesh.
  288. if _, ok := m.nodes[e.Node.Name]; ok {
  289. level.Info(logger).Log("msg", "node is no longer in the mesh", "node", e.Node)
  290. delete(m.nodes, e.Node.Name)
  291. diff = true
  292. }
  293. } else {
  294. switch e.Type {
  295. case AddEvent:
  296. fallthrough
  297. case UpdateEvent:
  298. if !nodesAreEqual(m.nodes[e.Node.Name], e.Node) {
  299. m.nodes[e.Node.Name] = e.Node
  300. diff = true
  301. }
  302. case DeleteEvent:
  303. delete(m.nodes, e.Node.Name)
  304. diff = true
  305. }
  306. }
  307. m.mu.Unlock()
  308. if diff {
  309. level.Info(logger).Log("node", e.Node)
  310. m.applyTopology()
  311. }
  312. }
  313. func (m *Mesh) handleLocal(n *Node) {
  314. // Allow the external IP to be overridden.
  315. if n.ExternalIP == nil {
  316. n.ExternalIP = m.externalIP
  317. }
  318. // Compare the given node to the calculated local node.
  319. // Take leader, location, and subnet from the argument, as these
  320. // are not determined by kilo.
  321. local := &Node{ExternalIP: n.ExternalIP, Key: m.pub, InternalIP: m.internalIP, Leader: n.Leader, Location: n.Location, Name: m.hostname, Subnet: n.Subnet}
  322. if !nodesAreEqual(n, local) {
  323. level.Debug(m.logger).Log("msg", "local node differs from backend")
  324. if err := m.Set(m.hostname, local); err != nil {
  325. level.Error(m.logger).Log("error", fmt.Sprintf("failed to set local node: %v", err), "node", local)
  326. m.errorCounter.WithLabelValues("local").Inc()
  327. return
  328. }
  329. level.Debug(m.logger).Log("msg", "successfully reconciled local node against backend")
  330. }
  331. m.mu.Lock()
  332. n = m.nodes[m.hostname]
  333. if n == nil {
  334. n = &Node{}
  335. }
  336. m.mu.Unlock()
  337. if !nodesAreEqual(n, local) {
  338. m.mu.Lock()
  339. m.nodes[local.Name] = local
  340. m.mu.Unlock()
  341. m.applyTopology()
  342. }
  343. }
  344. func (m *Mesh) applyTopology() {
  345. m.mu.Lock()
  346. defer m.mu.Unlock()
  347. // Ensure all unready nodes are removed.
  348. var ready float64
  349. for n := range m.nodes {
  350. if !m.nodes[n].Ready() {
  351. delete(m.nodes, n)
  352. continue
  353. }
  354. ready++
  355. }
  356. m.nodesGuage.Set(ready)
  357. // We cannot do anything with the topology until the local node is available.
  358. if m.nodes[m.hostname] == nil {
  359. return
  360. }
  361. t, err := NewTopology(m.nodes, m.granularity, m.hostname, m.port, m.priv, m.subnet)
  362. if err != nil {
  363. level.Error(m.logger).Log("error", err)
  364. m.errorCounter.WithLabelValues("apply").Inc()
  365. return
  366. }
  367. conf, err := t.Conf()
  368. if err != nil {
  369. level.Error(m.logger).Log("error", err)
  370. m.errorCounter.WithLabelValues("apply").Inc()
  371. }
  372. if err := ioutil.WriteFile(ConfPath, conf, 0600); err != nil {
  373. level.Error(m.logger).Log("error", err)
  374. m.errorCounter.WithLabelValues("apply").Inc()
  375. return
  376. }
  377. var private *net.IPNet
  378. // If we are not encapsulating packets to the local private network,
  379. // then pass the private IP to add an exception to the NAT rule.
  380. if m.encapsulate != AlwaysEncapsulate {
  381. private = t.privateIP
  382. }
  383. rules := iptables.MasqueradeRules(private, m.nodes[m.hostname].Subnet, t.RemoteSubnets())
  384. rules = append(rules, iptables.ForwardRules(m.subnet)...)
  385. if err := m.ipTables.Set(rules); err != nil {
  386. level.Error(m.logger).Log("error", err)
  387. m.errorCounter.WithLabelValues("apply").Inc()
  388. return
  389. }
  390. if m.encapsulate != NeverEncapsulate {
  391. var peers []net.IP
  392. for _, s := range t.Segments {
  393. if s.Location == m.nodes[m.hostname].Location {
  394. peers = s.privateIPs
  395. break
  396. }
  397. }
  398. if err := m.ipset.Set(peers); err != nil {
  399. level.Error(m.logger).Log("error", err)
  400. m.errorCounter.WithLabelValues("apply").Inc()
  401. return
  402. }
  403. if m.local {
  404. if err := iproute.SetAddress(m.tunlIface, oneAddressCIDR(newAllocator(*m.nodes[m.hostname].Subnet).next().IP)); err != nil {
  405. level.Error(m.logger).Log("error", err)
  406. m.errorCounter.WithLabelValues("apply").Inc()
  407. return
  408. }
  409. }
  410. }
  411. if t.leader {
  412. if err := iproute.SetAddress(m.kiloIface, t.wireGuardCIDR); err != nil {
  413. level.Error(m.logger).Log("error", err)
  414. m.errorCounter.WithLabelValues("apply").Inc()
  415. return
  416. }
  417. link, err := linkByIndex(m.kiloIface)
  418. if err != nil {
  419. level.Error(m.logger).Log("error", err)
  420. m.errorCounter.WithLabelValues("apply").Inc()
  421. return
  422. }
  423. oldConf, err := wireguard.ShowConf(link.Attrs().Name)
  424. if err != nil {
  425. level.Error(m.logger).Log("error", err)
  426. m.errorCounter.WithLabelValues("apply").Inc()
  427. return
  428. }
  429. // Setting the WireGuard configuration interrupts existing connections
  430. // so only set the configuration if it has changed.
  431. equal, err := wireguard.CompareConf(conf, oldConf)
  432. if err != nil {
  433. level.Error(m.logger).Log("error", err)
  434. m.errorCounter.WithLabelValues("apply").Inc()
  435. // Don't return here, simply overwrite the old configuration.
  436. equal = false
  437. }
  438. if !equal {
  439. if err := wireguard.SetConf(link.Attrs().Name, ConfPath); err != nil {
  440. level.Error(m.logger).Log("error", err)
  441. m.errorCounter.WithLabelValues("apply").Inc()
  442. return
  443. }
  444. }
  445. if err := iproute.Set(m.kiloIface, true); err != nil {
  446. level.Error(m.logger).Log("error", err)
  447. m.errorCounter.WithLabelValues("apply").Inc()
  448. return
  449. }
  450. } else {
  451. level.Debug(m.logger).Log("msg", "local node is not the leader")
  452. if err := iproute.Set(m.kiloIface, false); err != nil {
  453. level.Error(m.logger).Log("error", err)
  454. m.errorCounter.WithLabelValues("apply").Inc()
  455. return
  456. }
  457. }
  458. // We need to add routes last since they may depend
  459. // on the WireGuard interface.
  460. routes := t.Routes(m.kiloIface, m.privIface, m.tunlIface, m.local, m.encapsulate)
  461. if err := m.table.Set(routes); err != nil {
  462. level.Error(m.logger).Log("error", err)
  463. m.errorCounter.WithLabelValues("apply").Inc()
  464. }
  465. }
  466. // RegisterMetrics registers Prometheus metrics on the given Prometheus
  467. // registerer.
  468. func (m *Mesh) RegisterMetrics(r prometheus.Registerer) {
  469. r.MustRegister(
  470. m.errorCounter,
  471. m.nodesGuage,
  472. )
  473. }
  474. // Stop stops the mesh.
  475. func (m *Mesh) Stop() {
  476. close(m.stop)
  477. }
  478. func (m *Mesh) cleanUp() {
  479. if err := m.ipTables.CleanUp(); err != nil {
  480. level.Error(m.logger).Log("error", fmt.Sprintf("failed to clean up IP tables: %v", err))
  481. m.errorCounter.WithLabelValues("cleanUp").Inc()
  482. }
  483. if err := m.table.CleanUp(); err != nil {
  484. level.Error(m.logger).Log("error", fmt.Sprintf("failed to clean up routes: %v", err))
  485. m.errorCounter.WithLabelValues("cleanUp").Inc()
  486. }
  487. if err := os.Remove(PrivateKeyPath); err != nil {
  488. level.Error(m.logger).Log("error", fmt.Sprintf("failed to delete private key: %v", err))
  489. m.errorCounter.WithLabelValues("cleanUp").Inc()
  490. }
  491. if err := os.Remove(ConfPath); err != nil {
  492. level.Error(m.logger).Log("error", fmt.Sprintf("failed to delete configuration file: %v", err))
  493. m.errorCounter.WithLabelValues("cleanUp").Inc()
  494. }
  495. if err := iproute.RemoveInterface(m.kiloIface); err != nil {
  496. level.Error(m.logger).Log("error", fmt.Sprintf("failed to remove wireguard interface: %v", err))
  497. m.errorCounter.WithLabelValues("cleanUp").Inc()
  498. }
  499. if err := m.CleanUp(m.hostname); err != nil {
  500. level.Error(m.logger).Log("error", fmt.Sprintf("failed to clean up backend: %v", err))
  501. m.errorCounter.WithLabelValues("cleanUp").Inc()
  502. }
  503. if err := m.ipset.CleanUp(); err != nil {
  504. level.Error(m.logger).Log("error", fmt.Sprintf("failed to clean up ipset: %v", err))
  505. m.errorCounter.WithLabelValues("cleanUp").Inc()
  506. }
  507. }
  508. func isSelf(hostname string, node *Node) bool {
  509. return node != nil && node.Name == hostname
  510. }
  511. func nodesAreEqual(a, b *Node) bool {
  512. if !(a != nil) == (b != nil) {
  513. return false
  514. }
  515. if a == b {
  516. return true
  517. }
  518. return ipNetsEqual(a.ExternalIP, b.ExternalIP) && string(a.Key) == string(b.Key) && ipNetsEqual(a.InternalIP, b.InternalIP) && a.Leader == b.Leader && a.Location == b.Location && a.Name == b.Name && subnetsEqual(a.Subnet, b.Subnet)
  519. }
  // ipNetsEqual reports whether a and b are both nil, or are both non-nil
// with identical masks and equal IPs (net.IP.Equal, so the 4-byte and
// 16-byte forms of an IPv4 address match).
func ipNetsEqual(a, b *net.IPNet) bool {
	switch {
	case a == nil || b == nil:
		return a == b
	case a.Mask.String() != b.Mask.String():
		return false
	default:
		return a.IP.Equal(b.IP)
	}
}
  // subnetsEqual reports whether a and b denote the same subnet.
//
// They are equal when both are nil, or when both are non-nil with
// identical masks and each network contains the other's IP. Host bits are
// therefore ignored: 10.0.0.5/24 equals 10.0.0.0/24.
//
// Nil is handled explicitly because a node's Subnet may be unset; for
// example, handleLocal compares nodes whose Subnet is copied from the
// backend. Dereferencing it would panic.
func subnetsEqual(a, b *net.IPNet) bool {
	if a == nil || b == nil {
		return a == b
	}
	if a.Mask.String() != b.Mask.String() {
		return false
	}
	return a.Contains(b.IP) && b.Contains(a.IP)
}
  544. func linkByIndex(index int) (netlink.Link, error) {
  545. link, err := netlink.LinkByIndex(index)
  546. if err != nil {
  547. return nil, fmt.Errorf("failed to get interface: %v", err)
  548. }
  549. return link, nil
  550. }