server.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265
  1. package mcp
  import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/go-playground/validator/v10"
	"github.com/opencost/opencost/core/pkg/filter"
	"github.com/opencost/opencost/core/pkg/filter/allocation"
	cloudcostfilter "github.com/opencost/opencost/core/pkg/filter/cloudcost"
	"github.com/opencost/opencost/core/pkg/log"
	"github.com/opencost/opencost/core/pkg/opencost"
	models "github.com/opencost/opencost/pkg/cloud/models"
	"github.com/opencost/opencost/pkg/cloudcost"
	"github.com/opencost/opencost/pkg/costmodel"
	"github.com/opencost/opencost/pkg/env"
)
  // QueryType identifies which kind of OpenCost data an MCP query targets.
type QueryType string

// Supported QueryType values. Keep them in sync with the `oneof` list in the
// validate tag of OpenCostQueryRequest.QueryType.
const (
	AllocationQueryType QueryType = "allocation"
	AssetQueryType      QueryType = "asset"
	CloudCostQueryType  QueryType = "cloudcost"
	EfficiencyQueryType QueryType = "efficiency"
)

// Efficiency calculation constants.
const (
	efficiencyBufferMultiplier = 1.2         // 20% headroom for stability
	efficiencyMinCPU           = 0.001       // minimum CPU cores
	efficiencyMinRAM           = 1024 * 1024 // 1 MiB minimum RAM, in bytes
)

// MCPRequest represents a single turn in a conversation with the OpenCost MCP server.
type MCPRequest struct {
	SessionID string `json:"sessionId"`
	// Query is mandatory. The `required` tag makes struct validation reject a
	// request without one, instead of letting a nil Query through.
	Query *OpenCostQueryRequest `json:"query" validate:"required"`
}

// MCPResponse is the response from the OpenCost MCP server for a single turn.
type MCPResponse struct {
	// Data is the payload produced by the handler for the requested QueryType
	// (for example *AllocationResponse for allocation queries).
	Data      interface{}   `json:"data"`
	QueryInfo QueryMetadata `json:"queryInfo"`
}

// QueryMetadata contains metadata about the query execution.
type QueryMetadata struct {
	QueryID   string    `json:"queryId"`
	Timestamp time.Time `json:"timestamp"`
	// ProcessingTime is a time.Duration, so encoding/json emits it as an
	// integer number of nanoseconds.
	ProcessingTime time.Duration `json:"processingTime"`
}

// OpenCostQueryRequest provides a unified interface for all OpenCost query
// types. QueryType selects the handler. The *Params field that matches
// QueryType carries optional, type-specific options; a nil params field
// means "use the defaults".
type OpenCostQueryRequest struct {
	QueryType QueryType `json:"queryType" validate:"required,oneof=allocation asset cloudcost efficiency"`
	// Window is the time range to query, parsed with
	// opencost.ParseWindowWithOffset using a zero (UTC) offset.
	Window           string           `json:"window" validate:"required"`
	AllocationParams *AllocationQuery `json:"allocationParams,omitempty"`
	AssetParams      *AssetQuery      `json:"assetParams,omitempty"`
	CloudCostParams  *CloudCostQuery  `json:"cloudCostParams,omitempty"`
	EfficiencyParams *EfficiencyQuery `json:"efficiencyParams,omitempty"`
}

// AllocationQuery contains the parameters for an allocation query. Every
// field is optional.
type AllocationQuery struct {
	// Step is the query resolution. A non-positive value means one step for
	// the whole window. As a time.Duration it is encoded in JSON as integer
	// nanoseconds.
	Step       time.Duration `json:"step,omitempty"`
	Accumulate bool          `json:"accumulate,omitempty"`
	ShareIdle  bool          `json:"shareIdle,omitempty"`
	// Aggregate is a comma-separated list of aggregation properties.
	Aggregate                             string `json:"aggregate,omitempty"`
	IncludeIdle                           bool   `json:"includeIdle,omitempty"`
	IdleByNode                            bool   `json:"idleByNode,omitempty"`
	IncludeProportionalAssetResourceCosts bool   `json:"includeProportionalAssetResourceCosts,omitempty"`
	IncludeAggregatedMetadata             bool   `json:"includeAggregatedMetadata,omitempty"`
	ShareLB                               bool   `json:"sharelb,omitempty"`
	Filter                                string `json:"filter,omitempty"` // Filter expression for allocations (e.g., "cluster:production", "namespace:kube-system")
}

// AssetQuery contains the parameters for an asset query. It is currently
// empty, because asset queries take only the request's Window.
type AssetQuery struct {
}

// CloudCostQuery contains the parameters for a cloud cost query.
type CloudCostQuery struct {
	Aggregate  string `json:"aggregate,omitempty"`  // Comma-separated list of aggregation properties
	Accumulate string `json:"accumulate,omitempty"` // e.g., "week", "day", "month"
	Filter     string `json:"filter,omitempty"`     // Filter expression for cloud costs
	Provider   string `json:"provider,omitempty"`   // Cloud provider filter (aws, gcp, azure, etc.)
	Service    string `json:"service,omitempty"`    // Service filter (ec2, s3, compute, etc.)
	Category   string `json:"category,omitempty"`   // Category filter (compute, storage, network, etc.)
	Region     string `json:"region,omitempty"`     // Region filter
	// Additional explicit fields for filtering
	AccountID       string            `json:"accountID,omitempty"`       // Alias of Account; maps to accountID
	InvoiceEntityID string            `json:"invoiceEntityID,omitempty"` // Invoice entity ID filter
	ProviderID      string            `json:"providerID,omitempty"`      // Cloud provider resource ID filter
	Labels          map[string]string `json:"labels,omitempty"`          // Label filters (key->value)
}

// EfficiencyQuery contains the parameters for an efficiency query.
type EfficiencyQuery struct {
	// Step is the query step size. It controls peak memory by batching large
	// windows (default: auto-scaled based on window). As a time.Duration it is
	// encoded in JSON as integer nanoseconds.
	Step                       time.Duration `json:"step,omitempty"`
	Aggregate                  string        `json:"aggregate,omitempty"`                  // Aggregation properties (e.g., "pod", "namespace", "controller")
	Filter                     string        `json:"filter,omitempty"`                     // Filter expression for allocations (same as AllocationQuery)
	EfficiencyBufferMultiplier *float64      `json:"efficiencyBufferMultiplier,omitempty"` // Buffer multiplier for recommendations (default: 1.2 for 20% headroom)
}
  // AllocationResponse represents the allocation data returned to the AI agent.
type AllocationResponse struct {
	// Allocations maps an allocation-set name to its set.
	Allocations map[string]*AllocationSet `json:"allocations"`
}

// AllocationSet is a named group of allocations.
type AllocationSet struct {
	// Name is the name of the allocation set.
	Name        string            `json:"name"`
	Properties  map[string]string `json:"properties"`
	Allocations []*Allocation     `json:"allocations"`
}

