nodestats.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package nodestats
import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"os"
	"strings"

	"github.com/opencost/opencost/core/pkg/clustercache"
	"github.com/opencost/opencost/core/pkg/log"
	"github.com/opencost/opencost/core/pkg/util/worker"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/rest"
	stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
)
  15. type StatSummaryClient interface {
  16. GetNodeData() ([]*stats.Summary, error)
  17. }
  18. // NodeStatsSummaryClient is a client used to retrieve node and container stats summaries from a Kubernetes cluster,
  19. // via communicating with the kubelet API on each node.
  20. type NodeStatsSummaryClient struct {
  21. config *NodeClientConfig
  22. directClient *NodeHttpClient
  23. proxyClient *NodeHttpClient
  24. cache clustercache.ClusterCache
  25. endpoint string
  26. clusterHostUrl string
  27. bearerTokenFile string
  28. }
  29. // NewNodeStatsSummaryClient creates a new NodeStatsSummaryClient with the provided configuration and in-cluster config.
  30. func NewNodeStatsSummaryClient(cache clustercache.ClusterCache, config *NodeClientConfig, inClusterConfig *rest.Config) *NodeStatsSummaryClient {
  31. return &NodeStatsSummaryClient{
  32. config: config,
  33. directClient: NewNodeHttpClient(&http.Client{Transport: config.Transport}),
  34. proxyClient: NewNodeHttpClient(&http.Client{Transport: config.Transport}),
  35. cache: cache,
  36. endpoint: "stats/summary",
  37. clusterHostUrl: inClusterConfig.Host,
  38. bearerTokenFile: inClusterConfig.BearerTokenFile,
  39. }
  40. }
  41. // GetNodeData creates a number of goroutines that attempt to access a specified endpoint and return the
  42. // corresponding stats data in slice of interfaces which can be converted into a stricter format.
  43. func (nssc *NodeStatsSummaryClient) GetNodeData() ([]*stats.Summary, error) {
  44. var bearerToken string
  45. if !nssc.config.ProxyConfig.IsLocalProxy() {
  46. token, err := nssc.loadBearerToken()
  47. if err != nil {
  48. return nil, err
  49. }
  50. bearerToken = token
  51. }
  52. size := nssc.config.ConcurrentPollers
  53. nodes := getReadyNodes(nssc.cache)
  54. work := func(n *clustercache.Node) *stats.Summary {
  55. if n.SpecProviderID == "" {
  56. log.Warnf("node ProviderID not set, skipping for %s", n.Name)
  57. return nil
  58. }
  59. connections := nssc.connectionOptions(n)
  60. resp, err := requestNodeData(connections, nssc.endpoint, bearerToken)
  61. if err != nil {
  62. log.Warnf("error retrieving node data: %s", err)
  63. return nil
  64. }
  65. data, err := nodeResponseToStatSummary(resp)
  66. if err != nil {
  67. log.Warnf("error converting node data: %s", err)
  68. return nil
  69. }
  70. return data
  71. }
  72. return worker.ConcurrentCollectWith(size, work, nodes), nil
  73. }
  74. // connectionOptions returns the connection methods that are allowed for this node based on config
  75. // settings and cluster composition
  76. func (nssc *NodeStatsSummaryClient) connectionOptions(n *clustercache.Node) []*NodeHttpConnection {
  77. var connections []*NodeHttpConnection
  78. clusterHostURL := nssc.clusterHostUrl
  79. if nssc.config.ProxyConfig.IsLocalProxy() {
  80. clusterHostURL = nssc.config.ProxyConfig.LocalProxy
  81. }
  82. proxyFormatter := NewNodeProxyFormatter(clusterHostURL, n.Name)
  83. connections = append(connections, NewNodeHttpConnection(nssc.proxyClient, proxyFormatter))
  84. // Do not allow direct connection to fargate nodes
  85. if !nssc.config.ProxyConfig.ForceKubeProxy && !isFargateNode(n) {
  86. directFormatter, err := NewDirectNodeFormatterFrom(n)
  87. if err != nil {
  88. log.Warnf("error reaching direct node api %s", err)
  89. } else {
  90. connections = append(connections, NewNodeHttpConnection(nssc.directClient, directFormatter))
  91. }
  92. }
  93. return connections
  94. }
  95. // Note: These functions are client-independent and can be reused within another function
  96. // for a different datasource using the same config
  97. type nodeFetchData struct {
  98. nodeName string
  99. ClusterHostURL string
  100. }
  101. // requestNodeData fetches summary and container data for the node
  102. func requestNodeData(connections []*NodeHttpConnection, endpoint string, bearerToken string) (*http.Response, error) {
  103. var errs []error
  104. // Fail after trying all connections the alloted number of retries
  105. for _, connection := range connections {
  106. data, err := connection.AttemptEndPoint(http.MethodGet, endpoint, bearerToken)
  107. if err == nil {
  108. return data, err
  109. }
  110. // otherwise, append the error to the list
  111. errs = append(errs, fmt.Errorf("error retrieving node data from %s: %w", connection.formatter.FormatEndpoint(endpoint), err))
  112. }
  113. return nil, fmt.Errorf("problem getting node address: %v\n%w", endpoint, errors.Join(errs...))
  114. }
  115. // isFargateNode detects if it is a fargate node, disallowing direct connections
  116. func isFargateNode(n *clustercache.Node) bool {
  117. v := n.Labels["eks.amazonaws.com/compute-type"]
  118. if v == "fargate" {
  119. log.Warnf("Fargate node found: %s", n.Name)
  120. return true
  121. }
  122. return false
  123. }
  124. // getReadyNodes returns all nodes from a cache that have the ready status
  125. func getReadyNodes(cache clustercache.ClusterCache) []*clustercache.Node {
  126. nodes := cache.GetAllNodes()
  127. var readyNodes []*clustercache.Node
  128. for _, n := range nodes {
  129. nc := getNodeCondition(&n.Status, v1.NodeReady)
  130. if nc != nil && nc.Type == v1.NodeReady {
  131. readyNodes = append(readyNodes, n)
  132. }
  133. }
  134. if len(readyNodes) == 0 {
  135. log.Warnf("no ready nodes were found")
  136. return nil
  137. }
  138. numReadyNodes := len(readyNodes)
  139. numTotalNodes := len(nodes)
  140. if numReadyNodes != numTotalNodes {
  141. log.Warnf("%v out of %v were in a not ready state when retrieving nodes", numTotalNodes-numReadyNodes, numTotalNodes)
  142. }
  143. return readyNodes
  144. }
  145. // getNodeCondition extracts the provided condition from the given status and returns that, nil if not present.
  146. func getNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) *v1.NodeCondition {
  147. if status == nil {
  148. return nil
  149. }
  150. for i := range status.Conditions {
  151. if status.Conditions[i].Type == conditionType {
  152. return &status.Conditions[i]
  153. }
  154. }
  155. return nil
  156. }
  157. // NodeAddress returns the internal IP address and kubelet port of a given node
  158. func NodeAddress(node *clustercache.Node) (string, int32, error) {
  159. // adapted from k8s.io/kubernetes/pkg/util/node
  160. for _, addr := range node.Status.Addresses {
  161. if addr.Type == v1.NodeInternalIP {
  162. return addr.Address, node.Status.DaemonEndpoints.KubeletEndpoint.Port, nil
  163. }
  164. }
  165. return "", 0, fmt.Errorf("could not find internal IP address for node %s ", node.Name)
  166. }
  167. func nodeResponseToStatSummary(resp *http.Response) (*stats.Summary, error) {
  168. data := &stats.Summary{}
  169. err := json.NewDecoder(resp.Body).Decode(&data)
  170. if err == nil {
  171. return data, nil
  172. }
  173. return nil, err
  174. }
  175. // loadBearerToken reads the service account token
  176. func (nssc *NodeStatsSummaryClient) loadBearerToken() (string, error) {
  177. token, err := os.ReadFile(nssc.bearerTokenFile)
  178. if err != nil {
  179. return "", fmt.Errorf("could not read bearer token from file")
  180. }
  181. return string(token), nil
  182. }