calico 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. // Copyright 2019 the Kilo authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package calico
  15. import (
  16. "errors"
  17. "net"
  18. "time"
  19. "k8s.io/apimachinery/pkg/labels"
  20. v1informers "k8s.io/client-go/informers/core/v1"
  21. "k8s.io/client-go/kubernetes"
  22. v1listers "k8s.io/client-go/listers/core/v1"
  23. "k8s.io/client-go/tools/cache"
  24. "github.com/squat/kilo/pkg/ipset"
  25. "github.com/squat/kilo/pkg/mesh"
  26. )
  27. type Compatibility interface {
  28. Apply(*mesh.Topology, mesh.Encapsulate) error
  29. Backend(mesh.Backend) mesh.Backend
  30. CleanUp() error
  31. Run(stop <-chan struct{}) (<-chan error, error)
  32. }
  33. // calico is a Calico compatibility layer.
  34. type calico struct {
  35. client kubernetes.Interface
  36. errors chan error
  37. ipset *ipset.Set
  38. }
  39. // New generates a new ipset.
  40. func New(c kubernetes.Interface) Compatibility {
  41. return &calico{
  42. client: c,
  43. errors: make(chan error),
  44. // This is a patch until Calico supports
  45. // other hosts adding IPIP iptables rules.
  46. ipset: ipset.New("cali40all-hosts-net"),
  47. }
  48. }
  49. // Run implements the mesh.Compatibility interface.
  50. // It runs the ipset controller and forwards errors along.
  51. func (c *calico) Run(stop <-chan struct{}) (<-chan error, error) {
  52. return c.ipset.Run(stop)
  53. }
  54. // CleanUp stops the compatibility layer's controllers.
  55. func (c *calico) CleanUp() error {
  56. return c.ipset.CleanUp()
  57. }
  58. type backend struct {
  59. backend mesh.Backend
  60. client kubernetes.Interface
  61. events chan *mesh.NodeEvent
  62. informer cache.SharedIndexInformer
  63. lister v1listers.NodeLister
  64. }
  65. func (c *calico) Apply(t *mesh.Topology, encapsulate mesh.Encapsulate, location string) error {
  66. if encapsulate == mesh.NeverEncapsulate {
  67. return nil
  68. }
  69. var peers []net.IP
  70. for _, s := range t.segments {
  71. if s.location == location {
  72. peers = s.privateIPs
  73. break
  74. }
  75. }
  76. return c.ipset.Set(peers)
  77. }
  78. func (c *calico) Backend(b mesh.Backend) mesh.Backend {
  79. ni := v1informers.NewNodeInformer(c.client, 5*time.Minute, nil)
  80. return &backend{
  81. backend: b,
  82. events: make(chan *mesh.NodeEvent),
  83. informer: ni,
  84. lister: v1listers.NewNodeLister(ni.GetIndexer()),
  85. }
  86. }
  87. // Nodes implements the mesh.Backend interface.
  88. func (b *backend) Nodes() mesh.NodeBackend {
  89. return b
  90. }
  91. // Peers implements the mesh.Backend interface.
  92. func (b *backend) Peers() mesh.PeerBackend {
  93. // The Calico compatibility backend only wraps the node backend.
  94. return b.backend.Peers()
  95. }
  96. // CleanUp removes configuration applied to the backend.
  97. func (b *backend) CleanUp(name string) error {
  98. return b.backend.Nodes().CleanUp(name)
  99. }
  100. // Get gets a single Node by name.
  101. func (b *backend) Get(name string) (*mesh.Node, error) {
  102. n, err := b.lister.Get(name)
  103. if err != nil {
  104. return nil, err
  105. }
  106. m, err := b.backend.Nodes().Get(name)
  107. if err != nil {
  108. return nil, err
  109. }
  110. return translateNode(n, m), nil
  111. }
  112. // Init initializes the backend; for this backend that means
  113. // syncing the informer cache and the wrapped backend.
  114. func (b *backend) Init(stop <-chan struct{}) error {
  115. if err := b.backend.Nodes().Init(stop); err != nil {
  116. return err
  117. }
  118. go b.informer.Run(stop)
  119. if ok := cache.WaitForCacheSync(stop, func() bool {
  120. return b.informer.HasSynced()
  121. }); !ok {
  122. return errors.New("failed to sync node cache")
  123. }
  124. go func() {
  125. w := b.backend.Nodes().Watch()
  126. var ne *mesh.NodeEvent
  127. for {
  128. select {
  129. case ne = <-w:
  130. b.events <- &mesh.NodeEvent{Type: ne.Type, Node: translateNode(n, ne.Node)}
  131. case <-stop:
  132. return
  133. }
  134. }
  135. }()
  136. return nil
  137. }
  138. // List gets all the Nodes in the cluster.
  139. func (b *backend) List() ([]*mesh.Node, error) {
  140. ns, err := b.lister.List(labels.Everything())
  141. if err != nil {
  142. return nil, err
  143. }
  144. nodes := make([]*mesh.Node, len(ns))
  145. for i := range ns {
  146. nodes[i] = translateNode(ns[i])
  147. }
  148. return nodes, nil
  149. }
  150. // Set sets the fields of a node.
  151. func (b *backend) Set(name string, node *mesh.Node) error {
  152. // The Calico compatibility backend is read-only.
  153. // Proxy all writes to the underlying backend.
  154. return b.backend.Nodes().Set(name, node)
  155. }
  156. // Watch returns a chan of node events.
  157. func (b *backend) Watch() <-chan *mesh.NodeEvent {
  158. return b.events
  159. }