// TotalCost returns the sum of TotalCost over every allocation in the set.
// A nil set and nil entries contribute zero rather than panicking, because
// AllocationSet is exported and may be built outside this package's
// transforms.
func (as *AllocationSet) TotalCost() float64 {
	if as == nil {
		return 0
	}
	var total float64
	for _, alloc := range as.Allocations {
		if alloc == nil {
			continue
		}
		total += alloc.TotalCost
	}
	return total
}

// Allocation represents a single allocation data point.
type Allocation struct {
	Name         string    `json:"name"`         // Allocation key (namespace, cluster, etc.)
	CPUCost      float64   `json:"cpuCost"`      // Cost of CPU usage
	GPUCost      float64   `json:"gpuCost"`      // Cost of GPU usage
	RAMCost      float64   `json:"ramCost"`      // Cost of memory usage
	PVCost       float64   `json:"pvCost"`       // Cost of persistent volumes
	NetworkCost  float64   `json:"networkCost"`  // Cost of network usage
	SharedCost   float64   `json:"sharedCost"`   // Shared/unallocated costs assigned here
	ExternalCost float64   `json:"externalCost"` // External costs (cloud services, etc.)
	TotalCost    float64   `json:"totalCost"`    // Sum of all costs above
	CPUCoreHours float64   `json:"cpuCoreHours"` // Usage metrics: CPU core-hours
	RAMByteHours float64   `json:"ramByteHours"` // Usage metrics: RAM byte-hours
	GPUHours     float64   `json:"gpuHours"`     // Usage metrics: GPU-hours
	PVByteHours  float64   `json:"pvByteHours"`  // Usage metrics: PV byte-hours
	Start        time.Time `json:"start"`        // Start timestamp for this allocation
	End          time.Time `json:"end"`          // End timestamp for this allocation
}
  // Response types for asset queries. Each type mirrors the JSON shape the MCP
// server returns to the AI agent.
type (
	// AssetResponse is the asset payload returned to the AI agent, keyed by
	// asset-set name.
	AssetResponse struct {
		Assets map[string]*AssetSet `json:"assets"`
	}

	// AssetSet is a named collection of assets.
	AssetSet struct {
		Name   string   `json:"name"`
		Assets []*Asset `json:"assets"`
	}

	// Asset is one asset data point. The leading fields are shared by every
	// asset type. The grouped fields that follow belong to particular asset
	// types, and they are omitted from JSON when left at their zero value.
	Asset struct {
		Type       string            `json:"type"`
		Properties AssetProperties   `json:"properties"`
		Labels     map[string]string `json:"labels,omitempty"`
		Start      time.Time         `json:"start"`
		End        time.Time         `json:"end"`
		Minutes    float64           `json:"minutes"`
		Adjustment float64           `json:"adjustment"`
		TotalCost  float64           `json:"totalCost"`

		// Disk fields.
		ByteHours      float64  `json:"byteHours,omitempty"`
		ByteHoursUsed  *float64 `json:"byteHoursUsed,omitempty"`
		ByteUsageMax   *float64 `json:"byteUsageMax,omitempty"`
		StorageClass   string   `json:"storageClass,omitempty"`
		VolumeName     string   `json:"volumeName,omitempty"`
		ClaimName      string   `json:"claimName,omitempty"`
		ClaimNamespace string   `json:"claimNamespace,omitempty"`
		Local          float64  `json:"local,omitempty"`

		// Node fields.
		NodeType     string  `json:"nodeType,omitempty"`
		CPUCoreHours float64 `json:"cpuCoreHours,omitempty"`
		RAMByteHours float64 `json:"ramByteHours,omitempty"`
		GPUHours     float64 `json:"gpuHours,omitempty"`
		GPUCount     float64 `json:"gpuCount,omitempty"`
		CPUCost      float64 `json:"cpuCost,omitempty"`
		GPUCost      float64 `json:"gpuCost,omitempty"`
		RAMCost      float64 `json:"ramCost,omitempty"`
		Discount     float64 `json:"discount,omitempty"`
		Preemptible  float64 `json:"preemptible,omitempty"`

		// Usage breakdowns, usable by more than one asset type.
		Breakdown    *AssetBreakdown `json:"breakdown,omitempty"`
		CPUBreakdown *AssetBreakdown `json:"cpuBreakdown,omitempty"`
		RAMBreakdown *AssetBreakdown `json:"ramBreakdown,omitempty"`

		// Node overhead.
		Overhead *NodeOverhead `json:"overhead,omitempty"`

		// Load balancer fields.
		Private bool   `json:"private,omitempty"`
		Ip      string `json:"ip,omitempty"`

		// Cloud fields.
		Credit float64 `json:"credit,omitempty"`
	}

	// NodeOverhead carries the overhead fractions reported for a node.
	NodeOverhead struct {
		RamOverheadFraction  float64 `json:"ramOverheadFraction"`
		CpuOverheadFraction  float64 `json:"cpuOverheadFraction"`
		OverheadCostFraction float64 `json:"overheadCostFraction"`
	}

	// AssetProperties identifies an asset: its category, provider, account,
	// project, service, cluster, name and provider-side ID. Empty values are
	// omitted from JSON.
	AssetProperties struct {
		Category   string `json:"category,omitempty"`
		Provider   string `json:"provider,omitempty"`
		Account    string `json:"account,omitempty"`
		Project    string `json:"project,omitempty"`
		Service    string `json:"service,omitempty"`
		Cluster    string `json:"cluster,omitempty"`
		Name       string `json:"name,omitempty"`
		ProviderID string `json:"providerID,omitempty"`
	}

	// AssetBreakdown splits an asset's usage into idle, other, system and user
	// components.
	AssetBreakdown struct {
		Idle   float64 `json:"idle"`
		Other  float64 `json:"other"`
		System float64 `json:"system"`
		User   float64 `json:"user"`
	}
)
  // Response types for cloud cost queries. Each type mirrors the JSON shape the
