costmodel.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package costmodel
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/julienschmidt/httprouter"
  8. "github.com/opencost/opencost/core/pkg/util/apiutil"
  9. "github.com/opencost/opencost/pkg/cloud/models"
  10. "github.com/opencost/opencost/pkg/cloud/provider"
  11. "github.com/opencost/opencost/pkg/cloudcost"
  12. "github.com/opencost/opencost/pkg/customcost"
  13. "github.com/prometheus/client_golang/prometheus/promhttp"
  14. "github.com/rs/cors"
  15. mcp_sdk "github.com/modelcontextprotocol/go-sdk/mcp"
  16. "github.com/opencost/opencost/core/pkg/errors"
  17. "github.com/opencost/opencost/core/pkg/log"
  18. "github.com/opencost/opencost/core/pkg/version"
  19. "github.com/opencost/opencost/pkg/costmodel"
  20. "github.com/opencost/opencost/pkg/env"
  21. "github.com/opencost/opencost/pkg/filemanager"
  22. opencost_mcp "github.com/opencost/opencost/pkg/mcp"
  23. "github.com/opencost/opencost/pkg/metrics"
  24. )
  25. func Execute(conf *Config) error {
  26. log.Infof("Starting cost-model version %s", version.FriendlyVersion())
  27. if conf == nil {
  28. conf = DefaultConfig()
  29. }
  30. conf.log()
  31. router := httprouter.New()
  32. var a *costmodel.Accesses
  33. var cp models.Provider
  34. if conf.KubernetesEnabled {
  35. a = costmodel.Initialize(router)
  36. err := StartExportWorker(context.Background(), a.Model)
  37. if err != nil {
  38. log.Errorf("couldn't start CSV export worker: %v", err)
  39. }
  40. // Register OpenCost Specific Endpoints
  41. router.GET("/allocation", a.ComputeAllocationHandler)
  42. router.GET("/allocation/summary", a.ComputeAllocationHandlerSummary)
  43. router.GET("/assets", a.ComputeAssetsHandler)
  44. if conf.CarbonEstimatesEnabled {
  45. router.GET("/assets/carbon", a.ComputeAssetsCarbonHandler)
  46. }
  47. // set cloud provider for cloud cost
  48. cp = a.CloudProvider
  49. }
  50. var cloudCostPipelineService *cloudcost.PipelineService
  51. if conf.CloudCostEnabled {
  52. var providerConfig models.ProviderConfig
  53. if cp != nil {
  54. providerConfig = provider.ExtractConfigFromProviders(cp)
  55. }
  56. cloudCostPipelineService = costmodel.InitializeCloudCost(router, providerConfig)
  57. }
  58. var customCostPipelineService *customcost.PipelineService
  59. if conf.CloudCostEnabled {
  60. customCostPipelineService = costmodel.InitializeCustomCost(router)
  61. }
  62. // this endpoint is intentionally left out of the "if env.IsCustomCostEnabled()" conditional; in the handler, it is
  63. // valid for CustomCostPipelineService to be nil
  64. router.GET("/customCost/status", customCostPipelineService.GetCustomCostStatusHandler())
  65. // Initialize MCP Server if enabled and Kubernetes is available
  66. if conf.MCPServerEnabled && a != nil {
  67. // Get cloud cost querier if cloud costs are enabled
  68. var cloudCostQuerier cloudcost.Querier
  69. if conf.CloudCostEnabled && cloudCostPipelineService != nil {
  70. cloudCostQuerier = cloudCostPipelineService.GetCloudCostQuerier()
  71. }
  72. err := StartMCPServer(context.Background(), a, cloudCostQuerier)
  73. if err != nil {
  74. log.Errorf("Failed to start MCP server: %v", err)
  75. }
  76. } else if conf.MCPServerEnabled {
  77. log.Warnf("MCP Server is enabled but Kubernetes is not available. MCP server requires Kubernetes to function.")
  78. }
  79. apiutil.ApplyContainerDiagnosticEndpoints(router)
  80. rootMux := http.NewServeMux()
  81. rootMux.Handle("/", router)
  82. rootMux.Handle("/metrics", promhttp.Handler())
  83. telemetryHandler := metrics.ResponseMetricMiddleware(rootMux)
  84. handler := cors.AllowAll().Handler(telemetryHandler)
  85. return http.ListenAndServe(fmt.Sprint(":", conf.Port), errors.PanicHandlerMiddleware(handler))
  86. }
  87. func StartExportWorker(ctx context.Context, model costmodel.AllocationModel) error {
  88. exportPath := env.GetExportCSVFile()
  89. if exportPath == "" {
  90. log.Infof("%s is not set, CSV export is disabled", env.ExportCSVFile)
  91. return nil
  92. }
  93. fm, err := filemanager.NewFileManager(exportPath)
  94. if err != nil {
  95. return fmt.Errorf("could not create file manager: %v", err)
  96. }
  97. go func() {
  98. log.Info("Starting CSV exporter worker...")
  99. // perform first update immediately
  100. nextRunAt := time.Now()
  101. for {
  102. select {
  103. case <-ctx.Done():
  104. return
  105. case <-time.After(time.Until(nextRunAt)):
  106. err := costmodel.UpdateCSV(ctx, fm, model, env.GetExportCSVLabelsAll(), env.GetExportCSVLabelsList())
  107. if err != nil {
  108. // it's background worker, log error and carry on, maybe next time it will work
  109. log.Errorf("Error updating CSV: %s", err)
  110. }
  111. now := time.Now().UTC()
  112. // next launch is at 00:10 UTC tomorrow
  113. // extra 10 minutes is to let prometheus to collect all the data for the previous day
  114. nextRunAt = time.Date(now.Year(), now.Month(), now.Day(), 0, 10, 0, 0, now.Location()).AddDate(0, 0, 1)
  115. }
  116. }
  117. }()
  118. return nil
  119. }
  120. // StartMCPServer starts the MCP server as a background service
  121. func StartMCPServer(ctx context.Context, accesses *costmodel.Accesses, cloudCostQuerier cloudcost.Querier) error {
  122. log.Info("Initializing MCP server...")
  123. // Create MCP server using existing OpenCost dependencies
  124. mcpServer := opencost_mcp.NewMCPServer(accesses.Model, accesses.CloudProvider, cloudCostQuerier)
  125. // Create MCP SDK server
  126. sdkServer := mcp_sdk.NewServer(&mcp_sdk.Implementation{
  127. Name: "opencost-mcp-server",
  128. Version: version.Version,
  129. }, nil)
  130. // Define tool handlers
  131. handleAllocationCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args AllocationArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  132. // Parse step duration if provided
  133. var step time.Duration
  134. var err error
  135. if args.Step != "" {
  136. step, err = time.ParseDuration(args.Step)
  137. if err != nil {
  138. return nil, nil, fmt.Errorf("invalid step duration '%s': %w", args.Step, err)
  139. }
  140. }
  141. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  142. QueryType: opencost_mcp.AllocationQueryType,
  143. Window: args.Window,
  144. AllocationParams: &opencost_mcp.AllocationQuery{
  145. Step: step,
  146. Accumulate: args.Accumulate,
  147. ShareIdle: args.ShareIdle,
  148. Aggregate: args.Aggregate,
  149. IncludeIdle: args.IncludeIdle,
  150. IdleByNode: args.IdleByNode,
  151. IncludeProportionalAssetResourceCosts: args.IncludeProportionalAssetResourceCosts,
  152. IncludeAggregatedMetadata: args.IncludeAggregatedMetadata,
  153. ShareLB: args.ShareLB,
  154. Filter: args.Filter,
  155. },
  156. }
  157. mcpReq := &opencost_mcp.MCPRequest{
  158. Query: queryRequest,
  159. }
  160. mcpResp, err := mcpServer.ProcessMCPRequest(mcpReq)
  161. if err != nil {
  162. return nil, nil, fmt.Errorf("failed to process allocation request: %w", err)
  163. }
  164. return nil, mcpResp, nil
  165. }
  166. handleAssetCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args AssetArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  167. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  168. QueryType: opencost_mcp.AssetQueryType,
  169. Window: args.Window,
  170. AssetParams: &opencost_mcp.AssetQuery{},
  171. }
  172. mcpReq := &opencost_mcp.MCPRequest{
  173. Query: queryRequest,
  174. }
  175. mcpResp, err := mcpServer.ProcessMCPRequest(mcpReq)
  176. if err != nil {
  177. return nil, nil, fmt.Errorf("failed to process asset request: %w", err)
  178. }
  179. return nil, mcpResp, nil
  180. }
  181. handleCloudCosts := func(ctx context.Context, req *mcp_sdk.CallToolRequest, args CloudCostArgs) (*mcp_sdk.CallToolResult, interface{}, error) {
  182. queryRequest := &opencost_mcp.OpenCostQueryRequest{
  183. QueryType: opencost_mcp.CloudCostQueryType,
  184. Window: args.Window,
  185. CloudCostParams: &opencost_mcp.CloudCostQuery{
  186. Aggregate: args.Aggregate,
  187. Accumulate: args.Accumulate,
  188. Filter: args.Filter,
  189. Provider: args.Provider,
  190. Service: args.Service,
  191. Category: args.Category,
  192. Region: args.Region,
  193. AccountID: args.Account,
  194. },
  195. }
  196. mcpReq := &opencost_mcp.MCPRequest{
  197. Query: queryRequest,
  198. }
  199. mcpResp, err := mcpServer.ProcessMCPRequest(mcpReq)
  200. if err != nil {
  201. return nil, nil, fmt.Errorf("failed to process cloud cost request: %w", err)
  202. }
  203. return nil, mcpResp, nil
  204. }
  205. // Register tools
  206. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  207. Name: "get_allocation_costs",
  208. Description: "Retrieves allocation cost data.",
  209. }, handleAllocationCosts)
  210. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  211. Name: "get_asset_costs",
  212. Description: "Retrieves asset cost data.",
  213. }, handleAssetCosts)
  214. mcp_sdk.AddTool(sdkServer, &mcp_sdk.Tool{
  215. Name: "get_cloud_costs",
  216. Description: "Retrieves cloud cost data.",
  217. }, handleCloudCosts)
  218. // Create HTTP handler
  219. handler := mcp_sdk.NewStreamableHTTPHandler(func(r *http.Request) *mcp_sdk.Server {
  220. return sdkServer
  221. }, &mcp_sdk.StreamableHTTPOptions{
  222. JSONResponse: true,
  223. })
  224. // Add logging middleware
  225. loggingHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  226. log.Debugf("MCP HTTP request: %s %s from %s", req.Method, req.URL.Path, req.RemoteAddr)
  227. handler.ServeHTTP(w, req)
  228. })
  229. // Start HTTP server on configured port
  230. port := env.GetMCPHTTPPort()
  231. log.Infof("Starting MCP HTTP server on port %d...", port)
  232. server := &http.Server{
  233. Addr: fmt.Sprintf(":%d", port),
  234. Handler: loggingHandler,
  235. }
  236. // Start server in a goroutine
  237. go func() {
  238. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  239. log.Errorf("MCP server failed: %v", err)
  240. }
  241. }()
  242. log.Info("MCP server started successfully")
  243. return nil
  244. }
  245. // Tool argument structures for MCP server
  246. type AllocationArgs struct {
  247. Window string `json:"window"`
  248. Aggregate string `json:"aggregate"`
  249. // Allocation query parameters
  250. Step string `json:"step,omitempty"`
  251. Resolution string `json:"resolution,omitempty"`
  252. Accumulate bool `json:"accumulate,omitempty"`
  253. ShareIdle bool `json:"share_idle,omitempty"`
  254. IncludeIdle bool `json:"include_idle,omitempty"`
  255. IdleByNode bool `json:"idle_by_node,omitempty"`
  256. IncludeProportionalAssetResourceCosts bool `json:"include_proportional_asset_resource_costs,omitempty"`
  257. IncludeAggregatedMetadata bool `json:"include_aggregated_metadata,omitempty"`
  258. ShareLB bool `json:"share_lb,omitempty"`
  259. Filter string `json:"filter,omitempty"`
  260. }
  261. type AssetArgs struct {
  262. Window string `json:"window"`
  263. }
  264. type CloudCostArgs struct {
  265. Window string `json:"window"`
  266. Aggregate string `json:"aggregate"`
  267. // Cloud cost query parameters
  268. Accumulate string `json:"accumulate,omitempty"`
  269. Filter string `json:"filter,omitempty"`
  270. Provider string `json:"provider,omitempty"`
  271. Service string `json:"service,omitempty"`
  272. Category string `json:"category,omitempty"`
  273. Region string `json:"region,omitempty"`
  274. Account string `json:"account,omitempty"`
  275. }