// Copyright 2019 the Kilo authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package calico

import (
	"errors"
	"net"
	"time"

	"k8s.io/apimachinery/pkg/labels"
	v1informers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	v1listers "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"

	"github.com/squat/kilo/pkg/ipset"
	"github.com/squat/kilo/pkg/mesh"
)

// Compatibility is the interface of a compatibility layer.
// New returns a value of this type.
type Compatibility interface {
	// Apply receives the mesh topology and the encapsulation setting.
	// NOTE(review): the Apply method declared on calico in this file takes
	// an additional location string; confirm which signature is intended.
	Apply(*mesh.Topology, mesh.Encapsulate) error
	// Backend receives a mesh.Backend and returns a mesh.Backend; the
	// calico implementation returns a wrapper around its argument.
	Backend(mesh.Backend) mesh.Backend
	// CleanUp reports an error on failure; the calico implementation
	// delegates to the CleanUp of its ipset.
	CleanUp() error
	// Run receives a stop channel and returns a channel of errors; the
	// calico implementation delegates to the Run of its ipset.
	Run(stop <-chan struct{}) (<-chan error, error)
}

// calico is a Calico compatibility layer.
//
// Fields:
//   - client: the Kubernetes client given to New; Backend uses it to
//     construct the node informer.
//   - errors: an unbuffered channel created by New.
//     NOTE(review): nothing in the visible code sends on or receives from it.
//   - ipset: the set that Run, CleanUp and Apply operate on.
type calico struct {
	client kubernetes.Interface
	errors chan error
	ipset  *ipset.Set
}

// New returns a Calico compatibility layer that holds the given Kubernetes
// client, a fresh unbuffered error channel and an ipset named
// "cali40all-hosts-net".
func New(c kubernetes.Interface) Compatibility {
	layer := new(calico)
	layer.client = c
	layer.errors = make(chan error)
	// This is a patch until Calico supports
	// other hosts adding IPIP iptables rules.
	layer.ipset = ipset.New("cali40all-hosts-net")
	return layer
}

// Run implements the Compatibility interface.
// It hands stop to the Run method of the layer's ipset and passes that
// call's results back to the caller unchanged.
func (c *calico) Run(stop <-chan struct{}) (<-chan error, error) {
	errs, err := c.ipset.Run(stop)
	return errs, err
}

// CleanUp implements the Compatibility interface by calling CleanUp on the
// layer's ipset and returning whatever error that call reports.
func (c *calico) CleanUp() error {
	err := c.ipset.CleanUp()
	return err
}

// backend is the mesh.Backend returned by calico.Backend. It wraps another
// mesh.Backend and pairs it with a Kubernetes node informer and lister.
//
// Fields:
//   - backend: the wrapped backend; Peers, CleanUp and Set delegate to it.
//   - client: a Kubernetes client.
//   - events: the channel returned by Watch.
//   - informer: the node informer that Init runs and waits on.
//   - lister: a node lister used by Get and List.
type backend struct {
	backend  mesh.Backend
	client   kubernetes.Interface
	events   chan *mesh.NodeEvent
	informer cache.SharedIndexInformer
	lister   v1listers.NodeLister
}

// Apply passes the private IPs of the first topology segment whose location
// equals the given location to the Set method of the layer's ipset.
// When encapsulate is mesh.NeverEncapsulate it does nothing and returns nil.
// If no segment matches, Set is called with a nil slice.
//
// NOTE(review): the Compatibility interface declares Apply without the
// location parameter, and segments, location and privateIPs are unexported
// names of package mesh; confirm how this is meant to build.
func (c *calico) Apply(t *mesh.Topology, encapsulate mesh.Encapsulate, location string) error {
	if encapsulate == mesh.NeverEncapsulate {
		return nil
	}
	var hosts []net.IP
	for i := range t.segments {
		if seg := t.segments[i]; seg.location == location {
			hosts = seg.privateIPs
			break
		}
	}
	return c.ipset.Set(hosts)
}

// Backend implements the Compatibility interface.
// It returns a backend that wraps b together with a node informer built
// from the layer's Kubernetes client and a lister reading from that
// informer's indexer. The informer is created with a five minute resync
// period and no extra indexers; it is not started here, Init runs it.
func (c *calico) Backend(b mesh.Backend) mesh.Backend {
	const resyncPeriod = 5 * time.Minute
	ni := v1informers.NewNodeInformer(c.client, resyncPeriod, nil)
	return &backend{
		backend: b,
		// Populate the client so the field is never left nil.
		client:   c.client,
		events:   make(chan *mesh.NodeEvent),
		informer: ni,
		lister:   v1listers.NewNodeLister(ni.GetIndexer()),
	}
}

