| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- // Copyright 2019 the Kilo authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package calico
- import (
- "errors"
- "net"
- "time"
- "k8s.io/apimachinery/pkg/labels"
- v1informers "k8s.io/client-go/informers/core/v1"
- "k8s.io/client-go/kubernetes"
- v1listers "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "github.com/squat/kilo/pkg/ipset"
- "github.com/squat/kilo/pkg/mesh"
- )
- type Compatibility interface {
- Apply(*mesh.Topology, mesh.Encapsulate) error
- Backend(mesh.Backend) mesh.Backend
- CleanUp() error
- Run(stop <-chan struct{}) (<-chan error, error)
- }
- // calico is a Calico compatibility layer.
- type calico struct {
- client kubernetes.Interface
- errors chan error
- ipset *ipset.Set
- }
- // New generates a new ipset.
- func New(c kubernetes.Interface) Compatibility {
- return &calico{
- client: c,
- errors: make(chan error),
- // This is a patch until Calico supports
- // other hosts adding IPIP iptables rules.
- ipset: ipset.New("cali40all-hosts-net"),
- }
- }
- // Run implements the mesh.Compatibility interface.
- // It runs the ipset controller and forwards errors along.
- func (c *calico) Run(stop <-chan struct{}) (<-chan error, error) {
- return c.ipset.Run(stop)
- }
- // CleanUp stops the compatibility layer's controllers.
- func (c *calico) CleanUp() error {
- return c.ipset.CleanUp()
- }
- type backend struct {
- backend mesh.Backend
- client kubernetes.Interface
- events chan *mesh.NodeEvent
- informer cache.SharedIndexInformer
- lister v1listers.NodeLister
- }
- func (c *calico) Apply(t *mesh.Topology, encapsulate mesh.Encapsulate, location string) error {
- if encapsulate == mesh.NeverEncapsulate {
- return nil
- }
- var peers []net.IP
- for _, s := range t.segments {
- if s.location == location {
- peers = s.privateIPs
- break
- }
- }
- return c.ipset.Set(peers)
- }
- func (c *calico) Backend(b mesh.Backend) mesh.Backend {
- ni := v1informers.NewNodeInformer(c.client, 5*time.Minute, nil)
- return &backend{
- backend: b,
- events: make(chan *mesh.NodeEvent),
- informer: ni,
- lister: v1listers.NewNodeLister(ni.GetIndexer()),
- }
- }
- // Nodes implements the mesh.Backend interface.
- func (b *backend) Nodes() mesh.NodeBackend {
- return b
- }
- // Peers implements the mesh.Backend interface.
- func (b *backend) Peers() mesh.PeerBackend {
- // The Calico compatibility backend only wraps the node backend.
- return b.backend.Peers()
- }
- // CleanUp removes configuration applied to the backend.
- func (b *backend) CleanUp(name string) error {
- return b.backend.Nodes().CleanUp(name)
- }
- // Get gets a single Node by name.
- func (b *backend) Get(name string) (*mesh.Node, error) {
- n, err := b.lister.Get(name)
- if err != nil {
- return nil, err
- }
- m, err := b.backend.Nodes().Get(name)
- if err != nil {
- return nil, err
- }
- return translateNode(n, m), nil
- }
- // Init initializes the backend; for this backend that means
- // syncing the informer cache and the wrapped backend.
- func (b *backend) Init(stop <-chan struct{}) error {
- if err := b.backend.Nodes().Init(stop); err != nil {
- return err
- }
- go b.informer.Run(stop)
- if ok := cache.WaitForCacheSync(stop, func() bool {
- return b.informer.HasSynced()
- }); !ok {
- return errors.New("failed to sync node cache")
- }
- go func() {
- w := b.backend.Nodes().Watch()
- var ne *mesh.NodeEvent
- for {
- select {
- case ne = <-w:
- b.events <- &mesh.NodeEvent{Type: ne.Type, Node: translateNode(n, ne.Node)}
- case <-stop:
- return
- }
- }
- }()
- return nil
- }
- // List gets all the Nodes in the cluster.
- func (b *backend) List() ([]*mesh.Node, error) {
- ns, err := b.lister.List(labels.Everything())
- if err != nil {
- return nil, err
- }
- nodes := make([]*mesh.Node, len(ns))
- for i := range ns {
- nodes[i] = translateNode(ns[i])
- }
- return nodes, nil
- }
- // Set sets the fields of a node.
- func (b *backend) Set(name string, node *mesh.Node) error {
- // The Calico compatibility backend is read-only.
- // Proxy all writes to the underlying backend.
- return b.backend.Nodes().Set(name, node)
- }
- // Watch returns a chan of node events.
- func (b *backend) Watch() <-chan *mesh.NodeEvent {
- return b.events
- }
|