// MCP server returns to the AI agent.
type (
	// CloudCostResponse is the cloud cost payload returned to the AI agent:
	// the cost sets keyed by name, plus an optional summary.
	CloudCostResponse struct {
		CloudCosts map[string]*CloudCostSet `json:"cloudCosts"`
		Summary    *CloudCostSummary        `json:"summary,omitempty"`
	}

	// CloudCostSummary aggregates cost totals and per-provider, per-service
	// and per-region breakdowns. Breakdown maps are omitted when empty.
	CloudCostSummary struct {
		TotalNetCost       float64            `json:"totalNetCost"`
		TotalAmortizedCost float64            `json:"totalAmortizedCost"`
		TotalInvoicedCost  float64            `json:"totalInvoicedCost"`
		KubernetesPercent  float64            `json:"kubernetesPercent"`
		ProviderBreakdown  map[string]float64 `json:"providerBreakdown,omitempty"`
		ServiceBreakdown   map[string]float64 `json:"serviceBreakdown,omitempty"`
		RegionBreakdown    map[string]float64 `json:"regionBreakdown,omitempty"`
	}

	// CloudCostSet is a named group of cloud cost items, with the aggregation
	// properties and time window it was built from.
	CloudCostSet struct {
		Name                  string       `json:"name"`
		CloudCosts            []*CloudCost `json:"cloudCosts"`
		AggregationProperties []string     `json:"aggregationProperties,omitempty"`
		Window                *TimeWindow  `json:"window,omitempty"`
	}

	// TimeWindow is a time range from Start to End.
	TimeWindow struct {
		Start time.Time `json:"start"`
		End   time.Time `json:"end"`
	}

	// CloudCostProperties describes where a cloud cost item comes from.
	// Empty values are omitted from JSON.
	CloudCostProperties struct {
		ProviderID        string            `json:"providerID,omitempty"`
		Provider          string            `json:"provider,omitempty"`
		AccountID         string            `json:"accountID,omitempty"`
		AccountName       string            `json:"accountName,omitempty"`
		InvoiceEntityID   string            `json:"invoiceEntityID,omitempty"`
		InvoiceEntityName string            `json:"invoiceEntityName,omitempty"`
		RegionID          string            `json:"regionID,omitempty"`
		AvailabilityZone  string            `json:"availabilityZone,omitempty"`
		Service           string            `json:"service,omitempty"`
		Category          string            `json:"category,omitempty"`
		Labels            map[string]string `json:"labels,omitempty"`
	}

	// CloudCost is one cloud cost data point, reported under several cost
	// metrics.
	CloudCost struct {
		Properties       CloudCostProperties `json:"properties"`
		Window           TimeWindow          `json:"window"`
		ListCost         CostMetric          `json:"listCost"`
		NetCost          CostMetric          `json:"netCost"`
		AmortizedNetCost CostMetric          `json:"amortizedNetCost"`
		InvoicedCost     CostMetric          `json:"invoicedCost"`
		AmortizedCost    CostMetric          `json:"amortizedCost"`
	}

	// CostMetric pairs a cost value with its Kubernetes percentage.
	CostMetric struct {
		Cost              float64 `json:"cost"`
		KubernetesPercent float64 `json:"kubernetesPercent"`
	}
)
  // Response types for efficiency queries.
type (
	// EfficiencyResponse is the efficiency payload returned to the AI agent.
	EfficiencyResponse struct {
		Efficiencies []*EfficiencyMetric `json:"efficiencies"`
	}

	// EfficiencyMetric holds efficiency figures and right-sizing
	// recommendations for one pod/workload (or aggregate thereof).
	EfficiencyMetric struct {
		// Name of the pod, namespace or controller, depending on aggregation.
		Name string `json:"name"`

		// Current efficiency as a usage/request ratio (0-1+).
		CPUEfficiency    float64 `json:"cpuEfficiency"`
		MemoryEfficiency float64 `json:"memoryEfficiency"`

		// Current requests and observed usage.
		CPUCoresRequested float64 `json:"cpuCoresRequested"`
		CPUCoresUsed      float64 `json:"cpuCoresUsed"`
		RAMBytesRequested float64 `json:"ramBytesRequested"`
		RAMBytesUsed      float64 `json:"ramBytesUsed"`

		// Recommended requests: CPU in cores, RAM in bytes, derived from
		// actual usage plus a buffer.
		RecommendedCPURequest float64 `json:"recommendedCpuRequest"`
		RecommendedRAMRequest float64 `json:"recommendedRamRequest"`

		// Efficiency expected once the recommendations are applied.
		ResultingCPUEfficiency    float64 `json:"resultingCpuEfficiency"`
		ResultingMemoryEfficiency float64 `json:"resultingMemoryEfficiency"`

		// Cost analysis: current cost, estimated cost with the
		// recommendations, and the resulting savings (absolute and percent).
		CurrentTotalCost   float64 `json:"currentTotalCost"`
		RecommendedCost    float64 `json:"recommendedCost"`
		CostSavings        float64 `json:"costSavings"`
		CostSavingsPercent float64 `json:"costSavingsPercent"`

		// Buffer multiplier applied to the recommendations (e.g. 1.2 for
		// 20% headroom).
		EfficiencyBufferMultiplier float64 `json:"efficiencyBufferMultiplier"`

		// Time window covered by this metric.
		Start time.Time `json:"start"`
		End   time.Time `json:"end"`
	}
)
  307. // MCPServer holds the dependencies for the MCP API server.
  308. type MCPServer struct {
  309. costModel *costmodel.CostModel
  310. provider models.Provider
  311. cloudQuerier cloudcost.Querier
  312. }
  313. // NewMCPServer creates a new MCP Server.
  314. func NewMCPServer(costModel *costmodel.CostModel, provider models.Provider, cloudQuerier cloudcost.Querier) *MCPServer {
  315. return &MCPServer{
  316. costModel: costModel,
  317. provider: provider,
  318. cloudQuerier: cloudQuerier,
  319. }
  320. }
  321. // ProcessMCPRequest processes an MCP request and returns an MCP response.
  322. // It accepts a context for proper timeout handling and cancellation.
  323. func (s *MCPServer) ProcessMCPRequest(ctx context.Context, request *MCPRequest) (*MCPResponse, error) {
  324. // 1. Validate Request
  325. if err := validate.Struct(request); err != nil {
  326. return nil, fmt.Errorf("validation failed: %w", err)
  327. }
  328. // 2. Query Dispatching
  329. var data interface{}
  330. var err error
  331. queryStart := time.Now()
  332. switch request.Query.QueryType {
  333. case AllocationQueryType:
  334. data, err = s.QueryAllocations(request.Query)
  335. case AssetQueryType:
  336. data, err = s.QueryAssets(request.Query)
  337. case CloudCostQueryType:
  338. data, err = s.QueryCloudCosts(ctx, request.Query)
  339. case EfficiencyQueryType:
  340. data, err = s.QueryEfficiency(request.Query)
  341. default:
  342. return nil, fmt.Errorf("unsupported query type: %s", request.Query.QueryType)
  343. }
  344. if err != nil {
  345. // Handle error appropriately, maybe return a JSON-RPC error response
  346. return nil, err
  347. }
  348. processingTime := time.Since(queryStart)
  349. // 3. Construct Final Response
  350. mcpResponse := &MCPResponse{
  351. Data: data,
  352. QueryInfo: QueryMetadata{
  353. QueryID: generateQueryID(),
  354. Timestamp: time.Now(),
  355. ProcessingTime: processingTime,
  356. },
  357. }
  358. return mcpResponse, nil
  359. }
  360. // validate is the singleton validator instance.
  361. var validate = validator.New()
  // generateQueryID returns an identifier of the form "query-<16 hex digits>",
