nodestats.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package nodestats
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "os"
  9. "sync"
  10. "github.com/opencost/opencost/core/pkg/clustercache"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. "github.com/opencost/opencost/core/pkg/util/worker"
  13. v1 "k8s.io/api/core/v1"
  14. "k8s.io/client-go/rest"
  15. stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
  16. )
  17. type StatSummaryClient interface {
  18. GetNodeData() ([]*stats.Summary, error)
  19. }
  20. // NodeStatsSummaryClient is a client used to retrieve node and container stats summaries from a Kubernetes cluster,
  21. // via communicating with the kubelet API on each node.
  22. type NodeStatsSummaryClient struct {
  23. config *NodeClientConfig
  24. directClient *NodeHttpClient
  25. proxyClient *NodeHttpClient
  26. cache clustercache.ClusterCache
  27. endpoint string
  28. clusterHostUrl string
  29. bearerTokenFile string
  30. }
  31. // NewNodeStatsSummaryClient creates a new NodeStatsSummaryClient with the provided configuration and in-cluster config.
  32. func NewNodeStatsSummaryClient(cache clustercache.ClusterCache, config *NodeClientConfig, inClusterConfig *rest.Config) *NodeStatsSummaryClient {
  33. return &NodeStatsSummaryClient{
  34. config: config,
  35. directClient: NewNodeHttpClient(&http.Client{Transport: config.Transport}),
  36. proxyClient: NewNodeHttpClient(&http.Client{Transport: config.Transport}),
  37. cache: cache,
  38. endpoint: "stats/summary",
  39. clusterHostUrl: inClusterConfig.Host,
  40. bearerTokenFile: inClusterConfig.BearerTokenFile,
  41. }
  42. }
  43. // GetNodeData creates a number of goroutines that attempt to access a specified endpoint and return the
  44. // corresponding stats data in slice of interfaces which can be converted into a stricter format.
  45. func (nssc *NodeStatsSummaryClient) GetNodeData() ([]*stats.Summary, error) {
  46. var bearerToken string
  47. if !nssc.config.ProxyConfig.IsLocalProxy() {
  48. token, err := nssc.loadBearerToken()
  49. if err != nil {
  50. return nil, err
  51. }
  52. bearerToken = token
  53. }
  54. size := nssc.config.ConcurrentPollers
  55. nodes := getReadyNodes(nssc.cache)
  56. var errLock sync.Mutex
  57. var errs []error
  58. work := func(n *clustercache.Node) *stats.Summary {
  59. connections := nssc.connectionOptions(n)
  60. resp, err := requestNodeData(connections, nssc.endpoint, bearerToken)
  61. if err != nil {
  62. errLock.Lock()
  63. errs = append(errs, err)
  64. errLock.Unlock()
  65. log.Warnf("error retrieving node data: %s", err)
  66. return nil
  67. }
  68. data, err := nodeResponseToStatSummary(resp)
  69. if err != nil {
  70. errLock.Lock()
  71. errs = append(errs, err)
  72. errLock.Unlock()
  73. log.Warnf("error converting node data: %s", err)
  74. return nil
  75. }
  76. return data
  77. }
  78. results := worker.ConcurrentCollectWith(size, work, nodes)
  79. // no need to lock, as the concurrent collect blocks until all complete
  80. var err error = nil
  81. if len(errs) > 0 {
  82. err = errors.Join(errs...)
  83. }
  84. return results, err
  85. }
  86. // connectionOptions returns the connection methods that are allowed for this node based on config
  87. // settings and cluster composition
  88. func (nssc *NodeStatsSummaryClient) connectionOptions(n *clustercache.Node) []*NodeHttpConnection {
  89. var connections []*NodeHttpConnection
  90. clusterHostURL := nssc.clusterHostUrl
  91. if nssc.config.ProxyConfig.IsLocalProxy() {
  92. clusterHostURL = nssc.config.ProxyConfig.LocalProxy
  93. }
  94. proxyFormatter := NewNodeProxyFormatter(clusterHostURL, n.Name)
  95. connections = append(connections, NewNodeHttpConnection(nssc.proxyClient, proxyFormatter))
  96. // Do not allow direct connection to fargate nodes
  97. if !nssc.config.ProxyConfig.ForceKubeProxy && !isFargateNode(n) {
  98. directFormatter, err := NewDirectNodeFormatterFrom(n)
  99. if err != nil {
  100. log.Warnf("error reaching direct node api %s", err)
  101. } else {
  102. connections = append(connections, NewNodeHttpConnection(nssc.directClient, directFormatter))
  103. }
  104. }
  105. return connections
  106. }
  107. // Note: These functions are client-independent and can be reused within another function
  108. // for a different datasource using the same config
  109. type nodeFetchData struct {
  110. nodeName string
  111. ClusterHostURL string
  112. }
  113. // requestNodeData fetches summary and container data for the node
  114. func requestNodeData(connections []*NodeHttpConnection, endpoint string, bearerToken string) (*http.Response, error) {
  115. var errs []error
  116. // Fail after trying all connections the alloted number of retries
  117. for _, connection := range connections {
  118. data, err := connection.AttemptEndPoint(http.MethodGet, endpoint, bearerToken)
  119. if err == nil {
  120. return data, err
  121. }
  122. // otherwise, append the error to the list
  123. errs = append(errs, fmt.Errorf("error retrieving node data from %s: %w", connection.formatter.FormatEndpoint(endpoint), err))
  124. }
  125. return nil, fmt.Errorf("problem getting node address: %v\n%w", endpoint, errors.Join(errs...))
  126. }
  127. // isFargateNode detects if it is a fargate node, disallowing direct connections
  128. func isFargateNode(n *clustercache.Node) bool {
  129. v := n.Labels["eks.amazonaws.com/compute-type"]
  130. if v == "fargate" {
  131. log.Warnf("Fargate node found: %s", n.Name)
  132. return true
  133. }
  134. return false
  135. }
  136. // getReadyNodes returns all nodes from a cache that have the ready status
  137. func getReadyNodes(cache clustercache.ClusterCache) []*clustercache.Node {
  138. nodes := cache.GetAllNodes()
  139. var readyNodes []*clustercache.Node
  140. for _, n := range nodes {
  141. nc := getNodeCondition(&n.Status, v1.NodeReady)
  142. if nc != nil && nc.Type == v1.NodeReady {
  143. readyNodes = append(readyNodes, n)
  144. }
  145. }
  146. if len(readyNodes) == 0 {
  147. log.Warnf("no ready nodes were found")
  148. return nil
  149. }
  150. numReadyNodes := len(readyNodes)
  151. numTotalNodes := len(nodes)
  152. if numReadyNodes != numTotalNodes {
  153. log.Warnf("%v out of %v were in a not ready state when retrieving nodes", numTotalNodes-numReadyNodes, numTotalNodes)
  154. }
  155. return readyNodes
  156. }
  157. // getNodeCondition extracts the provided condition from the given status and returns that, nil if not present.
  158. func getNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) *v1.NodeCondition {
  159. if status == nil {
  160. return nil
  161. }
  162. for i := range status.Conditions {
  163. if status.Conditions[i].Type == conditionType {
  164. return &status.Conditions[i]
  165. }
  166. }
  167. return nil
  168. }
  169. // NodeAddress returns the internal IP address and kubelet port of a given node
  170. func NodeAddress(node *clustercache.Node) (string, int32, error) {
  171. // adapted from k8s.io/kubernetes/pkg/util/node
  172. for _, addr := range node.Status.Addresses {
  173. if addr.Type == v1.NodeInternalIP {
  174. return addr.Address, node.Status.DaemonEndpoints.KubeletEndpoint.Port, nil
  175. }
  176. }
  177. return "", 0, fmt.Errorf("could not find internal IP address for node %s ", node.Name)
  178. }
  179. func nodeResponseToStatSummary(resp *http.Response) (*stats.Summary, error) {
  180. if resp == nil || resp.Body == nil {
  181. return nil, fmt.Errorf("response or response body is nil")
  182. }
  183. defer resp.Body.Close()
  184. data := &stats.Summary{}
  185. bytes, err := io.ReadAll(resp.Body)
  186. if err != nil {
  187. return nil, fmt.Errorf("could not read response body: %w", err)
  188. }
  189. err = json.Unmarshal(bytes, data)
  190. if err != nil {
  191. return nil, fmt.Errorf("could not unmarshal response body: %w", err)
  192. }
  193. return data, nil
  194. }
  195. // loadBearerToken reads the service account token
  196. func (nssc *NodeStatsSummaryClient) loadBearerToken() (string, error) {
  197. token, err := os.ReadFile(nssc.bearerTokenFile)
  198. if err != nil {
  199. return "", fmt.Errorf("could not read bearer token from file")
  200. }
  201. return string(token), nil
  202. }