| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567 |
- // Copyright 2019 the Kilo authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package k8s
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "net"
- "path"
- "strconv"
- "strings"
- "time"
- "github.com/go-kit/kit/log"
- "github.com/go-kit/kit/log/level"
- "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
- v1 "k8s.io/api/core/v1"
- apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/strategicpatch"
- v1informers "k8s.io/client-go/informers/core/v1"
- "k8s.io/client-go/kubernetes"
- v1listers "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "github.com/squat/kilo/pkg/k8s/apis/kilo/v1alpha1"
- kiloclient "github.com/squat/kilo/pkg/k8s/clientset/versioned"
- v1alpha1informers "github.com/squat/kilo/pkg/k8s/informers/kilo/v1alpha1"
- v1alpha1listers "github.com/squat/kilo/pkg/k8s/listers/kilo/v1alpha1"
- "github.com/squat/kilo/pkg/mesh"
- "github.com/squat/kilo/pkg/wireguard"
- )
// Backend is the name of this mesh backend.
const Backend = "kubernetes"

// RegionLabelKey is the key for the well-known Kubernetes topology region label.
const RegionLabelKey = "topology.kubernetes.io/region"

// Keys of the annotations that this backend uses on Kubernetes Node objects.
const (
	allowedLocationIPsKey        = "kilo.squat.ai/allowed-location-ips"
	discoveredEndpointsKey       = "kilo.squat.ai/discovered-endpoints"
	endpointAnnotationKey        = "kilo.squat.ai/endpoint"
	forceEndpointAnnotationKey   = "kilo.squat.ai/force-endpoint"
	forceInternalIPAnnotationKey = "kilo.squat.ai/force-internal-ip"
	granularityKey               = "kilo.squat.ai/granularity"
	internalIPAnnotationKey      = "kilo.squat.ai/internal-ip"
	keyAnnotationKey             = "kilo.squat.ai/key"
	lastSeenAnnotationKey        = "kilo.squat.ai/last-seen"
	leaderAnnotationKey          = "kilo.squat.ai/leader"
	locationAnnotationKey        = "kilo.squat.ai/location"
	persistentKeepaliveKey       = "kilo.squat.ai/persistent-keepalive"
	wireGuardIPAnnotationKey     = "kilo.squat.ai/wireguard-ip"
)