// built from 8 bytes of crypto/rand output. If the system random source
// fails, it falls back to "query-<unix nanoseconds>" so that a response can
// still be labelled.
func generateQueryID() string {
	var raw [8]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return fmt.Sprintf("query-%d", time.Now().UnixNano())
	}
	return "query-" + hex.EncodeToString(raw[:])
}
  370. func (s *MCPServer) QueryAllocations(query *OpenCostQueryRequest) (*AllocationResponse, error) {
  371. // 1. Parse Window
  372. window, err := opencost.ParseWindowWithOffset(query.Window, 0) // 0 offset for UTC
  373. if err != nil {
  374. return nil, fmt.Errorf("failed to parse window '%s': %w", query.Window, err)
  375. }
  376. // 2. Set default parameters
  377. var step time.Duration
  378. var aggregateBy []string
  379. var includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer, shareIdle bool
  380. var accumulateBy opencost.AccumulateOption
  381. var filterString string
  382. // 3. Parse allocation parameters if provided
  383. if query.AllocationParams != nil {
  384. // Set step duration (default to window duration if not specified)
  385. if query.AllocationParams.Step > 0 {
  386. step = query.AllocationParams.Step
  387. } else {
  388. step = window.Duration()
  389. }
  390. // Parse aggregation properties
  391. if query.AllocationParams.Aggregate != "" {
  392. aggregateBy = strings.Split(query.AllocationParams.Aggregate, ",")
  393. }
  394. // Set boolean parameters
  395. includeIdle = query.AllocationParams.IncludeIdle
  396. idleByNode = query.AllocationParams.IdleByNode
  397. includeProportionalAssetResourceCosts = query.AllocationParams.IncludeProportionalAssetResourceCosts
  398. includeAggregatedMetadata = query.AllocationParams.IncludeAggregatedMetadata
  399. sharedLoadBalancer = query.AllocationParams.ShareLB
  400. shareIdle = query.AllocationParams.ShareIdle
  401. // Set filter string
  402. filterString = query.AllocationParams.Filter
  403. // Validate filter string if provided
  404. if filterString != "" {
  405. parser := allocation.NewAllocationFilterParser()
  406. _, err := parser.Parse(filterString)
  407. if err != nil {
  408. return nil, fmt.Errorf("invalid allocation filter '%s': %w", filterString, err)
  409. }
  410. }
  411. // Set accumulation option
  412. if query.AllocationParams.Accumulate {
  413. accumulateBy = opencost.AccumulateOptionAll
  414. } else {
  415. accumulateBy = opencost.AccumulateOptionNone
  416. }
  417. } else {
  418. // Default values when no parameters provided
  419. step = window.Duration()
  420. accumulateBy = opencost.AccumulateOptionNone
  421. filterString = ""
  422. }
  423. // 4. Call the existing QueryAllocation function with all parameters
  424. asr, err := s.costModel.QueryAllocation(
  425. window,
  426. step,
  427. aggregateBy,
  428. includeIdle,
  429. idleByNode,
  430. includeProportionalAssetResourceCosts,
  431. includeAggregatedMetadata,
  432. sharedLoadBalancer,
  433. accumulateBy,
  434. shareIdle,
  435. filterString,
  436. )
  437. if err != nil {
  438. return nil, fmt.Errorf("failed to query allocations: %w", err)
  439. }
  440. // 5. Handle the AllocationSetRange result
  441. if asr == nil || len(asr.Allocations) == 0 {
  442. return &AllocationResponse{
  443. Allocations: make(map[string]*AllocationSet),
  444. }, nil
  445. }
  446. // 6. Transform the result to MCP format
  447. // If we have multiple sets, we'll combine them or return the first one
  448. // For now, let's return the first allocation set
  449. firstSet := asr.Allocations[0]
  450. return transformAllocationSet(firstSet), nil
  451. }
  452. // transformAllocationSet converts an opencost.AllocationSet into the MCP's AllocationResponse format.
  453. func transformAllocationSet(allocSet *opencost.AllocationSet) *AllocationResponse {
  454. if allocSet == nil {
  455. return &AllocationResponse{Allocations: make(map[string]*AllocationSet)}
  456. }
  457. mcpAllocations := make(map[string]*AllocationSet)
  458. // Create a single set for all allocations
  459. mcpSet := &AllocationSet{
  460. Name: "allocations",
  461. Allocations: []*Allocation{},
  462. }
  463. // Convert each allocation
  464. for _, alloc := range allocSet.Allocations {
  465. if alloc == nil {
  466. continue
  467. }
  468. mcpAlloc := &Allocation{
  469. Name: alloc.Name,
  470. CPUCost: alloc.CPUCost,
  471. GPUCost: alloc.GPUCost,
  472. RAMCost: alloc.RAMCost,
  473. PVCost: alloc.PVCost(), // Call the method
  474. NetworkCost: alloc.NetworkCost,
  475. SharedCost: alloc.SharedCost,
  476. ExternalCost: alloc.ExternalCost,
  477. TotalCost: alloc.TotalCost(),
  478. CPUCoreHours: alloc.CPUCoreHours,
  479. RAMByteHours: alloc.RAMByteHours,
  480. GPUHours: alloc.GPUHours,
  481. PVByteHours: alloc.PVBytes(), // Use the method directly
  482. Start: alloc.Start,
  483. End: alloc.End,
  484. }
  485. mcpSet.Allocations = append(mcpSet.Allocations, mcpAlloc)
  486. }
  487. mcpAllocations["allocations"] = mcpSet
  488. return &AllocationResponse{
  489. Allocations: mcpAllocations,
  490. }
  491. }
  492. func (s *MCPServer) QueryAssets(query *OpenCostQueryRequest) (*AssetResponse, error) {
  493. // 1. Parse Window
  494. window, err := opencost.ParseWindowWithOffset(query.Window, 0) // 0 offset for UTC
  495. if err != nil {
  496. return nil, fmt.Errorf("failed to parse window '%s': %w", query.Window, err)
  497. }
  498. // 2. Set Query Options
  499. start := *window.Start()
  500. end := *window.End()
  501. // 3. Call CostModel to get the asset set
  502. assetSet, err := s.costModel.ComputeAssets(start, end)
  503. if err != nil {
  504. return nil, fmt.Errorf("failed to compute assets: %w", err)
  505. }
  506. // 4. Transform Response for the MCP API
  507. return transformAssetSet(assetSet), nil
  508. }
  509. // transformAssetSet converts a opencost.AssetSet into the MCP's AssetResponse format.
  510. func transformAssetSet(assetSet *opencost.AssetSet) *AssetResponse {
  511. if assetSet == nil {
  512. return &AssetResponse{Assets: make(map[string]*AssetSet)}
  513. }
  514. mcpAssets := make(map[string]*AssetSet)
  515. // Create a single set for all assets
  516. mcpSet := &AssetSet{
  517. Name: "assets",
  518. Assets: []*Asset{},
  519. }
  520. for _, asset := range assetSet.Assets {
  521. if asset == nil {
  522. continue
  523. }
  524. properties := asset.GetProperties()
  525. labels := asset.GetLabels()
  526. mcpAsset := &Asset{
  527. Type: asset.Type().String(),
  528. Properties: AssetProperties{
  529. Category: properties.Category,
  530. Provider: properties.Provider,
  531. Account: properties.Account,
  532. Project: properties.Project,
  533. Service: properties.Service,
  534. Cluster: properties.Cluster,
  535. Name: properties.Name,
  536. ProviderID: properties.ProviderID,
  537. },
  538. Labels: labels,
  539. Start: asset.GetStart(),
  540. End: asset.GetEnd(),
  541. Minutes: asset.Minutes(),
  542. Adjustment: asset.GetAdjustment(),
  543. TotalCost: asset.TotalCost(),
  544. }
  545. // Handle type-specific fields
  546. switch a := asset.(type) {
  547. case *opencost.Disk:
  548. mcpAsset.ByteHours = a.ByteHours
  549. mcpAsset.ByteHoursUsed = a.ByteHoursUsed
  550. mcpAsset.ByteUsageMax = a.ByteUsageMax
  551. mcpAsset.StorageClass = a.StorageClass
  552. mcpAsset.VolumeName = a.VolumeName
  553. mcpAsset.ClaimName = a.ClaimName
  554. mcpAsset.ClaimNamespace = a.ClaimNamespace
  555. mcpAsset.Local = a.Local
  556. if a.Breakdown != nil {
  557. mcpAsset.Breakdown = &AssetBreakdown{
  558. Idle: a.Breakdown.Idle,
  559. Other: a.Breakdown.Other,
  560. System: a.Breakdown.System,
  561. User: a.Breakdown.User,
  562. }
  563. }
  564. case *opencost.Node:
  565. mcpAsset.NodeType = a.NodeType
  566. mcpAsset.CPUCoreHours = a.CPUCoreHours
  567. mcpAsset.RAMByteHours = a.RAMByteHours
  568. mcpAsset.GPUHours = a.GPUHours
  569. mcpAsset.GPUCount = a.GPUCount
  570. mcpAsset.CPUCost = a.CPUCost
  571. mcpAsset.GPUCost = a.GPUCost
  572. mcpAsset.RAMCost = a.RAMCost
  573. mcpAsset.Discount = a.Discount
  574. mcpAsset.Preemptible = a.Preemptible
  575. if a.CPUBreakdown != nil {
  576. mcpAsset.CPUBreakdown = &AssetBreakdown{
  577. Idle: a.CPUBreakdown.Idle,
  578. Other: a.CPUBreakdown.Other,
  579. System: a.CPUBreakdown.System,
  580. User: a.CPUBreakdown.User,
  581. }
  582. }
  583. if a.RAMBreakdown != nil {
  584. mcpAsset.RAMBreakdown = &AssetBreakdown{
  585. Idle: a.RAMBreakdown.Idle,
  586. Other: a.RAMBreakdown.Other,
  587. System: a.RAMBreakdown.System,
  588. User: a.RAMBreakdown.User,
  589. }
  590. }
  591. if a.Overhead != nil {
  592. mcpAsset.Overhead = &NodeOverhead{
  593. RamOverheadFraction: a.Overhead.RamOverheadFraction,
  594. CpuOverheadFraction: a.Overhead.CpuOverheadFraction,
  595. OverheadCostFraction: a.Overhead.OverheadCostFraction,
  596. }
  597. }
  598. case *opencost.LoadBalancer:
  599. mcpAsset.Private = a.Private
  600. mcpAsset.Ip = a.Ip
  601. case *opencost.Network:
  602. // Network assets have no specific fields beyond the base asset structure
  603. // All relevant data is in Properties, Labels, Cost, etc.
  604. case *opencost.Cloud:
  605. mcpAsset.Credit = a.Credit
  606. case *opencost.ClusterManagement:
  607. // ClusterManagement assets have no specific fields beyond the base asset structure
  608. // All relevant data is in Properties, Labels, Cost, etc.
  609. }
  610. mcpSet.Assets = append(mcpSet.Assets, mcpAsset)
  611. }
  612. mcpAssets["assets"] = mcpSet
  613. return &AssetResponse{
  614. Assets: mcpAssets,
  615. }
  616. }
  617. // QueryCloudCosts translates an MCP query into a CloudCost repository query and transforms the result.
  618. // The ctx parameter is used for timeout and cancellation handling of the cloud cost query.
  619. func (s *MCPServer) QueryCloudCosts(ctx context.Context, query *OpenCostQueryRequest) (*CloudCostResponse, error) {
  620. // 1. Check if cloud cost querier is available
  621. if s.cloudQuerier == nil {
  622. return nil, fmt.Errorf("cloud cost querier not configured - check cloud-integration.json file")
  623. }
  624. // 2. Parse Window
  625. window, err := opencost.ParseWindowWithOffset(query.Window, 0) // 0 offset for UTC
  626. if err != nil {
  627. return nil, fmt.Errorf("failed to parse window '%s': %w", query.Window, err)
  628. }
  629. // 3. Build query request
  630. request := cloudcost.QueryRequest{
  631. Start: *window.Start(),
  632. End: *window.End(),
  633. Filter: nil, // Will be set from CloudCostParams if provided
  634. }
  635. // 4. Apply filtering and aggregation from CloudCostParams
  636. if query.CloudCostParams != nil {
  637. request = s.buildCloudCostQueryRequest(request, query.CloudCostParams)
  638. }
  639. // 5. Create a timeout context for the query with configured timeout
  640. queryTimeout := env.GetMCPQueryTimeout()
  641. queryCtx, cancel := context.WithTimeout(ctx, queryTimeout)
  642. defer cancel()
  643. // 6. Query the repository (this handles multiple cloud providers automatically)
  644. ccsr, err := s.cloudQuerier.Query(queryCtx, request)
  645. if err != nil {
  646. return nil, fmt.Errorf("failed to query cloud costs: %w", err)
  647. }
  648. // 7. Transform Response
  649. return transformCloudCostSetRange(ccsr), nil
  650. }
  651. // buildCloudCostQueryRequest builds a QueryRequest from CloudCostParams
  652. func (s *MCPServer) buildCloudCostQueryRequest(request cloudcost.QueryRequest, params *CloudCostQuery) cloudcost.QueryRequest {
  653. // Set aggregation
  654. if params.Aggregate != "" {
  655. aggregateBy := strings.Split(params.Aggregate, ",")
  656. request.AggregateBy = aggregateBy
  657. }
  658. // Set accumulation
  659. if params.Accumulate != "" {
  660. request.Accumulate = opencost.ParseAccumulate(params.Accumulate)
  661. }
  662. // Build filter from individual parameters or filter string
  663. var filter filter.Filter
  664. var err error
  665. if params.Filter != "" {
  666. // Parse the filter string directly
  667. parser := cloudcostfilter.NewCloudCostFilterParser()
  668. filter, err = parser.Parse(params.Filter)
  669. if err != nil {
  670. // Log error but continue without filter rather than failing the entire request
  671. log.Warnf("failed to parse filter string '%s': %v", params.Filter, err)
  672. }
  673. } else {
  674. // Build filter from individual parameters
  675. filter = s.buildFilterFromParams(params)
  676. }
  677. request.Filter = filter
  678. return request
  679. }
  680. // buildFilterFromParams creates a filter from individual CloudCostQuery parameters
  681. func (s *MCPServer) buildFilterFromParams(params *CloudCostQuery) filter.Filter {
  682. var filterParts []string
  683. // Add provider filter
  684. if params.Provider != "" {
  685. filterParts = append(filterParts, fmt.Sprintf(`provider:"%s"`, params.Provider))
  686. }
  687. // Add providerID filter
  688. if params.ProviderID != "" {
  689. filterParts = append(filterParts, fmt.Sprintf(`providerID:"%s"`, params.ProviderID))
  690. }
  691. // Add service filter
  692. if params.Service != "" {
  693. filterParts = append(filterParts, fmt.Sprintf(`service:"%s"`, params.Service))
  694. }
  695. // Add category filter
  696. if params.Category != "" {
  697. filterParts = append(filterParts, fmt.Sprintf(`category:"%s"`, params.Category))
  698. }
  699. // Region is intentionally not supported here
  700. // Add account filter (maps to accountID)
  701. if params.AccountID != "" {
  702. filterParts = append(filterParts, fmt.Sprintf(`accountID:"%s"`, params.AccountID))
  703. }
  704. // Add invoiceEntityID filter
  705. if params.InvoiceEntityID != "" {
  706. filterParts = append(filterParts, fmt.Sprintf(`invoiceEntityID:"%s"`, params.InvoiceEntityID))
  707. }
  708. // Add label filters (label[key]:"value")
  709. if len(params.Labels) > 0 {
  710. for k, v := range params.Labels {
  711. if k == "" {
  712. continue
  713. }
  714. filterParts = append(filterParts, fmt.Sprintf(`label[%s]:"%s"`, k, v))
  715. }
  716. }
  717. // If no filters specified, return nil
  718. if len(filterParts) == 0 {
  719. return nil
  720. }
  721. // Combine all filter parts with AND logic (parser expects 'and')
  722. filterString := strings.Join(filterParts, " and ")
  723. // Parse the combined filter string
  724. parser := cloudcostfilter.NewCloudCostFilterParser()
  725. filter, err := parser.Parse(filterString)
  726. if err != nil {
  727. // Log error but return nil rather than failing
  728. log.Warnf("failed to parse combined filter '%s': %v", filterString, err)
  729. return nil
  730. }
  731. return filter
  732. }
  733. // transformCloudCostSetRange converts a opencost.CloudCostSetRange into the MCP's CloudCostResponse format.
  734. func transformCloudCostSetRange(ccsr *opencost.CloudCostSetRange) *CloudCostResponse {
  735. if ccsr == nil || len(ccsr.CloudCostSets) == 0 {
  736. return &CloudCostResponse{
  737. CloudCosts: make(map[string]*CloudCostSet),
  738. Summary: &CloudCostSummary{
  739. TotalNetCost: 0,
  740. },
  741. }
  742. }
  743. mcpCloudCosts := make(map[string]*CloudCostSet)
  744. var totalNetCost, totalAmortizedCost, totalInvoicedCost float64
  745. providerBreakdown := make(map[string]float64)
  746. serviceBreakdown := make(map[string]float64)
  747. regionBreakdown := make(map[string]float64)
  748. // Process each cloud cost set in the range
  749. for i, ccSet := range ccsr.CloudCostSets {
  750. if ccSet == nil {
  751. log.Warnf("transformCloudCostSetRange: skipping nil CloudCostSet at index %d", i)
  752. continue
  753. }
  754. // Check for nil Window or nil Start/End pointers before dereferencing
  755. if ccSet.Window.Start() == nil || ccSet.Window.End() == nil {
  756. log.Warnf("transformCloudCostSetRange: skipping CloudCostSet at index %d with invalid window (start=%v, end=%v)", i, ccSet.Window.Start(), ccSet.Window.End())
  757. continue
  758. }
  759. setName := fmt.Sprintf("cloudcosts_%d", i)
  760. mcpSet := &CloudCostSet{
  761. Name: setName,
  762. CloudCosts: []*CloudCost{},
  763. AggregationProperties: ccSet.AggregationProperties,
  764. Window: &TimeWindow{
  765. Start: *ccSet.Window.Start(),
  766. End: *ccSet.Window.End(),
  767. },
  768. }
  769. // Convert each cloud cost item
  770. for _, item := range ccSet.CloudCosts {
  771. if item == nil {
  772. log.Warnf("transformCloudCostSetRange: skipping nil CloudCost item in set %s", setName)
  773. continue
  774. }
  775. // Check for nil Window or nil Start/End pointers on the item
  776. if item.Window.Start() == nil || item.Window.End() == nil {
  777. log.Warnf("transformCloudCostSetRange: skipping CloudCost item with invalid window (start=%v, end=%v) in set %s", item.Window.Start(), item.Window.End(), setName)
  778. continue
  779. }
  780. mcpCC := &CloudCost{
  781. Properties: CloudCostProperties{
  782. ProviderID: item.Properties.ProviderID,
  783. Provider: item.Properties.Provider,
  784. AccountID: item.Properties.AccountID,
  785. AccountName: item.Properties.AccountName,
  786. InvoiceEntityID: item.Properties.InvoiceEntityID,
  787. InvoiceEntityName: item.Properties.InvoiceEntityName,
  788. RegionID: item.Properties.RegionID,
  789. AvailabilityZone: item.Properties.AvailabilityZone,
  790. Service: item.Properties.Service,
  791. Category: item.Properties.Category,
  792. Labels: item.Properties.Labels,
  793. },
  794. Window: TimeWindow{
  795. Start: *item.Window.Start(),
  796. End: *item.Window.End(),
  797. },
  798. ListCost: CostMetric{
  799. Cost: item.ListCost.Cost,
  800. KubernetesPercent: item.ListCost.KubernetesPercent,
  801. },
  802. NetCost: CostMetric{
  803. Cost: item.NetCost.Cost,
  804. KubernetesPercent: item.NetCost.KubernetesPercent,
  805. },
  806. AmortizedNetCost: CostMetric{
  807. Cost: item.AmortizedNetCost.Cost,
  808. KubernetesPercent: item.AmortizedNetCost.KubernetesPercent,
  809. },
  810. InvoicedCost: CostMetric{
  811. Cost: item.InvoicedCost.Cost,
  812. KubernetesPercent: item.InvoicedCost.KubernetesPercent,
  813. },
  814. AmortizedCost: CostMetric{
  815. Cost: item.AmortizedCost.Cost,
  816. KubernetesPercent: item.AmortizedCost.KubernetesPercent,
  817. },
  818. }
  819. mcpSet.CloudCosts = append(mcpSet.CloudCosts, mcpCC)
  820. // Update summary totals
  821. totalNetCost += item.NetCost.Cost
  822. totalAmortizedCost += item.AmortizedNetCost.Cost
  823. totalInvoicedCost += item.InvoicedCost.Cost
  824. // Update breakdowns
  825. providerBreakdown[item.Properties.Provider] += item.NetCost.Cost
  826. serviceBreakdown[item.Properties.Service] += item.NetCost.Cost
  827. regionBreakdown[item.Properties.RegionID] += item.NetCost.Cost
  828. }
  829. mcpCloudCosts[setName] = mcpSet
  830. }
  831. // Calculate cost-weighted average Kubernetes percentage (by NetCost)
  832. var avgKubernetesPercent float64
  833. var numerator, denominator float64
  834. for _, ccSet := range ccsr.CloudCostSets {
  835. if ccSet == nil {
  836. log.Warnf("transformCloudCostSetRange: skipping nil CloudCostSet in Kubernetes percent calculation")
  837. continue
  838. }
  839. // Skip sets with invalid windows (consistent with first loop)
  840. if ccSet.Window.Start() == nil || ccSet.Window.End() == nil {
  841. log.Warnf("transformCloudCostSetRange: skipping CloudCostSet with invalid window (start=%v, end=%v) in Kubernetes percent calculation", ccSet.Window.Start(), ccSet.Window.End())
  842. continue
  843. }
  844. for _, item := range ccSet.CloudCosts {
  845. if item == nil {
  846. log.Warnf("transformCloudCostSetRange: skipping nil CloudCost item in Kubernetes percent calculation")
  847. continue
  848. }
  849. // Skip items with invalid windows (consistent with first loop)
  850. if item.Window.Start() == nil || item.Window.End() == nil {
  851. log.Warnf("transformCloudCostSetRange: skipping CloudCost item with invalid window (start=%v, end=%v) in Kubernetes percent calculation", item.Window.Start(), item.Window.End())
  852. continue
  853. }
  854. cost := item.NetCost.Cost
  855. percent := item.NetCost.KubernetesPercent
  856. if cost <= 0 {
  857. continue
  858. }
  859. numerator += cost * percent
  860. denominator += cost
  861. }
  862. }
  863. if denominator > 0 {
  864. avgKubernetesPercent = numerator / denominator
  865. }
  866. summary := &CloudCostSummary{
  867. TotalNetCost: totalNetCost,
  868. TotalAmortizedCost: totalAmortizedCost,
  869. TotalInvoicedCost: totalInvoicedCost,
  870. KubernetesPercent: avgKubernetesPercent,
  871. ProviderBreakdown: providerBreakdown,
  872. ServiceBreakdown: serviceBreakdown,
  873. RegionBreakdown: regionBreakdown,
  874. }
  875. return &CloudCostResponse{
  876. CloudCosts: mcpCloudCosts,
  877. Summary: summary,
  878. }
  879. }
  // defaultEfficiencyStep picks the query step used when the caller does not
  // supply one. Longer windows get coarser steps so a large window is queried in
  // bounded batches instead of being loaded into memory all at once:
  //
  //	window >= 30 days -> 24h step
  //	window >=  7 days ->  6h step
  //	window >=  1 day  ->  1h step
  //	otherwise         ->  the whole window as a single step
  func defaultEfficiencyStep(windowDuration time.Duration) time.Duration {
  	const day = 24 * time.Hour
  	// Ordered from the largest threshold down; the first matching tier wins.
  	tiers := [...]struct {
  		minWindow time.Duration
  		step      time.Duration
  	}{
  		{minWindow: 30 * day, step: day},
  		{minWindow: 7 * day, step: 6 * time.Hour},
  		{minWindow: day, step: time.Hour},
  	}
  	for _, tier := range tiers {
  		if windowDuration >= tier.minWindow {
  			return tier.step
  		}
  	}
  	return windowDuration
  }
  896. // QueryEfficiency queries allocation data and computes efficiency metrics with recommendations.
  897. func (s *MCPServer) QueryEfficiency(query *OpenCostQueryRequest) (*EfficiencyResponse, error) {
  898. // 1. Parse Window
  899. window, err := opencost.ParseWindowWithOffset(query.Window, 0)
  900. if err != nil {
  901. return nil, fmt.Errorf("failed to parse window '%s': %w", query.Window, err)
  902. }
  903. // 2. Set default parameters
  904. var aggregateBy []string
  905. var filterString string
  906. var bufferMultiplier float64 = efficiencyBufferMultiplier // Default to 1.2 (20% headroom)
  907. // 3. Parse efficiency parameters if provided
  908. if query.EfficiencyParams != nil {
  909. // Parse aggregation properties (default to pod if not specified)
  910. if query.EfficiencyParams.Aggregate != "" {
  911. aggregateBy = strings.Split(query.EfficiencyParams.Aggregate, ",")
  912. } else {
  913. aggregateBy = []string{"pod"}
  914. }
  915. // Set filter string
  916. filterString = query.EfficiencyParams.Filter
  917. // Validate filter string if provided
  918. if filterString != "" {
  919. parser := allocation.NewAllocationFilterParser()
  920. _, err := parser.Parse(filterString)
  921. if err != nil {
  922. return nil, fmt.Errorf("invalid allocation filter '%s': %w", filterString, err)
  923. }
  924. }
  925. // Set buffer multiplier if provided, otherwise use default
  926. if query.EfficiencyParams.EfficiencyBufferMultiplier != nil {
  927. bufferMultiplier = *query.EfficiencyParams.EfficiencyBufferMultiplier
  928. }
  929. } else {
  930. // Default to pod-level aggregation
  931. aggregateBy = []string{"pod"}
  932. filterString = ""
  933. }
  934. // 4. Determine query step size.
  935. // A smaller step reduces peak memory by breaking large windows into batches.
  936. // Results are accumulated so the output is functionally equivalent regardless
  937. // of step, though minor floating-point differences are possible because
  938. // per-step cost calculations (which use max(request, usage)) are summed
  939. // rather than computed in a single pass.
  940. var step time.Duration
  941. if query.EfficiencyParams != nil && query.EfficiencyParams.Step > 0 {
  942. step = query.EfficiencyParams.Step
  943. } else {
  944. step = defaultEfficiencyStep(window.Duration())
  945. }
  946. if step > window.Duration() {
  947. step = window.Duration()
  948. }
  949. if step <= 0 {
  950. return nil, fmt.Errorf("invalid query: window has zero or negative duration")
  951. }
  952. accumulateBy := opencost.AccumulateOptionNone
  953. if step < window.Duration() {
  954. accumulateBy = opencost.AccumulateOptionAll
  955. }
  956. asr, err := s.costModel.QueryAllocation(
  957. window,
  958. step,
  959. aggregateBy,
  960. false, // includeIdle
  961. false, // idleByNode
  962. false, // includeProportionalAssetResourceCosts
  963. false, // includeAggregatedMetadata
  964. false, // sharedLoadBalancer
  965. accumulateBy,
  966. false, // shareIdle
  967. filterString,
  968. )
  969. if err != nil {
  970. return nil, fmt.Errorf("failed to query allocations: %w", err)
  971. }
  972. // 5. Handle empty results
  973. if asr == nil || len(asr.Allocations) == 0 {
  974. return &EfficiencyResponse{
  975. Efficiencies: []*EfficiencyMetric{},
  976. }, nil
  977. }
  978. // 6. Compute efficiency metrics from allocations using concurrent processing
  979. var (
  980. mu sync.Mutex
  981. wg sync.WaitGroup
  982. efficiencies = make([]*EfficiencyMetric, 0)
  983. )
  984. // Process each allocation set (typically one per time window) concurrently
  985. for _, allocSet := range asr.Allocations {
  986. if allocSet == nil {
  987. continue
  988. }
  989. // Process this allocation set in a goroutine
  990. wg.Add(1)
  991. go func(allocSet *opencost.AllocationSet) {
  992. defer wg.Done()
  993. // Compute metrics for all allocations in this set
  994. localMetrics := make([]*EfficiencyMetric, 0, len(allocSet.Allocations))
  995. for _, alloc := range allocSet.Allocations {
  996. if metric := computeEfficiencyMetric(alloc, bufferMultiplier); metric != nil {
  997. localMetrics = append(localMetrics, metric)
  998. }
  999. }
  1000. // Append results to shared slice (thread-safe)
  1001. if len(localMetrics) > 0 {
  1002. mu.Lock()
  1003. efficiencies = append(efficiencies, localMetrics...)
  1004. mu.Unlock()
  1005. }
  1006. }(allocSet)
  1007. }
  1008. // Wait for all goroutines to complete
  1009. wg.Wait()
  1010. return &EfficiencyResponse{
  1011. Efficiencies: efficiencies,
  1012. }, nil
  1013. }
  // safeDiv returns numerator/denominator, or 0 when the denominator is exactly
  // zero, which would otherwise produce ±Inf or NaN.
  func safeDiv(numerator, denominator float64) float64 {
  	if denominator != 0 {
  		return numerator / denominator
  	}
  	return 0
  }
  1021. // computeEfficiencyMetric calculates efficiency metrics for a single allocation.
  1022. func computeEfficiencyMetric(alloc *opencost.Allocation, bufferMultiplier float64) *EfficiencyMetric {
  1023. if alloc == nil {
  1024. return nil
  1025. }
  1026. // Calculate time duration in hours
  1027. hours := alloc.Minutes() / 60.0
  1028. if hours <= 0 {
  1029. return nil
  1030. }
  1031. // Get current usage (average over the period)
  1032. cpuCoresUsed := alloc.CPUCoreHours / hours
  1033. ramBytesUsed := alloc.RAMByteHours / hours
  1034. // Get requested amounts
  1035. cpuCoresRequested := alloc.CPUCoreRequestAverage
  1036. ramBytesRequested := alloc.RAMBytesRequestAverage
  1037. // Calculate current efficiency (will be 0 if no requests are set)
  1038. cpuEfficiency := safeDiv(cpuCoresUsed, cpuCoresRequested)
  1039. memoryEfficiency := safeDiv(ramBytesUsed, ramBytesRequested)
  1040. // Calculate recommendations with buffer for headroom
  1041. recommendedCPU := cpuCoresUsed * bufferMultiplier
  1042. recommendedRAM := ramBytesUsed * bufferMultiplier
  1043. // Ensure recommendations meet minimum thresholds
  1044. if recommendedCPU < efficiencyMinCPU {
  1045. recommendedCPU = efficiencyMinCPU
  1046. }
  1047. if recommendedRAM < efficiencyMinRAM {
  1048. recommendedRAM = efficiencyMinRAM
  1049. }
  1050. // Calculate resulting efficiency after applying recommendations
  1051. resultingCPUEff := safeDiv(cpuCoresUsed, recommendedCPU)
  1052. resultingMemEff := safeDiv(ramBytesUsed, recommendedRAM)
  1053. // Calculate cost per unit based on REQUESTED amounts (not used amounts)
  1054. // This gives us the cost per core-hour or byte-hour that the cluster charges
  1055. cpuCostPerCoreHour := safeDiv(alloc.CPUCost, cpuCoresRequested*hours)
  1056. ramCostPerByteHour := safeDiv(alloc.RAMCost, ramBytesRequested*hours)
  1057. // Current total cost
  1058. currentTotalCost := alloc.TotalCost()
  1059. // Estimate recommended cost based on recommended requests
  1060. recommendedCPUCost := recommendedCPU * hours * cpuCostPerCoreHour
  1061. recommendedRAMCost := recommendedRAM * hours * ramCostPerByteHour
  1062. // Keep other costs the same (PV, network, shared, external, GPU)
  1063. otherCosts := alloc.PVCost() + alloc.NetworkCost + alloc.SharedCost + alloc.ExternalCost + alloc.GPUCost
  1064. recommendedTotalCost := recommendedCPUCost + recommendedRAMCost + otherCosts
  1065. // Clamp recommended cost to avoid rounding issues making it higher than current
  1066. if recommendedTotalCost > currentTotalCost && (recommendedTotalCost-currentTotalCost) < 0.0001 {
  1067. recommendedTotalCost = currentTotalCost
  1068. }
  1069. // Calculate savings
  1070. costSavings := currentTotalCost - recommendedTotalCost
  1071. costSavingsPercent := safeDiv(costSavings, currentTotalCost) * 100
  1072. return &EfficiencyMetric{
  1073. Name: alloc.Name,
  1074. CPUEfficiency: cpuEfficiency,
  1075. MemoryEfficiency: memoryEfficiency,
  1076. CPUCoresRequested: cpuCoresRequested,
  1077. CPUCoresUsed: cpuCoresUsed,
  1078. RAMBytesRequested: ramBytesRequested,
  1079. RAMBytesUsed: ramBytesUsed,
  1080. RecommendedCPURequest: recommendedCPU,
  1081. RecommendedRAMRequest: recommendedRAM,
  1082. ResultingCPUEfficiency: resultingCPUEff,
  1083. ResultingMemoryEfficiency: resultingMemEff,
  1084. CurrentTotalCost: currentTotalCost,
  1085. RecommendedCost: recommendedTotalCost,
  1086. CostSavings: costSavings,
  1087. CostSavingsPercent: costSavingsPercent,
  1088. EfficiencyBufferMultiplier: bufferMultiplier,
  1089. Start: alloc.Start,
  1090. End: alloc.End,
  1091. }
  1092. }