// Nodes implements the mesh.Backend interface.
// The backend serves as its own node backend, so it returns itself.
func (b *backend) Nodes() mesh.NodeBackend {
	var nodes mesh.NodeBackend = b
	return nodes
}

// Peers implements the mesh.Backend interface.
func (b *backend) Peers() mesh.PeerBackend {
	// Only node operations are wrapped by the Calico compatibility
	// backend; peers come straight from the wrapped backend.
	peers := b.backend.Peers()
	return peers
}

// CleanUp forwards name to the CleanUp method of the wrapped backend's
// node backend and returns its error.
func (b *backend) CleanUp(name string) error {
	wrapped := b.backend.Nodes()
	return wrapped.CleanUp(name)
}

// Get gets a single Node by name.
// The name is looked up first in the node lister and then in the wrapped
// node backend; an error from either lookup is returned as-is. On success
// both results are handed to translateNode and its result is returned.
func (b *backend) Get(name string) (*mesh.Node, error) {
	k8sNode, err := b.lister.Get(name)
	if err != nil {
		return nil, err
	}
	var meshNode *mesh.Node
	if meshNode, err = b.backend.Nodes().Get(name); err != nil {
		return nil, err
	}
	node := translateNode(k8sNode, meshNode)
	return node, nil
}

// Init initializes the backend; for this backend that means
// initializing the wrapped node backend, running the node informer,
// waiting for its cache to sync and then starting a goroutine that
// re-emits the wrapped backend's node events on the channel returned
// by Watch.
//
// It returns the wrapped backend's Init error unchanged, or an error if
// the informer cache fails to sync. The informer and the forwarding
// goroutine both receive stop.
func (b *backend) Init(stop <-chan struct{}) error {
	if err := b.backend.Nodes().Init(stop); err != nil {
		return err
	}
	go b.informer.Run(stop)
	if ok := cache.WaitForCacheSync(stop, b.informer.HasSynced); !ok {
		return errors.New("failed to sync node cache")
	}
	go b.forwardEvents(stop)
	return nil
}

// forwardEvents re-emits every event from the wrapped node backend on
// b.events until stop fires or the wrapped watch channel is closed.
func (b *backend) forwardEvents(stop <-chan struct{}) {
	w := b.backend.Nodes().Watch()
	for {
		select {
		case ne, ok := <-w:
			if !ok {
				// The wrapped backend closed its channel; there is
				// nothing left to forward.
				return
			}
			node := ne.Node
			if node != nil {
				// Look the Kubernetes node up by the event node's name,
				// the same pairing Get uses. A failed lookup is not
				// fatal: this goroutine has no error path, and dropping
				// the event would hide the change from consumers, so
				// the node is forwarded untranslated instead.
				// NOTE(review): confirm that forwarding the untranslated
				// node is acceptable when the lookup fails.
				if n, err := b.lister.Get(node.Name); err == nil {
					node = translateNode(n, node)
				}
			}
			// Keep watching stop while sending so that a consumer that
			// has gone away cannot block this goroutine forever.
			select {
			case b.events <- &mesh.NodeEvent{Type: ne.Type, Node: node}:
			case <-stop:
				return
			}
		case <-stop:
			return
		}
	}
}

// List gets all the Nodes in the cluster.
// Every node known to the node lister is paired with the node of the same
// name from the wrapped node backend and the pair is handed to
// translateNode, mirroring what Get does for a single node. An error from
// the lister or from any wrapped lookup is returned as-is and no partial
// result is returned.
func (b *backend) List() ([]*mesh.Node, error) {
	ns, err := b.lister.List(labels.Everything())
	if err != nil {
		return nil, err
	}
	wrapped := b.backend.Nodes()
	nodes := make([]*mesh.Node, len(ns))
	for i, n := range ns {
		m, err := wrapped.Get(n.Name)
		if err != nil {
			return nil, err
		}
		nodes[i] = translateNode(n, m)
	}
	return nodes, nil
}

// Set sets the fields of a node.
func (b *backend) Set(name string, node *mesh.Node) error {
	// This backend performs no writes of its own:
	// every write is handed to the wrapped node backend.
	wrapped := b.backend.Nodes()
	return wrapped.Set(name, node)
}

// Watch returns a receive-only view of the backend's events channel.
func (b *backend) Watch() <-chan *mesh.NodeEvent {
	var events <-chan *mesh.NodeEvent = b.events
	return events
}
