// File: costmodel.go (15 KB)

  1. package costmodel
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/julienschmidt/httprouter"
  8. "github.com/opencost/opencost/core/pkg/util/apiutil"
  9. "github.com/opencost/opencost/pkg/cloud/models"
  10. "github.com/opencost/opencost/pkg/cloud/provider"
  11. "github.com/opencost/opencost/pkg/cloudcost"
  12. "github.com/opencost/opencost/pkg/customcost"
  13. "github.com/prometheus/client_golang/prometheus/promhttp"
  14. "github.com/rs/cors"
  15. mcp_sdk "github.com/modelcontextprotocol/go-sdk/mcp"
  16. "github.com/opencost/opencost/core/pkg/errors"
  17. "github.com/opencost/opencost/core/pkg/log"
  18. "github.com/opencost/opencost/core/pkg/version"
  19. "github.com/opencost/opencost/pkg/costmodel"
  20. "github.com/opencost/opencost/pkg/env"
  21. "github.com/opencost/opencost/pkg/filemanager"
  22. opencost_mcp "github.com/opencost/opencost/pkg/mcp"
  23. "github.com/opencost/opencost/pkg/metrics"
  24. )
  25. func Execute(conf *Config) error {
  26. log.Infof("Starting cost-model version %s", version.FriendlyVersion())
  27. if conf == nil {
  28. conf = DefaultConfig()
  29. }
  30. conf.log()
  31. router := httprouter.New()
  32. var a *costmodel.Accesses
  33. var cp models.Provider
  34. if conf.KubernetesEnabled {
  35. a = costmodel.Initialize(router)
  36. err := StartExportWorker(context.Background(), a.Model)
  37. if err != nil {
  38. log.Errorf("couldn't start CSV export worker: %v", err)
  39. }
  40. // Register OpenCost Specific Endpoints
  41. router.GET("/allocation", a.ComputeAllocationHandler)
  42. router.GET("/allocation/summary", a.ComputeAllocationHandlerSummary)
  43. router.GET("/assets", a.ComputeAssetsHandler)
  44. if conf.CarbonEstimatesEnabled {
  45. router.GET("/assets/carbon", a.ComputeAssetsCarbonHandler)
  46. }
  47. // set cloud provider for cloud cost
  48. cp = a.CloudProvider
  49. }
  50. var cloudCostPipelineService *cloudcost.PipelineService
  51. if conf.CloudCostEnabled {
  52. var providerConfig models.ProviderConfig
  53. if cp != nil {
  54. providerConfig = provider.ExtractConfigFromProviders(cp)
  55. }
  56. cloudCostPipelineService = costmodel.InitializeCloudCost(router, providerConfig)
  57. }
  58. var customCostPipelineService *customcost.PipelineService
  59. if conf.CloudCostEnabled {
  60. customCostPipelineService = costmodel.InitializeCustomCost(router)
  61. }
  62. // this endpoint is intentionally left out of the "if env.IsCustomCostEnabled()" conditional; in the handler, it is
  63. // valid for CustomCostPipelineService to be nil
  64. router.GET("/customCost/status", customCostPipelineService.GetCustomCostStatusHandler())
  65. // Initialize MCP Server if enabled and Kubernetes is available
  66. if conf.MCPServerEnabled && a != nil {
  67. // Get cloud cost querier if cloud costs are enabled
  68. var cloudCostQuerier cloudcost.Querier
  69. if conf.CloudCostEnabled && cloudCostPipelineService != nil {
  70. cloudCostQuerier = cloudCostPipelineService.GetCloudCostQuerier()
  71. }
  72. err := StartMCPServer(context.Background(), a, cloudCostQuerier)
  73. if err != nil {
  74. log.Errorf("Failed to start MCP server: %v", err)
  75. }
  76. } else if conf.MCPServerEnabled {
  77. log.Warnf("MCP Server is enabled but Kubernetes is not available. MCP server requires Kubernetes to function.")
  78. }
  79. apiutil.ApplyContainerDiagnosticEndpoints(router)
  80. rootMux := http.NewServeMux()
  81. rootMux.Handle("/", router)
  82. rootMux.Handle("/metrics", promhttp.Handler())
  83. telemetryHandler := metrics.ResponseMetricMiddleware(rootMux)
  84. handler := cors.AllowAll().Handler(telemetryHandler)
  85. return http.ListenAndServe(fmt.Sprint(":", conf.Port), errors.PanicHandlerMiddleware(handler))
  86. }
  87. func StartExportWorker(ctx context.Context, model costmodel.AllocationModel) error {
  88. exportPath := env.GetExportCSVFile()
  89. if exportPath == "" {
  90. log.Infof("%s is not set, CSV export is disabled", env.ExportCSVFile)
  91. return nil
  92. }
  93. fm, err := filemanager.NewFileManager(exportPath)
  94. if err != nil {
  95. return fmt.Errorf("could not create file manager: %v", err)
  96. }
  97. go func() {
  98. log.Info("Starting CSV exporter worker...")
  99. // perform first update immediately
  100. nextRunAt := time.Now()
  101. for {
  102. select {
  103. case <-ctx.Done():
  104. return
  105. case <-time.After(time.Until(nextRunAt)):
  106. err := costmodel.UpdateCSV(ctx, fm, model, env.GetExportCSVLabelsAll(), env.GetExportCSVLabelsList())
  107. if err != nil {
  108. // it's background worker, log error and carry on, maybe next time it will work
  109. log.Errorf("Error updating CSV: %s", err)
  110. }
  111. now := time.Now().UTC()
  112. // next launch is at 00:10 UTC tomorrow
  113. // extra 10 minutes is to let prometheus to collect all the data for the previous day
  114. nextRunAt = time.Date(now.Year(), now.Month(), now.Day(), 0, 10, 0, 0, now.Location()).AddDate(0, 0, 1)
  115. }
  116. }
  117. }()
  118. return nil
  119. }
  120. // StartMCPServer starts the MCP server as a background service
  121. func StartMCPServer(ctx context.Context, accesses *costmodel.Accesses, cloudCostQuerier cloudcost.Querier) error {
  122. log.Info("Initializing MCP server...")
  123. // Create MCP server using existing OpenCost dependencies
  124. mcpServer := opencost_mcp.NewMCPServer(accesses.Model, accesses.CloudProvider, cloudCostQuerier)
  125. // Create MCP SDK server
  126. sdkServer := mcp_sdk.NewServer(&mcp_sdk.Implementation{
  127. Name: "opencost-mcp-server",
  128. Version: version.Version,
  129. }, nil)
  130. // Define tool handlers
  131. handleAllocationCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args AllocationArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  132. // Parse step duration if provided
  133. var step time.Duration
  134. var err error
  135. if args.Step != "" {
  136. step, err = time.ParseDuration(args.Step)
  137. if err != nil {
  138. return nil, nil, fmt.Errorf("invalid step duration '%s': %w", args.Step, err)
  139. }
  140. }
  141. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  142. QueryType: opencost_mcp.AllocationQueryType,
  143. Window: args.Window,
  144. AllocationParams: &opencost_mcp.AllocationQuery{
  145. Step: step,
  146. Accumulate: args.Accumulate,
  147. ShareIdle: args.ShareIdle,
  148. Aggregate: args.Aggregate,
  149. IncludeIdle: args.IncludeIdle,
  150. IdleByNode: args.IdleByNode,
  151. IncludeProportionalAssetResourceCosts: args.IncludeProportionalAssetResourceCosts,
  152. IncludeAggregatedMetadata: args.IncludeAggregatedMetadata,
  153. ShareLB: args.ShareLB,
  154. Filter: args.Filter,
  155. },
  156. }
  157. mcpReq := &opencost_mcp.MCPRequest{
  158. Query: queryRequest,
  159. }
  160. mcpResp, err := mcpServer.ProcessMCPRequest(mcpReq)
  161. if err != nil {
  162. return nil, nil, fmt.Errorf("failed to process allocation request: %w", err)
  163. }
  164. return nil, mcpResp, nil
  165. }
  166. handleAssetCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args AssetArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  167. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  168. QueryType: opencost_mcp.AssetQueryType,
  169. Window: args.Window,
  170. AssetParams: &opencost_mcp.AssetQuery{},
  171. }
  172. mcpReq := &opencost_mcp.MCPRequest{
  173. Query: queryRequest,
  174. }
  175. mcpResp, err := mcpServer.ProcessMCPRequest(mcpReq)
  176. if err != nil {
  177. return nil, nil, fmt.Errorf("failed to process asset request: %w", err)
  178. }
  179. return nil, mcpResp, nil
  180. }
  181. handleCloudCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args CloudCostArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  182. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  183. QueryType: opencost_mcp.CloudCostQueryType,
  184. Window: args.Window,
  185. CloudCostParams: &opencost_mcp.CloudCostQuery{
  186. Aggregate: args.Aggregate,
  187. Accumulate: args.Accumulate,
  188. Filter: args.Filter,
  189. Provider: args.Provider,
  190. Service: args.Service,
  191. Category: args.Category,
  192. Region: args.Region,
  193. AccountID: args.Account,
  194. },
  195. }
  196. mcpReq := &opencost_mcp.MCPRequest{
  197. Query: queryRequest,
  198. }
  199. mcpResp, err := mcpServer.ProcessMCPRequest(mcpReq)
  200. if err != nil {
  201. return nil, nil, fmt.Errorf("failed to process cloud cost request: %w", err)
  202. }
  203. return nil, mcpResp, nil
  204. }
  205. handleEfficiency := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args EfficiencyArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  206. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  207. QueryType: opencost_mcp.EfficiencyQueryType,
  208. Window: args.Window,
  209. EfficiencyParams: &opencost_mcp.EfficiencyQuery{
  210. Aggregate: args.Aggregate,
  211. Filter: args.Filter,
  212. EfficiencyBufferMultiplier: args.BufferMultiplier,
  213. },
  214. }
  215. mcpReq := &opencost_mcp.MCPRequest{
  216. Query: queryRequest,
  217. }
  218. mcpResp, err := mcpServer.ProcessMCPRequest(mcpReq)
  219. if err != nil {
  220. return nil, nil, fmt.Errorf("failed to process efficiency request: %w", err)
  221. }
  222. return nil, mcpResp, nil
  223. }
  224. handleRecommendations := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args RecommendationsArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  225. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  226. QueryType: opencost_mcp.RecommendationsQueryType,
  227. Window: args.Window,
  228. RecommendationsParams: &opencost_mcp.RecommendationsQuery{
  229. Aggregate: args.Aggregate,
  230. Filter: args.Filter,
  231. BufferMultiplier: args.BufferMultiplier,
  232. MinSavings: args.MinSavings,
  233. IncludeIdle: args.IncludeIdle,
  234. IncludeOversized: args.IncludeOversized,
  235. IncludeRightsize: args.IncludeRightsize,
  236. TopN: args.TopN,
  237. },
  238. }
  239. mcpReq := &opencost_mcp.MCPRequest{
  240. Query: queryRequest,
  241. }
  242. mcpResp, err := mcpServer.ProcessMCPRequest(mcpReq)
  243. if err != nil {
  244. return nil, nil, fmt.Errorf("failed to process recommendations request: %w", err)
  245. }
  246. return nil, mcpResp, nil
  247. }
  248. // Register tools
  249. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  250. Name: "get_allocation_costs",
  251. Description: "Retrieves allocation cost data.",
  252. }, handleAllocationCosts)
  253. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  254. Name: "get_asset_costs",
  255. Description: "Retrieves asset cost data.",
  256. }, handleAssetCosts)
  257. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  258. Name: "get_cloud_costs",
  259. Description: "Retrieves cloud cost data.",
  260. }, handleCloudCosts)
  261. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  262. Name: "get_efficiency",
  263. Description: "Retrieves resource efficiency metrics with rightsizing recommendations and cost savings analysis. Computes CPU and memory efficiency (usage/request ratio), provides recommended resource requests, and calculates potential cost savings. Optional buffer_multiplier parameter (default: 1.2 for 20% headroom) can be set to values like 1.4 for 40% headroom.",
  264. }, handleEfficiency)
  265. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  266. Name: "get_cost_recommendations",
  267. Description: "Generates actionable cost optimization recommendations. Identifies idle resources (very low utilization), oversized resources (low efficiency), and rightsizing opportunities. Returns prioritized recommendations sorted by potential savings. Supports filtering by namespace, aggregation level, and minimum savings threshold. Each recommendation includes current vs recommended resource requests, estimated savings, and specific actions to take.",
  268. }, handleRecommendations)
  269. // Create HTTP handler
  270. handler := mcp_sdk.NewStreamableHTTPHandler(func(r *http.Request) *mcp_sdk.Server {
  271. return sdkServer
  272. }, &mcp_sdk.StreamableHTTPOptions{
  273. JSONResponse: true,
  274. })
  275. // Add logging middleware
  276. loggingHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  277. log.Debugf("MCP HTTP request: %s %s from %s", req.Method, req.URL.Path, req.RemoteAddr)
  278. handler.ServeHTTP(w, req)
  279. })
  280. // Start HTTP server on configured port
  281. port := env.GetMCPHTTPPort()
  282. log.Infof("Starting MCP HTTP server on port %d...", port)
  283. server := &http.Server{
  284. Addr: fmt.Sprintf(":%d", port),
  285. Handler: loggingHandler,
  286. }
  287. // Start server in a goroutine
  288. go func() {
  289. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  290. log.Errorf("MCP server failed: %v", err)
  291. }
  292. }()
  293. log.Info("MCP server started successfully")
  294. return nil
  295. }
  296. // Tool argument structures for MCP server
  297. type AllocationArgs struct {
  298. Window string `json:"window"`
  299. Aggregate string `json:"aggregate"`
  300. // Allocation query parameters
  301. Step string `json:"step,omitempty"`
  302. Resolution string `json:"resolution,omitempty"`
  303. Accumulate bool `json:"accumulate,omitempty"`
  304. ShareIdle bool `json:"share_idle,omitempty"`
  305. IncludeIdle bool `json:"include_idle,omitempty"`
  306. IdleByNode bool `json:"idle_by_node,omitempty"`
  307. IncludeProportionalAssetResourceCosts bool `json:"include_proportional_asset_resource_costs,omitempty"`
  308. IncludeAggregatedMetadata bool `json:"include_aggregated_metadata,omitempty"`
  309. ShareLB bool `json:"share_lb,omitempty"`
  310. Filter string `json:"filter,omitempty"`
  311. }
  312. type AssetArgs struct {
  313. Window string `json:"window"`
  314. }
  315. type CloudCostArgs struct {
  316. Window string `json:"window"`
  317. Aggregate string `json:"aggregate"`
  318. // Cloud cost query parameters
  319. Accumulate string `json:"accumulate,omitempty"`
  320. Filter string `json:"filter,omitempty"`
  321. Provider string `json:"provider,omitempty"`
  322. Service string `json:"service,omitempty"`
  323. Category string `json:"category,omitempty"`
  324. Region string `json:"region,omitempty"`
  325. Account string `json:"account,omitempty"`
  326. }
  327. type EfficiencyArgs struct {
  328. Window string `json:"window"` // Time window (e.g., "today", "yesterday", "7d", "lastweek")
  329. Aggregate string `json:"aggregate,omitempty"` // Aggregation level (e.g., "pod", "namespace", "controller")
  330. Filter string `json:"filter,omitempty"` // Filter expression (same as allocation filters)
  331. BufferMultiplier *float64 `json:"buffer_multiplier,omitempty"` // Buffer multiplier for recommendations (default: 1.2 for 20% headroom, e.g., 1.4 for 40%)
  332. }
  333. type RecommendationsArgs struct {
  334. Window string `json:"window"` // Time window (e.g., "today", "yesterday", "7d", "lastweek")
  335. Aggregate string `json:"aggregate,omitempty"` // Aggregation level (e.g., "pod", "namespace", "controller")
  336. Filter string `json:"filter,omitempty"` // Filter expression (same as allocation filters)
  337. BufferMultiplier *float64 `json:"buffer_multiplier,omitempty"` // Buffer multiplier for sizing recommendations (default: 1.2)
  338. MinSavings *float64 `json:"min_savings,omitempty"` // Minimum savings threshold to include recommendation (default: 0.01)
  339. IncludeIdle bool `json:"include_idle,omitempty"` // Include idle resource detection
  340. IncludeOversized bool `json:"include_oversized,omitempty"` // Include oversized resource detection
  341. IncludeRightsize bool `json:"include_rightsize,omitempty"` // Include rightsizing recommendations
  342. TopN *int `json:"top_n,omitempty"` // Limit to top N recommendations by savings
  343. }