costmodel.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. package costmodel
import (
	"context"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/julienschmidt/httprouter"
	mcp_sdk "github.com/modelcontextprotocol/go-sdk/mcp"
	"github.com/opencost/opencost/core/pkg/errors"
	"github.com/opencost/opencost/core/pkg/log"
	"github.com/opencost/opencost/core/pkg/util/apiutil"
	"github.com/opencost/opencost/core/pkg/version"
	"github.com/opencost/opencost/pkg/cloud/models"
	"github.com/opencost/opencost/pkg/cloud/provider"
	"github.com/opencost/opencost/pkg/cloudcost"
	"github.com/opencost/opencost/pkg/costmodel"
	"github.com/opencost/opencost/pkg/customcost"
	"github.com/opencost/opencost/pkg/env"
	"github.com/opencost/opencost/pkg/filemanager"
	opencost_mcp "github.com/opencost/opencost/pkg/mcp"
	"github.com/opencost/opencost/pkg/metrics"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/cors"
)
  28. func Execute(conf *Config) error {
  29. log.Infof("Starting cost-model version %s", version.FriendlyVersion())
  30. if conf == nil {
  31. conf = DefaultConfig()
  32. }
  33. conf.log()
  34. // Create cancellable context for graceful shutdown
  35. ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  36. defer cancel()
  37. router := httprouter.New()
  38. var a *costmodel.Accesses
  39. var cp models.Provider
  40. if conf.KubernetesEnabled {
  41. a = costmodel.Initialize(router)
  42. err := StartExportWorker(context.Background(), a.Model)
  43. if err != nil {
  44. log.Errorf("couldn't start CSV export worker: %v", err)
  45. }
  46. // Register OpenCost Specific Endpoints
  47. router.GET("/allocation", a.ComputeAllocationHandler)
  48. router.GET("/allocation/summary", a.ComputeAllocationHandlerSummary)
  49. router.GET("/assets", a.ComputeAssetsHandler)
  50. if conf.CarbonEstimatesEnabled {
  51. router.GET("/assets/carbon", a.ComputeAssetsCarbonHandler)
  52. }
  53. // set cloud provider for cloud cost
  54. cp = a.CloudProvider
  55. }
  56. var cloudCostPipelineService *cloudcost.PipelineService
  57. if conf.CloudCostEnabled {
  58. var providerConfig models.ProviderConfig
  59. if cp != nil {
  60. providerConfig = provider.ExtractConfigFromProviders(cp)
  61. }
  62. cloudCostPipelineService = costmodel.InitializeCloudCost(router, providerConfig)
  63. }
  64. var customCostPipelineService *customcost.PipelineService
  65. if conf.CustomCostEnabled {
  66. customCostPipelineService = costmodel.InitializeCustomCost(router)
  67. }
  68. // this endpoint is intentionally left out of the "if env.IsCustomCostEnabled()" conditional; in the handler, it is
  69. // valid for CustomCostPipelineService to be nil
  70. router.GET("/customCost/status", customCostPipelineService.GetCustomCostStatusHandler())
  71. // Initialize MCP Server if enabled and Kubernetes is available
  72. if conf.MCPServerEnabled && a != nil {
  73. // Get cloud cost querier if cloud costs are enabled
  74. var cloudCostQuerier cloudcost.Querier
  75. if conf.CloudCostEnabled && cloudCostPipelineService != nil {
  76. cloudCostQuerier = cloudCostPipelineService.GetCloudCostQuerier()
  77. }
  78. err := StartMCPServer(ctx, a, cloudCostQuerier)
  79. if err != nil {
  80. log.Errorf("Failed to start MCP server: %v", err)
  81. }
  82. } else if conf.MCPServerEnabled {
  83. log.Warnf("MCP Server is enabled but Kubernetes is not available. MCP server requires Kubernetes to function.")
  84. }
  85. apiutil.ApplyContainerDiagnosticEndpoints(router)
  86. rootMux := http.NewServeMux()
  87. rootMux.Handle("/", router)
  88. rootMux.Handle("/metrics", promhttp.Handler())
  89. telemetryHandler := metrics.ResponseMetricMiddleware(rootMux)
  90. handler := cors.AllowAll().Handler(telemetryHandler)
  91. return http.ListenAndServe(fmt.Sprint(":", conf.Port), errors.PanicHandlerMiddleware(handler))
  92. }
  93. func StartExportWorker(ctx context.Context, model costmodel.AllocationModel) error {
  94. exportPath := env.GetExportCSVFile()
  95. if exportPath == "" {
  96. log.Infof("%s is not set, CSV export is disabled", env.ExportCSVFile)
  97. return nil
  98. }
  99. fm, err := filemanager.NewFileManager(exportPath)
  100. if err != nil {
  101. return fmt.Errorf("could not create file manager: %v", err)
  102. }
  103. go func() {
  104. log.Info("Starting CSV exporter worker...")
  105. // perform first update immediately
  106. nextRunAt := time.Now()
  107. for {
  108. select {
  109. case <-ctx.Done():
  110. return
  111. case <-time.After(time.Until(nextRunAt)):
  112. err := costmodel.UpdateCSV(ctx, fm, model, env.GetExportCSVLabelsAll(), env.GetExportCSVLabelsList())
  113. if err != nil {
  114. // it's background worker, log error and carry on, maybe next time it will work
  115. log.Errorf("Error updating CSV: %s", err)
  116. }
  117. now := time.Now().UTC()
  118. // next launch is at 00:10 UTC tomorrow
  119. // extra 10 minutes is to let prometheus to collect all the data for the previous day
  120. nextRunAt = time.Date(now.Year(), now.Month(), now.Day(), 0, 10, 0, 0, now.Location()).AddDate(0, 0, 1)
  121. }
  122. }
  123. }()
  124. return nil
  125. }
  126. // StartMCPServer starts the MCP server as a background service
  127. func StartMCPServer(ctx context.Context, accesses *costmodel.Accesses, cloudCostQuerier cloudcost.Querier) error {
  128. log.Info("Initializing MCP server...")
  129. // Create MCP server using existing OpenCost dependencies
  130. mcpServer := opencost_mcp.NewMCPServer(accesses.Model, accesses.CloudProvider, cloudCostQuerier)
  131. // Create MCP SDK server
  132. sdkServer := mcp_sdk.NewServer(&mcp_sdk.Implementation{
  133. Name: "opencost-mcp-server",
  134. Version: version.Version,
  135. }, nil)
  136. // Define tool handlers
  137. handleAllocationCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args AllocationArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  138. // Parse step duration if provided
  139. var step time.Duration
  140. var err error
  141. if args.Step != "" {
  142. step, err = time.ParseDuration(args.Step)
  143. if err != nil {
  144. return nil, nil, fmt.Errorf("invalid step duration '%s': %w", args.Step, err)
  145. }
  146. }
  147. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  148. QueryType: opencost_mcp.AllocationQueryType,
  149. Window: args.Window,
  150. AllocationParams: &opencost_mcp.AllocationQuery{
  151. Step: step,
  152. Accumulate: args.Accumulate,
  153. ShareIdle: args.ShareIdle,
  154. Aggregate: args.Aggregate,
  155. IncludeIdle: args.IncludeIdle,
  156. IdleByNode: args.IdleByNode,
  157. IncludeProportionalAssetResourceCosts: args.IncludeProportionalAssetResourceCosts,
  158. IncludeAggregatedMetadata: args.IncludeAggregatedMetadata,
  159. ShareLB: args.ShareLB,
  160. Filter: args.Filter,
  161. },
  162. }
  163. mcpReq := &opencost_mcp.MCPRequest{
  164. Query: queryRequest,
  165. }
  166. mcpResp, err := mcpServer.ProcessMCPRequest(ctx, mcpReq)
  167. if err != nil {
  168. return nil, nil, fmt.Errorf("failed to process allocation request: %w", err)
  169. }
  170. return nil, mcpResp, nil
  171. }
  172. handleAssetCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args AssetArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  173. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  174. QueryType: opencost_mcp.AssetQueryType,
  175. Window: args.Window,
  176. AssetParams: &opencost_mcp.AssetQuery{},
  177. }
  178. mcpReq := &opencost_mcp.MCPRequest{
  179. Query: queryRequest,
  180. }
  181. mcpResp, err := mcpServer.ProcessMCPRequest(ctx, mcpReq)
  182. if err != nil {
  183. return nil, nil, fmt.Errorf("failed to process asset request: %w", err)
  184. }
  185. return nil, mcpResp, nil
  186. }
  187. handleCloudCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args CloudCostArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  188. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  189. QueryType: opencost_mcp.CloudCostQueryType,
  190. Window: args.Window,
  191. CloudCostParams: &opencost_mcp.CloudCostQuery{
  192. Aggregate: args.Aggregate,
  193. Accumulate: args.Accumulate,
  194. Filter: args.Filter,
  195. Provider: args.Provider,
  196. Service: args.Service,
  197. Category: args.Category,
  198. Region: args.Region,
  199. AccountID: args.Account,
  200. },
  201. }
  202. mcpReq := &opencost_mcp.MCPRequest{
  203. Query: queryRequest,
  204. }
  205. mcpResp, err := mcpServer.ProcessMCPRequest(ctx, mcpReq)
  206. if err != nil {
  207. return nil, nil, fmt.Errorf("failed to process cloud cost request: %w", err)
  208. }
  209. return nil, mcpResp, nil
  210. }
  211. handleEfficiency := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args EfficiencyArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  212. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  213. QueryType: opencost_mcp.EfficiencyQueryType,
  214. Window: args.Window,
  215. EfficiencyParams: &opencost_mcp.EfficiencyQuery{
  216. Aggregate: args.Aggregate,
  217. Filter: args.Filter,
  218. EfficiencyBufferMultiplier: args.BufferMultiplier,
  219. },
  220. }
  221. mcpReq := &opencost_mcp.MCPRequest{
  222. Query: queryRequest,
  223. }
  224. mcpResp, err := mcpServer.ProcessMCPRequest(ctx, mcpReq)
  225. if err != nil {
  226. return nil, nil, fmt.Errorf("failed to process efficiency request: %w", err)
  227. }
  228. return nil, mcpResp, nil
  229. }
  230. // Register tools
  231. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  232. Name: "get_allocation_costs",
  233. Description: "Retrieves allocation cost data.",
  234. }, handleAllocationCosts)
  235. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  236. Name: "get_asset_costs",
  237. Description: "Retrieves asset cost data.",
  238. }, handleAssetCosts)
  239. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  240. Name: "get_cloud_costs",
  241. Description: "Retrieves cloud cost data.",
  242. }, handleCloudCosts)
  243. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  244. Name: "get_efficiency",
  245. Description: "Retrieves resource efficiency metrics with rightsizing recommendations and cost savings analysis. Computes CPU and memory efficiency (usage/request ratio), provides recommended resource requests, and calculates potential cost savings. Optional buffer_multiplier parameter (default: 1.2 for 20% headroom) can be set to values like 1.4 for 40% headroom.",
  246. }, handleEfficiency)
  247. // Create HTTP handler
  248. handler := mcp_sdk.NewStreamableHTTPHandler(func(r *http.Request) *mcp_sdk.Server {
  249. return sdkServer
  250. }, &mcp_sdk.StreamableHTTPOptions{
  251. JSONResponse: true,
  252. })
  253. // Add logging middleware
  254. loggingHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  255. log.Debugf("MCP HTTP request: %s %s from %s", req.Method, req.URL.Path, req.RemoteAddr)
  256. handler.ServeHTTP(w, req)
  257. })
  258. // Start HTTP server on configured port
  259. port := env.GetMCPHTTPPort()
  260. log.Infof("Starting MCP HTTP server on port %d...", port)
  261. server := &http.Server{
  262. Addr: fmt.Sprintf(":%d", port),
  263. Handler: loggingHandler,
  264. }
  265. // Start server in a goroutine
  266. go func() {
  267. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  268. log.Errorf("MCP server failed: %v", err)
  269. }
  270. }()
  271. // Graceful shutdown goroutine
  272. go func() {
  273. <-ctx.Done()
  274. log.Info("Shutting down MCP server...")
  275. shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  276. defer cancel()
  277. if err := server.Shutdown(shutdownCtx); err != nil {
  278. log.Errorf("MCP server shutdown error: %v", err)
  279. } else {
  280. log.Info("MCP server shut down successfully")
  281. }
  282. }()
  283. log.Info("MCP server started successfully")
  284. return nil
  285. }
  286. // Tool argument structures for MCP server
  287. type AllocationArgs struct {
  288. Window string `json:"window"`
  289. Aggregate string `json:"aggregate"`
  290. // Allocation query parameters
  291. Step string `json:"step,omitempty"`
  292. Resolution string `json:"resolution,omitempty"`
  293. Accumulate bool `json:"accumulate,omitempty"`
  294. ShareIdle bool `json:"share_idle,omitempty"`
  295. IncludeIdle bool `json:"include_idle,omitempty"`
  296. IdleByNode bool `json:"idle_by_node,omitempty"`
  297. IncludeProportionalAssetResourceCosts bool `json:"include_proportional_asset_resource_costs,omitempty"`
  298. IncludeAggregatedMetadata bool `json:"include_aggregated_metadata,omitempty"`
  299. ShareLB bool `json:"share_lb,omitempty"`
  300. Filter string `json:"filter,omitempty"`
  301. }
  302. type AssetArgs struct {
  303. Window string `json:"window"`
  304. }
  305. type CloudCostArgs struct {
  306. Window string `json:"window"`
  307. Aggregate string `json:"aggregate"`
  308. // Cloud cost query parameters
  309. Accumulate string `json:"accumulate,omitempty"`
  310. Filter string `json:"filter,omitempty"`
  311. Provider string `json:"provider,omitempty"`
  312. Service string `json:"service,omitempty"`
  313. Category string `json:"category,omitempty"`
  314. Region string `json:"region,omitempty"`
  315. Account string `json:"account,omitempty"`
  316. }
  317. type EfficiencyArgs struct {
  318. Window string `json:"window"` // Time window (e.g., "today", "yesterday", "7d", "lastweek")
  319. Aggregate string `json:"aggregate,omitempty"` // Aggregation level (e.g., "pod", "namespace", "controller")
  320. Filter string `json:"filter,omitempty"` // Filter expression (same as allocation filters)
  321. BufferMultiplier *float64 `json:"buffer_multiplier,omitempty"` // Buffer multiplier for recommendations (default: 1.2 for 20% headroom, e.g., 1.4 for 40%)
  322. }