2
0

backend.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. // Copyright 2019 the Kilo authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package k8s
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "net"
  20. "path"
  21. "strings"
  22. "time"
  23. "k8s.io/api/core/v1"
  24. "k8s.io/apimachinery/pkg/labels"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/apimachinery/pkg/util/strategicpatch"
  27. v1informers "k8s.io/client-go/informers/core/v1"
  28. "k8s.io/client-go/kubernetes"
  29. v1listers "k8s.io/client-go/listers/core/v1"
  30. "k8s.io/client-go/tools/cache"
  31. "github.com/squat/kilo/pkg/mesh"
  32. )
  33. const (
  34. // Backend is the name of this mesh backend.
  35. Backend = "kubernetes"
  36. externalIPAnnotationKey = "kilo.squat.ai/external-ip"
  37. forceExternalIPAnnotationKey = "kilo.squat.ai/force-external-ip"
  38. internalIPAnnotationKey = "kilo.squat.ai/internal-ip"
  39. keyAnnotationKey = "kilo.squat.ai/key"
  40. leaderAnnotationKey = "kilo.squat.ai/leader"
  41. locationAnnotationKey = "kilo.squat.ai/location"
  42. regionLabelKey = "failure-domain.beta.kubernetes.io/region"
  43. jsonPatchSlash = "~1"
  44. jsonRemovePatch = `{"op": "remove", "path": "%s"}`
  45. )
  46. type backend struct {
  47. client kubernetes.Interface
  48. events chan *mesh.Event
  49. informer cache.SharedIndexInformer
  50. lister v1listers.NodeLister
  51. }
  52. // New creates a new instance of a mesh.Backend.
  53. func New(client kubernetes.Interface) mesh.Backend {
  54. informer := v1informers.NewNodeInformer(client, 5*time.Minute, nil)
  55. b := &backend{
  56. client: client,
  57. events: make(chan *mesh.Event),
  58. informer: informer,
  59. lister: v1listers.NewNodeLister(informer.GetIndexer()),
  60. }
  61. return b
  62. }
  63. // CleanUp removes configuration applied to the backend.
  64. func (b *backend) CleanUp(name string) error {
  65. patch := []byte("[" + strings.Join([]string{
  66. fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(externalIPAnnotationKey, "/", jsonPatchSlash, 1))),
  67. fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(internalIPAnnotationKey, "/", jsonPatchSlash, 1))),
  68. fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(keyAnnotationKey, "/", jsonPatchSlash, 1))),
  69. }, ",") + "]")
  70. if _, err := b.client.CoreV1().Nodes().Patch(name, types.JSONPatchType, patch); err != nil {
  71. return fmt.Errorf("failed to patch node: %v", err)
  72. }
  73. return nil
  74. }
  75. // Get gets a single Node by name.
  76. func (b *backend) Get(name string) (*mesh.Node, error) {
  77. n, err := b.lister.Get(name)
  78. if err != nil {
  79. return nil, err
  80. }
  81. return translateNode(n), nil
  82. }
  83. // Init initializes the backend; for this backend that means
  84. // syncing the informer cache.
  85. func (b *backend) Init(stop <-chan struct{}) error {
  86. go b.informer.Run(stop)
  87. if ok := cache.WaitForCacheSync(stop, func() bool {
  88. return b.informer.HasSynced()
  89. }); !ok {
  90. return errors.New("failed to start sync node cache")
  91. }
  92. b.informer.AddEventHandler(
  93. cache.ResourceEventHandlerFuncs{
  94. AddFunc: func(obj interface{}) {
  95. n, ok := obj.(*v1.Node)
  96. if !ok {
  97. // Failed to decode Node; ignoring...
  98. return
  99. }
  100. b.events <- &mesh.Event{Type: mesh.AddEvent, Node: translateNode(n)}
  101. },
  102. UpdateFunc: func(_, obj interface{}) {
  103. n, ok := obj.(*v1.Node)
  104. if !ok {
  105. // Failed to decode Node; ignoring...
  106. return
  107. }
  108. b.events <- &mesh.Event{Type: mesh.UpdateEvent, Node: translateNode(n)}
  109. },
  110. DeleteFunc: func(obj interface{}) {
  111. n, ok := obj.(*v1.Node)
  112. if !ok {
  113. // Failed to decode Node; ignoring...
  114. return
  115. }
  116. b.events <- &mesh.Event{Type: mesh.DeleteEvent, Node: translateNode(n)}
  117. },
  118. },
  119. )
  120. return nil
  121. }
  122. // List gets all the Nodes in the cluster.
  123. func (b *backend) List() ([]*mesh.Node, error) {
  124. ns, err := b.lister.List(labels.Everything())
  125. if err != nil {
  126. return nil, err
  127. }
  128. nodes := make([]*mesh.Node, len(ns))
  129. for i := range ns {
  130. nodes[i] = translateNode(ns[i])
  131. }
  132. return nodes, nil
  133. }
  134. // Set sets the fields of a node.
  135. func (b *backend) Set(name string, node *mesh.Node) error {
  136. old, err := b.lister.Get(name)
  137. if err != nil {
  138. return fmt.Errorf("failed to find node: %v", err)
  139. }
  140. n := old.DeepCopy()
  141. n.ObjectMeta.Annotations[externalIPAnnotationKey] = node.ExternalIP.String()
  142. n.ObjectMeta.Annotations[internalIPAnnotationKey] = node.InternalIP.String()
  143. n.ObjectMeta.Annotations[keyAnnotationKey] = string(node.Key)
  144. oldData, err := json.Marshal(old)
  145. if err != nil {
  146. return err
  147. }
  148. newData, err := json.Marshal(n)
  149. if err != nil {
  150. return err
  151. }
  152. patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
  153. if err != nil {
  154. return fmt.Errorf("failed to create patch for node %q: %v", n.Name, err)
  155. }
  156. if _, err = b.client.CoreV1().Nodes().Patch(name, types.StrategicMergePatchType, patch); err != nil {
  157. return fmt.Errorf("failed to patch node: %v", err)
  158. }
  159. return nil
  160. }
  161. // Watch returns a chan of node events.
  162. func (b *backend) Watch() <-chan *mesh.Event {
  163. return b.events
  164. }
  165. // translateNode translates a Kubernetes Node to a mesh.Node.
  166. func translateNode(node *v1.Node) *mesh.Node {
  167. if node == nil {
  168. return nil
  169. }
  170. _, subnet, err := net.ParseCIDR(node.Spec.PodCIDR)
  171. // The subnet should only ever fail to parse if the pod CIDR has not been set,
  172. // so in this case set the subnet to nil and let the node be updated.
  173. if err != nil {
  174. subnet = nil
  175. }
  176. _, leader := node.ObjectMeta.Annotations[leaderAnnotationKey]
  177. // Allow the region to be overridden by an explicit location.
  178. location, ok := node.ObjectMeta.Annotations[locationAnnotationKey]
  179. if !ok {
  180. location = node.ObjectMeta.Labels[regionLabelKey]
  181. }
  182. // Allow the external IP to be overridden.
  183. externalIP, ok := node.ObjectMeta.Annotations[forceExternalIPAnnotationKey]
  184. if !ok {
  185. externalIP = node.ObjectMeta.Annotations[externalIPAnnotationKey]
  186. }
  187. return &mesh.Node{
  188. // ExternalIP and InternalIP should only ever fail to parse if the
  189. // remote node's mesh has not yet set its IP address;
  190. // in this case the IP will be nil and
  191. // the mesh can wait for the node to be updated.
  192. ExternalIP: normalizeIP(externalIP),
  193. InternalIP: normalizeIP(node.ObjectMeta.Annotations[internalIPAnnotationKey]),
  194. Key: []byte(node.ObjectMeta.Annotations[keyAnnotationKey]),
  195. Leader: leader,
  196. Location: location,
  197. Name: node.Name,
  198. Subnet: subnet,
  199. }
  200. }
  201. func normalizeIP(ip string) *net.IPNet {
  202. i, ipNet, _ := net.ParseCIDR(ip)
  203. if ipNet == nil {
  204. return ipNet
  205. }
  206. if ip4 := i.To4(); ip4 != nil {
  207. ipNet.IP = ip4
  208. return ipNet
  209. }
  210. ipNet.IP = i.To16()
  211. return ipNet
  212. }