costmodel.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. package costmodel
import (
	"context"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/julienschmidt/httprouter"
	mcp_sdk "github.com/modelcontextprotocol/go-sdk/mcp"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/cors"

	"github.com/opencost/opencost/core/pkg/errors"
	"github.com/opencost/opencost/core/pkg/log"
	"github.com/opencost/opencost/core/pkg/util/apiutil"
	"github.com/opencost/opencost/core/pkg/version"
	"github.com/opencost/opencost/pkg/cloudcost"
	"github.com/opencost/opencost/pkg/costmodel"
	"github.com/opencost/opencost/pkg/currency"
	"github.com/opencost/opencost/pkg/customcost"
	"github.com/opencost/opencost/pkg/env"
	"github.com/opencost/opencost/pkg/filemanager"
	opencost_mcp "github.com/opencost/opencost/pkg/mcp"
	"github.com/opencost/opencost/pkg/metrics"
)
  27. const shutdownTimeout = 30 * time.Second
  28. func Execute(conf *Config) error {
  29. log.Infof("Starting cost-model version %s", version.FriendlyVersion())
  30. if conf == nil {
  31. conf = DefaultConfig()
  32. }
  33. conf.log()
  34. // Create cancellable context for graceful shutdown
  35. ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  36. defer cancel()
  37. router := httprouter.New()
  38. var a *costmodel.Accesses
  39. // Initialize currency converter
  40. var currencyConverter currency.Converter
  41. currencyAPIKey := env.GetCurrencyAPIKey()
  42. currencyProvider := env.GetCurrencyProvider()
  43. if currencyProvider == "exchangerateapi" && currencyAPIKey != "" {
  44. config := currency.Config{
  45. APIKey: currencyAPIKey,
  46. CacheTTL: 24 * time.Hour,
  47. APITimeout: 10 * time.Second,
  48. }
  49. var err error
  50. currencyConverter, err = currency.NewConverter(config)
  51. if err != nil {
  52. log.Errorf("Failed to initialize currency converter: %v", err)
  53. log.Warnf("Currency conversion disabled - will return USD values only")
  54. } else {
  55. log.Infof("Currency converter initialized successfully")
  56. }
  57. } else {
  58. log.Infof("Currency conversion disabled (provider=%s, hasKey=%v)",
  59. currencyProvider, currencyAPIKey != "")
  60. }
  61. if conf.KubernetesEnabled {
  62. a = costmodel.Initialize(router)
  63. // Set currency converter in Accesses
  64. if a != nil {
  65. a.CurrencyConverter = currencyConverter
  66. }
  67. err := StartExportWorker(context.Background(), a.Model)
  68. if err != nil {
  69. log.Errorf("couldn't start CSV export worker: %v", err)
  70. }
  71. // Register OpenCost Specific Endpoints
  72. router.GET("/allocation", a.ComputeAllocationHandler)
  73. router.GET("/allocation/summary", a.ComputeAllocationHandlerSummary)
  74. router.GET("/assets", a.ComputeAssetsHandler)
  75. if conf.CarbonEstimatesEnabled {
  76. router.GET("/assets/carbon", a.ComputeAssetsCarbonHandler)
  77. }
  78. }
  79. var cloudCostPipelineService *cloudcost.PipelineService
  80. if conf.CloudCostEnabled {
  81. cloudCostPipelineService = costmodel.InitializeCloudCost(router, currencyConverter)
  82. }
  83. var customCostPipelineService *customcost.PipelineService
  84. if conf.CustomCostEnabled {
  85. customCostPipelineService = costmodel.InitializeCustomCost(router, currencyConverter)
  86. }
  87. // this endpoint is intentionally left out of the "if env.IsCustomCostEnabled()" conditional; in the handler, it is
  88. // valid for CustomCostPipelineService to be nil
  89. router.GET("/customCost/status", customCostPipelineService.GetCustomCostStatusHandler())
  90. // Initialize MCP Server if enabled and Kubernetes is available
  91. if conf.MCPServerEnabled && a != nil {
  92. // Get cloud cost querier if cloud costs are enabled
  93. var cloudCostQuerier cloudcost.Querier
  94. if conf.CloudCostEnabled && cloudCostPipelineService != nil {
  95. cloudCostQuerier = cloudCostPipelineService.GetCloudCostQuerier()
  96. }
  97. err := StartMCPServer(ctx, a, cloudCostQuerier)
  98. if err != nil {
  99. log.Errorf("Failed to start MCP server: %v", err)
  100. }
  101. } else if conf.MCPServerEnabled {
  102. log.Warnf("MCP Server is enabled but Kubernetes is not available. MCP server requires Kubernetes to function.")
  103. } else {
  104. if value, exists := os.LookupEnv(env.MCPServerEnabledEnvVar); !exists || value == "" {
  105. log.Infof("MCP server is now disabled by default. If you wish to use the MCP server, please set the %s environment variable to true.", env.MCPServerEnabledEnvVar)
  106. }
  107. }
  108. apiutil.ApplyContainerDiagnosticEndpoints(router)
  109. rootMux := http.NewServeMux()
  110. rootMux.Handle("/", router)
  111. rootMux.Handle("/metrics", promhttp.Handler())
  112. telemetryHandler := metrics.ResponseMetricMiddleware(rootMux)
  113. handler := cors.AllowAll().Handler(telemetryHandler)
  114. server := &http.Server{
  115. Addr: fmt.Sprint(":", conf.Port),
  116. Handler: errors.PanicHandlerMiddleware(handler),
  117. }
  118. serverErrors := make(chan error, 1)
  119. go func() {
  120. log.Infof("HTTP server starting on port %d", conf.Port)
  121. serverErrors <- server.ListenAndServe()
  122. }()
  123. select {
  124. case err := <-serverErrors:
  125. if err != nil && err != http.ErrServerClosed {
  126. return err
  127. }
  128. return nil
  129. case <-ctx.Done():
  130. log.Infof("Shutdown signal received, starting graceful shutdown...")
  131. if customCostPipelineService != nil {
  132. customCostPipelineService.Stop()
  133. }
  134. shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
  135. defer shutdownCancel()
  136. if err := server.Shutdown(shutdownCtx); err != nil {
  137. log.Errorf("Error during server shutdown: %v", err)
  138. server.Close()
  139. return err
  140. }
  141. log.Infof("Graceful shutdown completed")
  142. return nil
  143. }
  144. }
  145. func StartExportWorker(ctx context.Context, model costmodel.AllocationModel) error {
  146. exportPath := env.GetExportCSVFile()
  147. if exportPath == "" {
  148. log.Infof("%s is not set, CSV export is disabled", env.ExportCSVFile)
  149. return nil
  150. }
  151. fm, err := filemanager.NewFileManager(exportPath)
  152. if err != nil {
  153. return fmt.Errorf("could not create file manager: %v", err)
  154. }
  155. go func() {
  156. log.Info("Starting CSV exporter worker...")
  157. // perform first update immediately
  158. nextRunAt := time.Now()
  159. for {
  160. select {
  161. case <-ctx.Done():
  162. return
  163. case <-time.After(time.Until(nextRunAt)):
  164. err := costmodel.UpdateCSV(ctx, fm, model, env.GetExportCSVLabelsAll(), env.GetExportCSVLabelsList())
  165. if err != nil {
  166. // it's background worker, log error and carry on, maybe next time it will work
  167. log.Errorf("Error updating CSV: %s", err)
  168. }
  169. now := time.Now().UTC()
  170. // next launch is at 00:10 UTC tomorrow
  171. // extra 10 minutes is to let prometheus to collect all the data for the previous day
  172. nextRunAt = time.Date(now.Year(), now.Month(), now.Day(), 0, 10, 0, 0, now.Location()).AddDate(0, 0, 1)
  173. }
  174. }
  175. }()
  176. return nil
  177. }
  178. // StartMCPServer starts the MCP server as a background service
  179. func StartMCPServer(ctx context.Context, accesses *costmodel.Accesses, cloudCostQuerier cloudcost.Querier) error {
  180. log.Info("Initializing MCP server...")
  181. // Create MCP server using existing OpenCost dependencies
  182. mcpServer := opencost_mcp.NewMCPServer(accesses.Model, accesses.CloudProvider, cloudCostQuerier)
  183. // Create MCP SDK server
  184. sdkServer := mcp_sdk.NewServer(&mcp_sdk.Implementation{
  185. Name: "opencost-mcp-server",
  186. Version: version.Version,
  187. }, nil)
  188. // Define tool handlers
  189. handleAllocationCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args AllocationArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  190. // Parse step duration if provided
  191. var step time.Duration
  192. var err error
  193. if args.Step != "" {
  194. step, err = time.ParseDuration(args.Step)
  195. if err != nil {
  196. return nil, nil, fmt.Errorf("invalid step duration '%s': %w", args.Step, err)
  197. }
  198. }
  199. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  200. QueryType: opencost_mcp.AllocationQueryType,
  201. Window: args.Window,
  202. AllocationParams: &opencost_mcp.AllocationQuery{
  203. Step: step,
  204. Accumulate: args.Accumulate,
  205. ShareIdle: args.ShareIdle,
  206. Aggregate: args.Aggregate,
  207. IncludeIdle: args.IncludeIdle,
  208. IdleByNode: args.IdleByNode,
  209. IncludeProportionalAssetResourceCosts: args.IncludeProportionalAssetResourceCosts,
  210. IncludeAggregatedMetadata: args.IncludeAggregatedMetadata,
  211. ShareLB: args.ShareLB,
  212. Filter: args.Filter,
  213. },
  214. }
  215. mcpReq := &opencost_mcp.MCPRequest{
  216. Query: queryRequest,
  217. }
  218. mcpResp, err := mcpServer.ProcessMCPRequest(ctx, mcpReq)
  219. if err != nil {
  220. return nil, nil, fmt.Errorf("failed to process allocation request: %w", err)
  221. }
  222. return nil, mcpResp, nil
  223. }
  224. handleAssetCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args AssetArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  225. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  226. QueryType: opencost_mcp.AssetQueryType,
  227. Window: args.Window,
  228. AssetParams: &opencost_mcp.AssetQuery{},
  229. }
  230. mcpReq := &opencost_mcp.MCPRequest{
  231. Query: queryRequest,
  232. }
  233. mcpResp, err := mcpServer.ProcessMCPRequest(ctx, mcpReq)
  234. if err != nil {
  235. return nil, nil, fmt.Errorf("failed to process asset request: %w", err)
  236. }
  237. return nil, mcpResp, nil
  238. }
  239. handleCloudCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args CloudCostArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  240. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  241. QueryType: opencost_mcp.CloudCostQueryType,
  242. Window: args.Window,
  243. CloudCostParams: &opencost_mcp.CloudCostQuery{
  244. Aggregate: args.Aggregate,
  245. Accumulate: args.Accumulate,
  246. Filter: args.Filter,
  247. Provider: args.Provider,
  248. Service: args.Service,
  249. Category: args.Category,
  250. Region: args.Region,
  251. AccountID: args.Account,
  252. },
  253. }
  254. mcpReq := &opencost_mcp.MCPRequest{
  255. Query: queryRequest,
  256. }
  257. mcpResp, err := mcpServer.ProcessMCPRequest(ctx, mcpReq)
  258. if err != nil {
  259. return nil, nil, fmt.Errorf("failed to process cloud cost request: %w", err)
  260. }
  261. return nil, mcpResp, nil
  262. }
  263. handleEfficiency := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args EfficiencyArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  264. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  265. QueryType: opencost_mcp.EfficiencyQueryType,
  266. Window: args.Window,
  267. EfficiencyParams: &opencost_mcp.EfficiencyQuery{
  268. Aggregate: args.Aggregate,
  269. Filter: args.Filter,
  270. EfficiencyBufferMultiplier: args.BufferMultiplier,
  271. },
  272. }
  273. mcpReq := &opencost_mcp.MCPRequest{
  274. Query: queryRequest,
  275. }
  276. mcpResp, err := mcpServer.ProcessMCPRequest(ctx, mcpReq)
  277. if err != nil {
  278. return nil, nil, fmt.Errorf("failed to process efficiency request: %w", err)
  279. }
  280. return nil, mcpResp, nil
  281. }
  282. // Register tools
  283. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  284. Name: "get_allocation_costs",
  285. Description: "Retrieves allocation cost data.",
  286. }, handleAllocationCosts)
  287. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  288. Name: "get_asset_costs",
  289. Description: "Retrieves asset cost data.",
  290. }, handleAssetCosts)
  291. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  292. Name: "get_cloud_costs",
  293. Description: "Retrieves cloud cost data.",
  294. }, handleCloudCosts)
  295. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  296. Name: "get_efficiency",
  297. Description: "Retrieves resource efficiency metrics with rightsizing recommendations and cost savings analysis. Computes CPU and memory efficiency (usage/request ratio), provides recommended resource requests, and calculates potential cost savings. Optional buffer_multiplier parameter (default: 1.2 for 20% headroom) can be set to values like 1.4 for 40% headroom.",
  298. }, handleEfficiency)
  299. // Create HTTP handler
  300. handler := mcp_sdk.NewStreamableHTTPHandler(func(r *http.Request) *mcp_sdk.Server {
  301. return sdkServer
  302. }, &mcp_sdk.StreamableHTTPOptions{
  303. JSONResponse: true,
  304. })
  305. // Add logging middleware
  306. loggingHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  307. log.Debugf("MCP HTTP request: %s %s from %s", req.Method, req.URL.Path, req.RemoteAddr)
  308. handler.ServeHTTP(w, req)
  309. })
  310. // Start HTTP server on configured port
  311. port := env.GetMCPHTTPPort()
  312. log.Infof("Starting MCP HTTP server on port %d...", port)
  313. server := &http.Server{
  314. Addr: fmt.Sprintf(":%d", port),
  315. Handler: loggingHandler,
  316. }
  317. // Start server in a goroutine
  318. go func() {
  319. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  320. log.Errorf("MCP server failed: %v", err)
  321. }
  322. }()
  323. // Graceful shutdown goroutine
  324. go func() {
  325. <-ctx.Done()
  326. log.Info("Shutting down MCP server...")
  327. shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  328. defer cancel()
  329. if err := server.Shutdown(shutdownCtx); err != nil {
  330. log.Errorf("MCP server shutdown error: %v", err)
  331. } else {
  332. log.Info("MCP server shut down successfully")
  333. }
  334. }()
  335. log.Info("MCP server started successfully")
  336. return nil
  337. }
  338. // Tool argument structures for MCP server
  339. type AllocationArgs struct {
  340. Window string `json:"window"`
  341. Aggregate string `json:"aggregate"`
  342. // Allocation query parameters
  343. Step string `json:"step,omitempty"`
  344. Resolution string `json:"resolution,omitempty"`
  345. Accumulate bool `json:"accumulate,omitempty"`
  346. ShareIdle bool `json:"share_idle,omitempty"`
  347. IncludeIdle bool `json:"include_idle,omitempty"`
  348. IdleByNode bool `json:"idle_by_node,omitempty"`
  349. IncludeProportionalAssetResourceCosts bool `json:"include_proportional_asset_resource_costs,omitempty"`
  350. IncludeAggregatedMetadata bool `json:"include_aggregated_metadata,omitempty"`
  351. ShareLB bool `json:"share_lb,omitempty"`
  352. Filter string `json:"filter,omitempty"`
  353. }
  354. type AssetArgs struct {
  355. Window string `json:"window"`
  356. }
  357. type CloudCostArgs struct {
  358. Window string `json:"window"`
  359. Aggregate string `json:"aggregate"`
  360. // Cloud cost query parameters
  361. Accumulate string `json:"accumulate,omitempty"`
  362. Filter string `json:"filter,omitempty"`
  363. Provider string `json:"provider,omitempty"`
  364. Service string `json:"service,omitempty"`
  365. Category string `json:"category,omitempty"`
  366. Region string `json:"region,omitempty"`
  367. Account string `json:"account,omitempty"`
  368. }
  369. type EfficiencyArgs struct {
  370. Window string `json:"window"` // Time window (e.g., "today", "yesterday", "7d", "lastweek")
  371. Aggregate string `json:"aggregate,omitempty"` // Aggregation level (e.g., "pod", "namespace", "controller")
  372. Filter string `json:"filter,omitempty"` // Filter expression (same as allocation filters)
  373. BufferMultiplier *float64 `json:"buffer_multiplier,omitempty"` // Buffer multiplier for recommendations (default: 1.2 for 20% headroom, e.g., 1.4 for 40%)
  374. }