// Building blocks for the JSON Patch documents sent to the API server.
const (
	// jsonPatchSlash replaces the "/" of an annotation key when the key is
	// embedded in a JSON Patch path.
	jsonPatchSlash = "~1"
	// jsonRemovePatch is the format string of a single "remove" operation;
	// the only verb is the path to remove.
	jsonRemovePatch = `{"op": "remove", "path": "%s"}`
)
- var logger = log.NewNopLogger()
- type backend struct {
- nodes *nodeBackend
- peers *peerBackend
- }
- // Nodes implements the mesh.Backend interface.
- func (b *backend) Nodes() mesh.NodeBackend {
- return b.nodes
- }
- // Peers implements the mesh.Backend interface.
- func (b *backend) Peers() mesh.PeerBackend {
- return b.peers
- }
- type nodeBackend struct {
- client kubernetes.Interface
- events chan *mesh.NodeEvent
- informer cache.SharedIndexInformer
- lister v1listers.NodeLister
- topologyLabel string
- }
- type peerBackend struct {
- client kiloclient.Interface
- extensionsClient apiextensions.Interface
- events chan *mesh.PeerEvent
- informer cache.SharedIndexInformer
- lister v1alpha1listers.PeerLister
- }
- // New creates a new instance of a mesh.Backend.
- func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Interface, topologyLabel string, l log.Logger) mesh.Backend {
- ni := v1informers.NewNodeInformer(c, 5*time.Minute, nil)
- pi := v1alpha1informers.NewPeerInformer(kc, 5*time.Minute, nil)
- logger = l
- return &backend{
- &nodeBackend{
- client: c,
- events: make(chan *mesh.NodeEvent),
- informer: ni,
- lister: v1listers.NewNodeLister(ni.GetIndexer()),
- topologyLabel: topologyLabel,
- },
- &peerBackend{
- client: kc,
- extensionsClient: ec,
- events: make(chan *mesh.PeerEvent),
- informer: pi,
- lister: v1alpha1listers.NewPeerLister(pi.GetIndexer()),
- },
- }
- }
- // CleanUp removes configuration applied to the backend.
- func (nb *nodeBackend) CleanUp(ctx context.Context, name string) error {
- patch := []byte("[" + strings.Join([]string{
- fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(endpointAnnotationKey, "/", jsonPatchSlash, 1))),
- fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(internalIPAnnotationKey, "/", jsonPatchSlash, 1))),
- fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(keyAnnotationKey, "/", jsonPatchSlash, 1))),
- fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(lastSeenAnnotationKey, "/", jsonPatchSlash, 1))),
- fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(wireGuardIPAnnotationKey, "/", jsonPatchSlash, 1))),
- fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(discoveredEndpointsKey, "/", jsonPatchSlash, 1))),
- fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(granularityKey, "/", jsonPatchSlash, 1))),
- }, ",") + "]")
- if _, err := nb.client.CoreV1().Nodes().Patch(ctx, name, types.JSONPatchType, patch, metav1.PatchOptions{}); err != nil {
- return fmt.Errorf("failed to patch node: %v", err)
- }
- return nil
- }
- // Get gets a single Node by name.
- func (nb *nodeBackend) Get(name string) (*mesh.Node, error) {
- n, err := nb.lister.Get(name)
- if err != nil {
- return nil, err
- }
- return translateNode(n, nb.topologyLabel), nil
- }
- // Init initializes the backend; for this backend that means
- // syncing the informer cache.
- func (nb *nodeBackend) Init(ctx context.Context) error {
- go nb.informer.Run(ctx.Done())
- if ok := cache.WaitForCacheSync(ctx.Done(), func() bool {
- return nb.informer.HasSynced()
- }); !ok {
- return errors.New("failed to sync node cache")
- }
- _, err := nb.informer.AddEventHandler(
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- n, ok := obj.(*v1.Node)
- if !ok {
- // Failed to decode Node; ignoring...
- return
- }
- nb.events <- &mesh.NodeEvent{Type: mesh.AddEvent, Node: translateNode(n, nb.topologyLabel)}
- },
- UpdateFunc: func(old, obj interface{}) {
- n, ok := obj.(*v1.Node)
- if !ok {
- // Failed to decode Node; ignoring...
- return
- }
- o, ok := old.(*v1.Node)
- if !ok {
- // Failed to decode Node; ignoring...
- return
- }
- nb.events <- &mesh.NodeEvent{Type: mesh.UpdateEvent, Node: translateNode(n, nb.topologyLabel), Old: translateNode(o, nb.topologyLabel)}
- },
- DeleteFunc: func(obj interface{}) {
- n, ok := obj.(*v1.Node)
- if !ok {
- // Failed to decode Node; ignoring...
- return
- }
- nb.events <- &mesh.NodeEvent{Type: mesh.DeleteEvent, Node: translateNode(n, nb.topologyLabel)}
- },
- },
- )
- return err
- }
- // List gets all the Nodes in the cluster.
- func (nb *nodeBackend) List() ([]*mesh.Node, error) {
- ns, err := nb.lister.List(labels.Everything())
- if err != nil {
- return nil, err
- }
- nodes := make([]*mesh.Node, len(ns))
- for i := range ns {
- nodes[i] = translateNode(ns[i], nb.topologyLabel)
- }
- return nodes, nil
- }
- // Set sets the fields of a node.
- func (nb *nodeBackend) Set(ctx context.Context, name string, node *mesh.Node) error {
- old, err := nb.lister.Get(name)
- if err != nil {
- return fmt.Errorf("failed to find node: %v", err)
- }
- n := old.DeepCopy()
- n.Annotations[endpointAnnotationKey] = node.Endpoint.String()
- if node.InternalIP == nil {
- n.Annotations[internalIPAnnotationKey] = ""
- } else {
- n.Annotations[internalIPAnnotationKey] = node.InternalIP.String()
- }
- n.Annotations[keyAnnotationKey] = node.Key.String()
- n.Annotations[lastSeenAnnotationKey] = strconv.FormatInt(node.LastSeen, 10)
- if node.WireGuardIP == nil {
- n.Annotations[wireGuardIPAnnotationKey] = ""
- } else {
- n.Annotations[wireGuardIPAnnotationKey] = node.WireGuardIP.String()
- }
- if node.DiscoveredEndpoints == nil {
- n.Annotations[discoveredEndpointsKey] = ""
- } else {
- discoveredEndpoints, err := json.Marshal(node.DiscoveredEndpoints)
- if err != nil {
- return err
- }
- n.Annotations[discoveredEndpointsKey] = string(discoveredEndpoints)
- }
- n.Annotations[granularityKey] = string(node.Granularity)
- oldData, err := json.Marshal(old)
- if err != nil {
- return err
- }
- newData, err := json.Marshal(n)
- if err != nil {
- return err
- }
- patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
- if err != nil {
- return fmt.Errorf("failed to create patch for node %q: %v", n.Name, err)
- }
- if _, err = nb.client.CoreV1().Nodes().Patch(ctx, name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}); err != nil {
- return fmt.Errorf("failed to patch node: %v", err)
- }
- return nil
- }
- // Watch returns a chan of node events.
- func (nb *nodeBackend) Watch() <-chan *mesh.NodeEvent {
- return nb.events
- }
- // translateNode translates a Kubernetes Node to a mesh.Node.
- func translateNode(node *v1.Node, topologyLabel string) *mesh.Node {
- if node == nil {
- return nil
- }
- _, subnet, err := net.ParseCIDR(node.Spec.PodCIDR)
- // The subnet should only ever fail to parse if the pod CIDR has not been set,
- // so in this case set the subnet to nil and let the node be updated.
- if err != nil {
- subnet = nil
- }
- _, leader := node.Annotations[leaderAnnotationKey]
- // Allow the region to be overridden by an explicit location.
- location, ok := node.Annotations[locationAnnotationKey]
- if !ok {
- location = node.Labels[topologyLabel]
- }
- // Allow the endpoint to be overridden.
- endpoint := wireguard.ParseEndpoint(node.Annotations[forceEndpointAnnotationKey])
- if endpoint == nil {
- endpoint = wireguard.ParseEndpoint(node.Annotations[endpointAnnotationKey])
- }
- // Allow the internal IP to be overridden.
- internalIP := normalizeIP(node.Annotations[forceInternalIPAnnotationKey])
- if internalIP == nil {
- internalIP = normalizeIP(node.Annotations[internalIPAnnotationKey])
- }
- // Set the ForceInternalIP flag, if force-internal-ip annotation was set to "".
- noInternalIP := false
- if s, ok := node.Annotations[forceInternalIPAnnotationKey]; ok && (s == "" || s == "-") {
- noInternalIP = true
- internalIP = nil
- }
- // Set Wireguard PersistentKeepalive setting for the node.
- var persistentKeepalive time.Duration
- if keepAlive, ok := node.Annotations[persistentKeepaliveKey]; ok {
- // We can ignore the error, because p will be set to 0 if an error occures.
- p, _ := strconv.ParseInt(keepAlive, 10, 64)
- persistentKeepalive = time.Duration(p) * time.Second
- }
- var lastSeen int64
- if ls, ok := node.Annotations[lastSeenAnnotationKey]; !ok {
- lastSeen = 0
- } else {
- if lastSeen, err = strconv.ParseInt(ls, 10, 64); err != nil {
- lastSeen = 0
- }
- }
- var discoveredEndpoints map[string]*net.UDPAddr
- if de, ok := node.Annotations[discoveredEndpointsKey]; ok {
- err := json.Unmarshal([]byte(de), &discoveredEndpoints)
- if err != nil {
- discoveredEndpoints = nil
- }
- }
- // Set allowed IPs for a location.
- var allowedLocationIPs []net.IPNet
- if str, ok := node.Annotations[allowedLocationIPsKey]; ok {
- for _, ip := range strings.Split(str, ",") {
- if ipnet := normalizeIP(ip); ipnet != nil {
- allowedLocationIPs = append(allowedLocationIPs, *ipnet)
- }
- }
- }
- var meshGranularity mesh.Granularity
- if gr, ok := node.Annotations[granularityKey]; ok {
- meshGranularity = mesh.Granularity(gr)
- switch meshGranularity {
- case mesh.LogicalGranularity:
- case mesh.FullGranularity:
- default:
- meshGranularity = ""
- }
- }
- // TODO log some error or warning.
- key, _ := wgtypes.ParseKey(node.Annotations[keyAnnotationKey])
- return &mesh.Node{
- // Endpoint and InternalIP should only ever fail to parse if the
- // remote node's agent has not yet set its IP address;
- // in this case the IP will be nil and
- // the mesh can wait for the node to be updated.
- // It is valid for the InternalIP to be nil,
- // if the given node only has public IP addresses.
- Endpoint: endpoint,
- NoInternalIP: noInternalIP,
- InternalIP: internalIP,
- Key: key,
- LastSeen: lastSeen,
- Leader: leader,
- Location: location,
- Name: node.Name,
- PersistentKeepalive: persistentKeepalive,
- Subnet: subnet,
- // WireGuardIP can fail to parse if the node is not a leader or if
- // the node's agent has not yet reconciled. In either case, the IP
- // will parse as nil.
- WireGuardIP: normalizeIP(node.Annotations[wireGuardIPAnnotationKey]),
- DiscoveredEndpoints: discoveredEndpoints,
- AllowedLocationIPs: allowedLocationIPs,
- Granularity: meshGranularity,
- }
- }
- // translatePeer translates a Peer CRD to a mesh.Peer.
- func translatePeer(peer *v1alpha1.Peer) *mesh.Peer {
- if peer == nil {
- return nil
- }
- var aips []net.IPNet
- for _, aip := range peer.Spec.AllowedIPs {
- aip := normalizeIP(aip)
- // Skip any invalid IPs.
- if aip == nil {
- continue
- }
- aips = append(aips, *aip)
- }
- var endpoint *wireguard.Endpoint
- if peer.Spec.Endpoint != nil {
- ip := net.ParseIP(peer.Spec.Endpoint.IP)
- if ip4 := ip.To4(); ip4 != nil {
- ip = ip4
- } else {
- ip = ip.To16()
- }
- if peer.Spec.Endpoint.Port > 0 {
- if ip != nil {
- endpoint = wireguard.NewEndpoint(ip, int(peer.Spec.Endpoint.Port))
- }
- if peer.Spec.Endpoint.DNS != "" {
- endpoint = wireguard.ParseEndpoint(fmt.Sprintf("%s:%d", peer.Spec.Endpoint.DNS, peer.Spec.Endpoint.Port))
- }
- }
- }
- key, err := wgtypes.ParseKey(peer.Spec.PublicKey)
- if err != nil {
- _ = level.Error(logger).Log("msg", "failed to parse public key", "peer", peer.Name, "err", err.Error())
- }
- var psk *wgtypes.Key
- if k, err := wgtypes.ParseKey(peer.Spec.PresharedKey); err != nil {
- // Set key to nil to avoid setting a key to the zero value wgtypes.Key{}
- psk = nil
- } else {
- psk = &k
- }
- var pka time.Duration
- if peer.Spec.PersistentKeepalive > 0 {
- pka = time.Duration(peer.Spec.PersistentKeepalive) * time.Second
- }
- return &mesh.Peer{
- Name: peer.Name,
- Peer: wireguard.Peer{
- PeerConfig: wgtypes.PeerConfig{
- AllowedIPs: aips,
- PersistentKeepaliveInterval: &pka,
- PresharedKey: psk,
- PublicKey: key,
- },
- Endpoint: endpoint,
- },
- }
- }
- // CleanUp removes configuration applied to the backend.
- func (pb *peerBackend) CleanUp(_ context.Context, _ string) error {
- return nil
- }
- // Get gets a single Peer by name.
- func (pb *peerBackend) Get(name string) (*mesh.Peer, error) {
- p, err := pb.lister.Get(name)
- if err != nil {
- return nil, err
- }
- return translatePeer(p), nil
- }
- // Init initializes the backend; for this backend that means
- // syncing the informer cache.
- func (pb *peerBackend) Init(ctx context.Context) error {
- // Check the presents of the CRD peers.kilo.squat.ai.
- if _, err := pb.extensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, strings.Join([]string{v1alpha1.PeerPlural, v1alpha1.GroupName}, "."), metav1.GetOptions{}); err != nil {
- return fmt.Errorf("CRD is not present: %v", err)
- }
- go pb.informer.Run(ctx.Done())
- if ok := cache.WaitForCacheSync(ctx.Done(), func() bool {
- return pb.informer.HasSynced()
- }); !ok {
- return errors.New("failed to sync peer cache")
- }
- _, err := pb.informer.AddEventHandler(
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- p, ok := obj.(*v1alpha1.Peer)
- if !ok || p.Validate() != nil {
- // Failed to decode Peer; ignoring...
- return
- }
- pb.events <- &mesh.PeerEvent{Type: mesh.AddEvent, Peer: translatePeer(p)}
- },
- UpdateFunc: func(old, obj interface{}) {
- p, ok := obj.(*v1alpha1.Peer)
- if !ok || p.Validate() != nil {
- // Failed to decode Peer; ignoring...
- return
- }
- o, ok := old.(*v1alpha1.Peer)
- if !ok || o.Validate() != nil {
- // Failed to decode Peer; ignoring...
- return
- }
- pb.events <- &mesh.PeerEvent{Type: mesh.UpdateEvent, Peer: translatePeer(p), Old: translatePeer(o)}
- },
- DeleteFunc: func(obj interface{}) {
- p, ok := obj.(*v1alpha1.Peer)
- if !ok || p.Validate() != nil {
- // Failed to decode Peer; ignoring...
- return
- }
- pb.events <- &mesh.PeerEvent{Type: mesh.DeleteEvent, Peer: translatePeer(p)}
- },
- },
- )
- return err
- }
- // List gets all the Peers in the cluster.
- func (pb *peerBackend) List() ([]*mesh.Peer, error) {
- ps, err := pb.lister.List(labels.Everything())
- if err != nil {
- return nil, err
- }
- peers := make([]*mesh.Peer, len(ps))
- for i := range ps {
- // Skip invalid peers.
- if ps[i].Validate() != nil {
- continue
- }
- peers[i] = translatePeer(ps[i])
- }
- return peers, nil
- }
- // Set sets the fields of a peer.
- func (pb *peerBackend) Set(ctx context.Context, name string, peer *mesh.Peer) error {
- old, err := pb.lister.Get(name)
- if err != nil {
- return fmt.Errorf("failed to find peer: %v", err)
- }
- p := old.DeepCopy()
- p.Spec.AllowedIPs = make([]string, len(peer.AllowedIPs))
- for i := range peer.AllowedIPs {
- p.Spec.AllowedIPs[i] = peer.AllowedIPs[i].String()
- }
- if peer.Endpoint != nil {
- p.Spec.Endpoint = &v1alpha1.PeerEndpoint{
- DNSOrIP: v1alpha1.DNSOrIP{
- IP: peer.Endpoint.IP().String(),
- DNS: peer.Endpoint.DNS(),
- },
- Port: uint32(peer.Endpoint.Port()),
- }
- }
- if peer.PersistentKeepaliveInterval == nil {
- p.Spec.PersistentKeepalive = 0
- } else {
- p.Spec.PersistentKeepalive = int(*peer.PersistentKeepaliveInterval / time.Second)
- }
- if peer.PresharedKey == nil {
- p.Spec.PresharedKey = ""
- } else {
- p.Spec.PresharedKey = peer.PresharedKey.String()
- }
- p.Spec.PublicKey = peer.PublicKey.String()
- if _, err = pb.client.KiloV1alpha1().Peers().Update(ctx, p, metav1.UpdateOptions{}); err != nil {
- return fmt.Errorf("failed to update peer: %v", err)
- }
- return nil
- }
- // Watch returns a chan of peer events.
- func (pb *peerBackend) Watch() <-chan *mesh.PeerEvent {
- return pb.events
- }
// normalizeIP parses a string in CIDR notation and returns the resulting
// network with its IP field set to the address exactly as given, i.e. not
// masked down to the network address. The address is stored in its 4-byte
// form if it is an IPv4 address and in its 16-byte form otherwise.
// It returns nil if the string is not valid CIDR notation.
func normalizeIP(ip string) *net.IPNet {
	addr, network, err := net.ParseCIDR(ip)
	if err != nil || network == nil {
		return nil
	}
	// Default to the 16-byte form and shorten it when the address is IPv4.
	network.IP = addr.To16()
	if v4 := addr.To4(); v4 != nil {
		network.IP = v4
	}
	return network
}
|