| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626 |
- package kubernetes
- import (
- "bufio"
- "bytes"
- "context"
- "fmt"
- "io"
- "strings"
- "github.com/porter-dev/porter/internal/kubernetes/provisioner"
- "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
- "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
- "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
- "github.com/porter-dev/porter/internal/kubernetes/provisioner/do"
- "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/docr"
- "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/doks"
- "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp"
- "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp/gke"
- "github.com/porter-dev/porter/internal/models"
- "github.com/porter-dev/porter/internal/models/integrations"
- "github.com/porter-dev/porter/internal/oauth"
- "github.com/porter-dev/porter/internal/registry"
- "github.com/porter-dev/porter/internal/repository"
- "golang.org/x/oauth2"
- "github.com/gorilla/websocket"
- "github.com/porter-dev/porter/internal/helm/grapher"
- appsv1 "k8s.io/api/apps/v1"
- batchv1 "k8s.io/api/batch/v1"
- v1 "k8s.io/api/core/v1"
- v1beta1 "k8s.io/api/extensions/v1beta1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/cli-runtime/pkg/genericclioptions"
- "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
- "github.com/porter-dev/porter/internal/config"
- )
- // Agent is a Kubernetes agent for performing operations that interact with the
- // api server
- type Agent struct {
- RESTClientGetter genericclioptions.RESTClientGetter
- Clientset kubernetes.Interface
- }
// Message is the payload that StreamControllerStatus writes to the websocket
// as JSON for each informer event it forwards.
type Message struct {
	// EventType names the event; StreamControllerStatus sets it to "UPDATE".
	EventType string

	// Object is the resource the event refers to (the informer's new object
	// for updates).
	Object interface{}

	// Kind is the lowercased controller kind being streamed.
	Kind string
}
// ListOptions carries optional filters for list operations.
// NOTE(review): nothing visible in this file consumes this type — confirm its
// usage against callers.
type ListOptions struct {
	// FieldSelector presumably holds a Kubernetes field-selector expression;
	// verify against callers.
	FieldSelector string
}
- // ListNamespaces simply lists namespaces
- func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
- return a.Clientset.CoreV1().Namespaces().List(
- context.TODO(),
- metav1.ListOptions{},
- )
- }
- // GetIngress gets ingress given the name and namespace
- func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
- return a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
- context.TODO(),
- name,
- metav1.GetOptions{},
- )
- }
- // GetDeployment gets the deployment given the name and namespace
- func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
- return a.Clientset.AppsV1().Deployments(c.Namespace).Get(
- context.TODO(),
- c.Name,
- metav1.GetOptions{},
- )
- }
- // GetStatefulSet gets the statefulset given the name and namespace
- func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
- return a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
- context.TODO(),
- c.Name,
- metav1.GetOptions{},
- )
- }
- // GetReplicaSet gets the replicaset given the name and namespace
- func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
- return a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
- context.TODO(),
- c.Name,
- metav1.GetOptions{},
- )
- }
- // GetDaemonSet gets the daemonset by name and namespace
- func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
- return a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
- context.TODO(),
- c.Name,
- metav1.GetOptions{},
- )
- }
- // GetPodsByLabel retrieves pods with matching labels
- func (a *Agent) GetPodsByLabel(selector string) (*v1.PodList, error) {
- // Search in all namespaces for matching pods
- return a.Clientset.CoreV1().Pods("").List(
- context.TODO(),
- metav1.ListOptions{
- LabelSelector: selector,
- },
- )
- }
- // GetPodLogs streams real-time logs from a given pod.
- func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
- tails := int64(400)
- // follow logs
- podLogOpts := v1.PodLogOptions{
- Follow: true,
- TailLines: &tails,
- }
- req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
- podLogs, err := req.Stream(context.TODO())
- if err != nil {
- return fmt.Errorf("Cannot open log stream for pod %s", name)
- }
- defer podLogs.Close()
- r := bufio.NewReader(podLogs)
- errorchan := make(chan error)
- go func() {
- // listens for websocket closing handshake
- for {
- if _, _, err := conn.ReadMessage(); err != nil {
- defer conn.Close()
- errorchan <- nil
- fmt.Println("Successfully closed log stream")
- return
- }
- }
- }()
- go func() {
- for {
- select {
- case <-errorchan:
- defer close(errorchan)
- return
- default:
- }
- bytes, err := r.ReadBytes('\n')
- if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
- errorchan <- writeErr
- return
- }
- if err != nil {
- if err != io.EOF {
- errorchan <- err
- return
- }
- errorchan <- nil
- return
- }
- }
- }()
- for {
- select {
- case err = <-errorchan:
- return err
- }
- }
- }
- // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
- // TODO: Support Jobs
- func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string) error {
- factory := informers.NewSharedInformerFactory(
- a.Clientset,
- 0,
- )
- var informer cache.SharedInformer
- // Spins up an informer depending on kind. Convert to lowercase for robustness
- switch strings.ToLower(kind) {
- case "deployment":
- informer = factory.Apps().V1().Deployments().Informer()
- case "statefulset":
- informer = factory.Apps().V1().StatefulSets().Informer()
- case "replicaset":
- informer = factory.Apps().V1().ReplicaSets().Informer()
- case "daemonset":
- informer = factory.Apps().V1().DaemonSets().Informer()
- }
- stopper := make(chan struct{})
- errorchan := make(chan error)
- defer close(errorchan)
- informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
- UpdateFunc: func(oldObj, newObj interface{}) {
- msg := Message{
- EventType: "UPDATE",
- Object: newObj,
- Kind: strings.ToLower(kind),
- }
- if writeErr := conn.WriteJSON(msg); writeErr != nil {
- errorchan <- writeErr
- return
- }
- },
- })
- go func() {
- // listens for websocket closing handshake
- for {
- if _, _, err := conn.ReadMessage(); err != nil {
- defer conn.Close()
- defer close(stopper)
- defer fmt.Println("Successfully closed controller status stream")
- errorchan <- nil
- return
- }
- }
- }()
- go informer.Run(stopper)
- for {
- select {
- case err := <-errorchan:
- return err
- }
- }
- }
- // ProvisionECR spawns a new provisioning pod that creates an ECR instance
- func (a *Agent) ProvisionECR(
- projectID uint,
- awsConf *integrations.AWSIntegration,
- ecrName string,
- repo repository.Repository,
- infra *models.Infra,
- operation provisioner.ProvisionerOperation,
- pgConf *config.DBConf,
- redisConf *config.RedisConf,
- provImageTag string,
- ) (*batchv1.Job, error) {
- id := infra.GetUniqueName()
- prov := &provisioner.Conf{
- ID: id,
- Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
- Kind: provisioner.ECR,
- Operation: operation,
- Redis: redisConf,
- Postgres: pgConf,
- ProvisionerImageTag: provImageTag,
- LastApplied: infra.LastApplied,
- AWS: &aws.Conf{
- AWSRegion: awsConf.AWSRegion,
- AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
- AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
- },
- ECR: &ecr.Conf{
- ECRName: ecrName,
- },
- }
- return a.provision(prov, infra, repo)
- }
- // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
- func (a *Agent) ProvisionEKS(
- projectID uint,
- awsConf *integrations.AWSIntegration,
- eksName string,
- repo repository.Repository,
- infra *models.Infra,
- operation provisioner.ProvisionerOperation,
- pgConf *config.DBConf,
- redisConf *config.RedisConf,
- provImageTag string,
- ) (*batchv1.Job, error) {
- id := infra.GetUniqueName()
- prov := &provisioner.Conf{
- ID: id,
- Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
- Kind: provisioner.EKS,
- Operation: operation,
- Redis: redisConf,
- Postgres: pgConf,
- ProvisionerImageTag: provImageTag,
- LastApplied: infra.LastApplied,
- AWS: &aws.Conf{
- AWSRegion: awsConf.AWSRegion,
- AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
- AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
- },
- EKS: &eks.Conf{
- ClusterName: eksName,
- },
- }
- return a.provision(prov, infra, repo)
- }
- // ProvisionGCR spawns a new provisioning pod that creates a GCR instance
- func (a *Agent) ProvisionGCR(
- projectID uint,
- gcpConf *integrations.GCPIntegration,
- repo repository.Repository,
- infra *models.Infra,
- operation provisioner.ProvisionerOperation,
- pgConf *config.DBConf,
- redisConf *config.RedisConf,
- provImageTag string,
- ) (*batchv1.Job, error) {
- id := infra.GetUniqueName()
- prov := &provisioner.Conf{
- ID: id,
- Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
- Kind: provisioner.GCR,
- Operation: operation,
- Redis: redisConf,
- Postgres: pgConf,
- ProvisionerImageTag: provImageTag,
- LastApplied: infra.LastApplied,
- GCP: &gcp.Conf{
- GCPRegion: gcpConf.GCPRegion,
- GCPProjectID: gcpConf.GCPProjectID,
- GCPKeyData: string(gcpConf.GCPKeyData),
- },
- }
- return a.provision(prov, infra, repo)
- }
- // ProvisionGKE spawns a new provisioning pod that creates a GKE instance
- func (a *Agent) ProvisionGKE(
- projectID uint,
- gcpConf *integrations.GCPIntegration,
- gkeName string,
- repo repository.Repository,
- infra *models.Infra,
- operation provisioner.ProvisionerOperation,
- pgConf *config.DBConf,
- redisConf *config.RedisConf,
- provImageTag string,
- ) (*batchv1.Job, error) {
- id := infra.GetUniqueName()
- prov := &provisioner.Conf{
- ID: id,
- Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
- Kind: provisioner.GKE,
- Operation: operation,
- Redis: redisConf,
- Postgres: pgConf,
- ProvisionerImageTag: provImageTag,
- LastApplied: infra.LastApplied,
- GCP: &gcp.Conf{
- GCPRegion: gcpConf.GCPRegion,
- GCPProjectID: gcpConf.GCPProjectID,
- GCPKeyData: string(gcpConf.GCPKeyData),
- },
- GKE: &gke.Conf{
- ClusterName: gkeName,
- },
- }
- return a.provision(prov, infra, repo)
- }
- // ProvisionDOCR spawns a new provisioning pod that creates a DOCR instance
- func (a *Agent) ProvisionDOCR(
- projectID uint,
- doConf *integrations.OAuthIntegration,
- doAuth *oauth2.Config,
- repo repository.Repository,
- docrName, docrSubscriptionTier string,
- infra *models.Infra,
- operation provisioner.ProvisionerOperation,
- pgConf *config.DBConf,
- redisConf *config.RedisConf,
- provImageTag string,
- ) (*batchv1.Job, error) {
- // get the token
- oauthInt, err := repo.OAuthIntegration.ReadOAuthIntegration(
- infra.DOIntegrationID,
- )
- if err != nil {
- return nil, err
- }
- tok, _, err := oauth.GetAccessToken(oauthInt, doAuth, repo)
- if err != nil {
- return nil, err
- }
- id := infra.GetUniqueName()
- prov := &provisioner.Conf{
- ID: id,
- Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
- Kind: provisioner.DOCR,
- Operation: operation,
- Redis: redisConf,
- Postgres: pgConf,
- ProvisionerImageTag: provImageTag,
- LastApplied: infra.LastApplied,
- DO: &do.Conf{
- DOToken: tok,
- },
- DOCR: &docr.Conf{
- DOCRName: docrName,
- DOCRSubscriptionTier: docrSubscriptionTier,
- },
- }
- return a.provision(prov, infra, repo)
- }
- // ProvisionDOKS spawns a new provisioning pod that creates a DOKS instance
- func (a *Agent) ProvisionDOKS(
- projectID uint,
- doConf *integrations.OAuthIntegration,
- doAuth *oauth2.Config,
- repo repository.Repository,
- doRegion, doksClusterName string,
- infra *models.Infra,
- operation provisioner.ProvisionerOperation,
- pgConf *config.DBConf,
- redisConf *config.RedisConf,
- provImageTag string,
- ) (*batchv1.Job, error) {
- // get the token
- oauthInt, err := repo.OAuthIntegration.ReadOAuthIntegration(
- infra.DOIntegrationID,
- )
- if err != nil {
- return nil, err
- }
- tok, _, err := oauth.GetAccessToken(oauthInt, doAuth, repo)
- if err != nil {
- return nil, err
- }
- id := infra.GetUniqueName()
- prov := &provisioner.Conf{
- ID: id,
- Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
- Kind: provisioner.DOKS,
- Operation: operation,
- Redis: redisConf,
- Postgres: pgConf,
- LastApplied: infra.LastApplied,
- ProvisionerImageTag: provImageTag,
- DO: &do.Conf{
- DOToken: tok,
- },
- DOKS: &doks.Conf{
- DORegion: doRegion,
- DOKSClusterName: doksClusterName,
- },
- }
- return a.provision(prov, infra, repo)
- }
- // ProvisionTest spawns a new provisioning pod that tests provisioning
- func (a *Agent) ProvisionTest(
- projectID uint,
- infra *models.Infra,
- repo repository.Repository,
- operation provisioner.ProvisionerOperation,
- pgConf *config.DBConf,
- redisConf *config.RedisConf,
- provImageTag string,
- ) (*batchv1.Job, error) {
- id := infra.GetUniqueName()
- prov := &provisioner.Conf{
- ID: id,
- Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
- Operation: operation,
- Kind: provisioner.Test,
- Redis: redisConf,
- Postgres: pgConf,
- ProvisionerImageTag: provImageTag,
- }
- return a.provision(prov, infra, repo)
- }
- func (a *Agent) provision(
- prov *provisioner.Conf,
- infra *models.Infra,
- repo repository.Repository,
- ) (*batchv1.Job, error) {
- prov.Namespace = "default"
- job, err := prov.GetProvisionerJobTemplate()
- if err != nil {
- return nil, err
- }
- job, err = a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
- context.TODO(),
- job,
- metav1.CreateOptions{},
- )
- if err != nil {
- return nil, err
- }
- infra.LastApplied = prov.LastApplied
- infra, err = repo.Infra.UpdateInfra(infra)
- if err != nil {
- return nil, err
- }
- return job, nil
- }
- // CreateImagePullSecrets will create the required image pull secrets and
- // return a map from the registry name to the name of the secret.
- func (a *Agent) CreateImagePullSecrets(
- repo repository.Repository,
- namespace string,
- linkedRegs map[string]*models.Registry,
- doAuth *oauth2.Config,
- ) (map[string]string, error) {
- res := make(map[string]string)
- for key, val := range linkedRegs {
- _reg := registry.Registry(*val)
- data, err := _reg.GetDockerConfigJSON(repo, doAuth)
- if err != nil {
- return nil, err
- }
- secretName := fmt.Sprintf("porter-%s-%d", val.Externalize().Service, val.ID)
- secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
- context.TODO(),
- secretName,
- metav1.GetOptions{},
- )
- // if not found, create the secret
- if err != nil && errors.IsNotFound(err) {
- _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
- context.TODO(),
- &v1.Secret{
- ObjectMeta: metav1.ObjectMeta{
- Name: secretName,
- },
- Data: map[string][]byte{
- string(v1.DockerConfigJsonKey): data,
- },
- Type: v1.SecretTypeDockerConfigJson,
- },
- metav1.CreateOptions{},
- )
- if err != nil {
- return nil, err
- }
- // add secret name to the map
- res[key] = secretName
- continue
- } else if err != nil {
- return nil, err
- }
- // otherwise, check that the secret contains the correct data: if
- // if doesn't, update it
- if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
- _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
- context.TODO(),
- &v1.Secret{
- ObjectMeta: metav1.ObjectMeta{
- Name: secretName,
- },
- Data: map[string][]byte{
- string(v1.DockerConfigJsonKey): data,
- },
- Type: v1.SecretTypeDockerConfigJson,
- },
- metav1.UpdateOptions{},
- )
- if err != nil {
- return nil, err
- }
- }
- // add secret name to the map
- res[key] = secretName
- }
- return res, nil
- }
|