|
|
@@ -3,7 +3,6 @@ package kubernetes
|
|
|
import (
|
|
|
"context"
|
|
|
"encoding/base64"
|
|
|
- "errors"
|
|
|
"fmt"
|
|
|
"os"
|
|
|
"path/filepath"
|
|
|
@@ -64,18 +63,18 @@ func GetDynamicClientOutOfClusterConfig(conf *OutOfClusterConfig) (dynamic.Inter
|
|
|
}
|
|
|
|
|
|
// GetAgentOutOfClusterConfig creates a new Agent using the OutOfClusterConfig
|
|
|
-func GetAgentOutOfClusterConfig(conf *OutOfClusterConfig) (*Agent, error) {
|
|
|
- ctx, span := telemetry.NewSpan(context.Background(), "get-agent-out-of-cluster-config")
|
|
|
+func GetAgentOutOfClusterConfig(ctx context.Context, conf *OutOfClusterConfig) (*Agent, error) {
|
|
|
+ ctx, span := telemetry.NewSpan(ctx, "get-agent-out-of-cluster-config")
|
|
|
defer span.End()
|
|
|
|
|
|
if conf.AllowInClusterConnections && conf.Cluster.AuthMechanism == models.InCluster {
|
|
|
- return GetAgentInClusterConfig(conf.DefaultNamespace)
|
|
|
+ return GetAgentInClusterConfig(ctx, conf.DefaultNamespace)
|
|
|
}
|
|
|
|
|
|
var restConf *rest.Config
|
|
|
|
|
|
if conf.Cluster.ProvisionedBy == "CAPI" {
|
|
|
- telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "capi-provisioned", Value: true})
|
|
|
+ telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "provisioner", Value: conf.Cluster.ProvisionedBy})
|
|
|
|
|
|
rc, err := restConfigForCAPICluster(ctx, conf.CAPIManagementClusterClient, *conf.Cluster)
|
|
|
if err != nil {
|
|
|
@@ -83,6 +82,8 @@ func GetAgentOutOfClusterConfig(conf *OutOfClusterConfig) (*Agent, error) {
|
|
|
}
|
|
|
restConf = rc
|
|
|
} else {
|
|
|
+ telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "provisioner", Value: "non-capi"})
|
|
|
+
|
|
|
rc, err := conf.ToRESTConfig()
|
|
|
if err != nil {
|
|
|
return nil, telemetry.Error(ctx, span, err, "error getting rest config")
|
|
|
@@ -99,7 +100,9 @@ func GetAgentOutOfClusterConfig(conf *OutOfClusterConfig) (*Agent, error) {
|
|
|
return nil, telemetry.Error(ctx, span, err, "error getting new clientset for config")
|
|
|
}
|
|
|
|
|
|
- return &Agent{conf, clientset}, nil
|
|
|
+ agent := NewKubernetesAgent(ctx, conf, clientset)
|
|
|
+
|
|
|
+ return &agent, nil
|
|
|
}
|
|
|
|
|
|
// restConfigForCAPICluster gets the kubernetes rest API client for a CAPI cluster
|
|
|
@@ -196,21 +199,29 @@ func IsInCluster() bool {
|
|
|
|
|
|
// GetAgentInClusterConfig uses the service account that kubernetes
|
|
|
// gives to pods to connect
|
|
|
-func GetAgentInClusterConfig(namespace string) (*Agent, error) {
|
|
|
+func GetAgentInClusterConfig(ctx context.Context, namespace string) (*Agent, error) {
|
|
|
conf, err := rest.InClusterConfig()
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("error getting in cluster config: %w", err)
|
|
|
}
|
|
|
|
|
|
restClientGetter := NewRESTClientGetterFromInClusterConfig(conf, namespace)
|
|
|
clientset, err := kubernetes.NewForConfig(conf)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("error getting new clientset for config: %w", err)
|
|
|
+ }
|
|
|
|
|
|
- return &Agent{restClientGetter, clientset}, nil
|
|
|
+ agent := NewKubernetesAgent(ctx, restClientGetter, clientset)
|
|
|
+
|
|
|
+ return &agent, nil
|
|
|
}
|
|
|
|
|
|
// GetAgentTesting creates a new Agent using an optional existing storage class
|
|
|
+// TODO: this should be in a test package, not here.
|
|
|
func GetAgentTesting(objects ...runtime.Object) *Agent {
|
|
|
- return &Agent{&fakeRESTClientGetter{}, fake.NewSimpleClientset(objects...)}
|
|
|
+ agent := NewKubernetesAgent(context.Background(), &fakeRESTClientGetter{}, fake.NewSimpleClientset(objects...))
|
|
|
+
|
|
|
+ return &agent
|
|
|
}
|
|
|
|
|
|
// OutOfClusterConfig is the set of parameters required for an out-of-cluster connection.
|
|
|
@@ -230,11 +241,18 @@ type OutOfClusterConfig struct {
|
|
|
|
|
|
// ToRESTConfig creates a kubernetes REST client factory -- it calls ClientConfig on
|
|
|
// the result of ToRawKubeConfigLoader, and also adds a custom http transport layer
|
|
|
-// if necessary (required for GCP auth)
|
|
|
+// if necessary (required for GCP auth).
|
|
|
+// TODO: this should be split out from OutOfClusterConfig, and implemented separately in order to wrap the kubernetes RESTGetter interface.
|
|
|
+// Until then, we lose context propagation on all these calls
|
|
|
func (conf *OutOfClusterConfig) ToRESTConfig() (*rest.Config, error) {
|
|
|
ctx, span := telemetry.NewSpan(context.Background(), "ooc-to-rest-config")
|
|
|
defer span.End()
|
|
|
|
|
|
+ telemetry.WithAttributes(span,
|
|
|
+ telemetry.AttributeKV{Key: "cluster-id", Value: conf.Cluster.ID},
|
|
|
+ telemetry.AttributeKV{Key: "project-id", Value: conf.Cluster.ProjectID},
|
|
|
+ )
|
|
|
+
|
|
|
if conf.Cluster.ProvisionedBy == "CAPI" {
|
|
|
telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "capi-provisioned", Value: true})
|
|
|
|
|
|
@@ -360,7 +378,7 @@ func (conf *OutOfClusterConfig) GetClientConfigFromCluster(ctx context.Context)
|
|
|
return clientcmd.NewClientConfigFromBytes(kubeAuth.Kubeconfig)
|
|
|
}
|
|
|
|
|
|
- apiConfig, err := conf.CreateRawConfigFromCluster()
|
|
|
+ apiConfig, err := conf.CreateRawConfigFromCluster(ctx)
|
|
|
if err != nil {
|
|
|
return nil, telemetry.Error(ctx, span, err, "error creating raw config from cluster")
|
|
|
}
|
|
|
@@ -379,7 +397,10 @@ func (conf *OutOfClusterConfig) GetClientConfigFromCluster(ctx context.Context)
|
|
|
return config, nil
|
|
|
}
|
|
|
|
|
|
-func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error) {
|
|
|
+func (conf *OutOfClusterConfig) CreateRawConfigFromCluster(ctx context.Context) (*api.Config, error) {
|
|
|
+ ctx, span := telemetry.NewSpan(ctx, "ooc-create-raw-config-from-cluster")
|
|
|
+ defer span.End()
|
|
|
+
|
|
|
cluster := conf.Cluster
|
|
|
|
|
|
apiConfig := &api.Config{}
|
|
|
@@ -408,6 +429,11 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
authInfoMap[authInfoName].ImpersonateGroups = groups
|
|
|
}
|
|
|
|
|
|
+ telemetry.WithAttributes(span,
|
|
|
+ telemetry.AttributeKV{Key: "auth-mechanism", Value: cluster.AuthMechanism},
|
|
|
+ telemetry.AttributeKV{Key: "server", Value: cluster.Server},
|
|
|
+ )
|
|
|
+
|
|
|
switch cluster.AuthMechanism {
|
|
|
case models.X509:
|
|
|
kubeAuth, err := conf.Repo.KubeIntegration().ReadKubeIntegration(
|
|
|
@@ -415,9 +441,13 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
cluster.KubeIntegrationID,
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "error reading kube integration")
|
|
|
}
|
|
|
|
|
|
+ telemetry.WithAttributes(span,
|
|
|
+ telemetry.AttributeKV{Key: "integration-id", Value: cluster.KubeIntegrationID},
|
|
|
+ )
|
|
|
+
|
|
|
authInfoMap[authInfoName].ClientCertificateData = kubeAuth.ClientCertificateData
|
|
|
authInfoMap[authInfoName].ClientKeyData = kubeAuth.ClientKeyData
|
|
|
case models.Basic:
|
|
|
@@ -426,8 +456,11 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
cluster.KubeIntegrationID,
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "error reading kube integration")
|
|
|
}
|
|
|
+ telemetry.WithAttributes(span,
|
|
|
+ telemetry.AttributeKV{Key: "integration-id", Value: cluster.KubeIntegrationID},
|
|
|
+ )
|
|
|
|
|
|
authInfoMap[authInfoName].Username = string(kubeAuth.Username)
|
|
|
authInfoMap[authInfoName].Password = string(kubeAuth.Password)
|
|
|
@@ -437,7 +470,7 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
cluster.KubeIntegrationID,
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "error reading kube integration")
|
|
|
}
|
|
|
|
|
|
authInfoMap[authInfoName].Token = string(kubeAuth.Token)
|
|
|
@@ -447,8 +480,11 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
cluster.OIDCIntegrationID,
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "error reading oidc integration")
|
|
|
}
|
|
|
+ telemetry.WithAttributes(span,
|
|
|
+ telemetry.AttributeKV{Key: "integration-id", Value: cluster.OIDCIntegrationID},
|
|
|
+ )
|
|
|
|
|
|
authInfoMap[authInfoName].AuthProvider = &api.AuthProviderConfig{
|
|
|
Name: "oidc",
|
|
|
@@ -467,17 +503,24 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
cluster.GCPIntegrationID,
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "error reading gcp integration")
|
|
|
}
|
|
|
|
|
|
+ telemetry.WithAttributes(span,
|
|
|
+ telemetry.AttributeKV{Key: "integration-id", Value: cluster.GCPIntegrationID},
|
|
|
+ )
|
|
|
+
|
|
|
tok, err := gcpAuth.GetBearerToken(
|
|
|
+ ctx,
|
|
|
conf.getTokenCache,
|
|
|
conf.setTokenCache,
|
|
|
"https://www.googleapis.com/auth/cloud-platform",
|
|
|
)
|
|
|
-
|
|
|
- if tok == nil && err != nil {
|
|
|
- return nil, err
|
|
|
+ if err != nil {
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "error getting gcp token")
|
|
|
+ }
|
|
|
+ if tok == nil {
|
|
|
+ return nil, telemetry.Error(ctx, span, nil, "unable to get gcp token")
|
|
|
}
|
|
|
|
|
|
// add this as a bearer token
|
|
|
@@ -488,9 +531,13 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
cluster.AWSIntegrationID,
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "error reading aws integration")
|
|
|
}
|
|
|
|
|
|
+ telemetry.WithAttributes(span,
|
|
|
+ telemetry.AttributeKV{Key: "integration-id", Value: cluster.AWSIntegrationID},
|
|
|
+ )
|
|
|
+
|
|
|
awsClusterID := cluster.Name
|
|
|
shouldOverride := false
|
|
|
|
|
|
@@ -499,9 +546,9 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
shouldOverride = true
|
|
|
}
|
|
|
|
|
|
- tok, err := awsAuth.GetBearerToken(conf.getTokenCache, conf.setTokenCache, awsClusterID, shouldOverride)
|
|
|
+ tok, err := awsAuth.GetBearerToken(ctx, conf.getTokenCache, conf.setTokenCache, awsClusterID, shouldOverride)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "unable to get AWS bearer token")
|
|
|
}
|
|
|
|
|
|
// add this as a bearer token
|
|
|
@@ -512,12 +559,15 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
cluster.DOIntegrationID,
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "error reading oauth integration")
|
|
|
}
|
|
|
+ telemetry.WithAttributes(span,
|
|
|
+ telemetry.AttributeKV{Key: "integration-id", Value: cluster.DOIntegrationID},
|
|
|
+ )
|
|
|
|
|
|
tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, conf.DigitalOceanOAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, conf.Repo))
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "unable to get oauth access token for Digital Ocean")
|
|
|
}
|
|
|
|
|
|
// add this as a bearer token
|
|
|
@@ -528,12 +578,15 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
cluster.AzureIntegrationID,
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, telemetry.Error(ctx, span, err, "error reading azure integration")
|
|
|
}
|
|
|
+ telemetry.WithAttributes(span,
|
|
|
+ telemetry.AttributeKV{Key: "integration-id", Value: cluster.AzureIntegrationID},
|
|
|
+ )
|
|
|
|
|
|
authInfoMap[authInfoName].Token = string(azInt.AKSPassword)
|
|
|
default:
|
|
|
- return nil, errors.New("not a supported auth mechanism")
|
|
|
+ return nil, telemetry.Error(ctx, span, nil, "auth mechanism not supported")
|
|
|
}
|
|
|
|
|
|
// create a context of the cluster name
|
|
|
@@ -553,11 +606,11 @@ func (conf *OutOfClusterConfig) CreateRawConfigFromCluster() (*api.Config, error
|
|
|
return apiConfig, nil
|
|
|
}
|
|
|
|
|
|
-func (conf *OutOfClusterConfig) getTokenCache() (tok *ints.TokenCache, err error) {
|
|
|
+func (conf *OutOfClusterConfig) getTokenCache(ctx context.Context) (tok *ints.TokenCache, err error) {
|
|
|
return &conf.Cluster.TokenCache.TokenCache, nil
|
|
|
}
|
|
|
|
|
|
-func (conf *OutOfClusterConfig) setTokenCache(token string, expiry time.Time) error {
|
|
|
+func (conf *OutOfClusterConfig) setTokenCache(ctx context.Context, token string, expiry time.Time) error {
|
|
|
_, err := conf.Repo.Cluster().UpdateClusterTokenCache(
|
|
|
&ints.ClusterTokenCache{
|
|
|
ClusterID: conf.Cluster.ID,
|