recommender.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. //go:build ee
  2. /*
  3. === Recommender Job ===
  4. This job checks to see if a cluster matches policies set by the OPA config file.
  5. */
  6. package jobs
  7. import (
  8. "context"
  9. "errors"
  10. "fmt"
  11. "log"
  12. "time"
  13. "github.com/mitchellh/mapstructure"
  14. "github.com/porter-dev/porter/api/server/shared/config/env"
  15. "github.com/porter-dev/porter/api/server/shared/requestutils"
  16. "github.com/porter-dev/porter/api/types"
  17. "github.com/porter-dev/porter/ee/integrations/vault"
  18. "github.com/porter-dev/porter/internal/encryption"
  19. "github.com/porter-dev/porter/internal/kubernetes"
  20. "github.com/porter-dev/porter/internal/models"
  21. "github.com/porter-dev/porter/internal/oauth"
  22. "github.com/porter-dev/porter/internal/opa"
  23. "github.com/porter-dev/porter/internal/repository"
  24. rcreds "github.com/porter-dev/porter/internal/repository/credentials"
  25. rgorm "github.com/porter-dev/porter/internal/repository/gorm"
  26. "golang.org/x/oauth2"
  27. "gorm.io/gorm"
  28. )
  29. type recommender struct {
  30. enqueueTime time.Time
  31. db *gorm.DB
  32. repo repository.Repository
  33. doConf *oauth2.Config
  34. clusterAndProjectIDs []clusterAndProjectID
  35. categories []string
  36. policies *opa.KubernetesPolicies
  37. runRecommenderID string
  38. }
  39. // RecommenderOpts holds the options required to run this job
  40. type RecommenderOpts struct {
  41. DBConf *env.DBConf
  42. DOClientID string
  43. DOClientSecret string
  44. DOScopes []string
  45. ServerURL string
  46. LegacyProjectIDs []uint
  47. Input map[string]interface{}
  48. }
  49. type recommenderInput struct {
  50. Projects []uint `mapstructure:"projects"`
  51. ClusterID uint `mapstructure:"cluster_id"`
  52. Priority string `mapstructure:"priority"`
  53. Categories []string `mapstructure:"categories"`
  54. }
  55. type clusterAndProjectID struct {
  56. clusterID uint
  57. projectID uint
  58. }
  59. func NewRecommender(
  60. db *gorm.DB,
  61. enqueueTime time.Time,
  62. opts *RecommenderOpts,
  63. opaPolicies *opa.KubernetesPolicies,
  64. ) (*recommender, error) {
  65. var credBackend rcreds.CredentialStorage
  66. if opts.DBConf.VaultAPIKey != "" && opts.DBConf.VaultServerURL != "" && opts.DBConf.VaultPrefix != "" {
  67. credBackend = vault.NewClient(
  68. opts.DBConf.VaultServerURL,
  69. opts.DBConf.VaultAPIKey,
  70. opts.DBConf.VaultPrefix,
  71. )
  72. }
  73. var key [32]byte
  74. for i, b := range []byte(opts.DBConf.EncryptionKey) {
  75. key[i] = b
  76. }
  77. repo := rgorm.NewRepository(db, &key, credBackend)
  78. doConf := oauth.NewDigitalOceanClient(&oauth.Config{
  79. ClientID: opts.DOClientID,
  80. ClientSecret: opts.DOClientSecret,
  81. Scopes: opts.DOScopes,
  82. BaseURL: opts.ServerURL,
  83. })
  84. // parse input
  85. parsedInput := &recommenderInput{}
  86. err := mapstructure.Decode(opts.Input, parsedInput)
  87. if err != nil {
  88. return nil, err
  89. }
  90. // validate
  91. validator := requestutils.NewDefaultValidator()
  92. if requestErr := validator.Validate(parsedInput); requestErr != nil {
  93. return nil, fmt.Errorf(requestErr.Error())
  94. }
  95. clusterIDs, err := getClustersToParse(db, repo.Cluster(), parsedInput, opts.LegacyProjectIDs)
  96. if err != nil {
  97. return nil, err
  98. }
  99. recommenderID, err := encryption.GenerateRandomBytes(32)
  100. if err != nil {
  101. return nil, err
  102. }
  103. return &recommender{
  104. enqueueTime, db, repo, doConf, clusterIDs, parsedInput.Categories, opaPolicies, string(recommenderID),
  105. }, nil
  106. }
  107. func getClustersToParse(db *gorm.DB, clusterRepo repository.ClusterRepository, input *recommenderInput, legacyProjects []uint) ([]clusterAndProjectID, error) {
  108. // if the project and cluster ID is set, make sure that the project id matches the cluster's
  109. // project id
  110. if input.ClusterID != 0 {
  111. if len(input.Projects) != 1 {
  112. return nil, fmt.Errorf("if cluster ID is passed, you must pass the matching project ID")
  113. }
  114. _, err := clusterRepo.ReadCluster(input.Projects[0], input.ClusterID)
  115. if err != nil {
  116. return nil, err
  117. }
  118. return []clusterAndProjectID{{
  119. clusterID: input.ClusterID,
  120. projectID: input.Projects[0],
  121. }}, nil
  122. }
  123. // if there are no projects set, query for all clusters within the relevant projects
  124. clusters := make([]*models.Cluster, 0)
  125. query := db.Where(`clusters.project_id IN (?) OR clusters.project_id IN (
  126. SELECT p2.id FROM projects AS p2
  127. INNER JOIN project_usages ON p2.id=project_usages.project_id
  128. WHERE project_usages.resource_cpu != 10 AND project_usages.resource_memory != 20000 AND project_usages.clusters != 1 AND project_usages.users != 1
  129. )`, legacyProjects)
  130. if err := query.Find(&clusters).Error; err != nil {
  131. return nil, err
  132. }
  133. res := make([]clusterAndProjectID, 0)
  134. for _, cluster := range clusters {
  135. res = append(res, clusterAndProjectID{
  136. clusterID: cluster.ID,
  137. projectID: cluster.ProjectID,
  138. })
  139. }
  140. return res, nil
  141. }
  142. func (n *recommender) ID() string {
  143. return "recommender"
  144. }
  145. func (n *recommender) EnqueueTime() time.Time {
  146. return n.enqueueTime
  147. }
  148. func (n *recommender) Run(ctx context.Context) error {
  149. for _, ids := range n.clusterAndProjectIDs {
  150. fmt.Println(ids.projectID, ids.clusterID)
  151. cluster, err := n.repo.Cluster().ReadCluster(ids.projectID, ids.clusterID)
  152. if err != nil {
  153. log.Printf("error reading cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  154. continue
  155. }
  156. k8sAgent, err := kubernetes.GetAgentOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
  157. Cluster: cluster,
  158. Repo: n.repo,
  159. DigitalOceanOAuth: n.doConf,
  160. AllowInClusterConnections: false,
  161. Timeout: 5 * time.Second,
  162. })
  163. if err != nil {
  164. log.Printf("error getting k8s agent for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  165. continue
  166. }
  167. dynamicClient, err := kubernetes.GetDynamicClientOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
  168. Cluster: cluster,
  169. Repo: n.repo,
  170. DigitalOceanOAuth: n.doConf,
  171. AllowInClusterConnections: false,
  172. })
  173. if err != nil {
  174. log.Printf("error getting dynamic client for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  175. continue
  176. }
  177. runner := opa.NewRunner(n.policies, cluster, k8sAgent, dynamicClient)
  178. queryResults, err := runner.GetRecommendations(n.categories)
  179. if err != nil {
  180. log.Printf("error querying opa policies for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  181. continue
  182. }
  183. for _, queryRes := range queryResults {
  184. fmt.Println(queryRes.ObjectID, queryRes.Allow, queryRes.PolicyTitle, queryRes.PolicyMessage)
  185. monitor, err := n.repo.MonitorTestResult().ReadMonitorTestResult(ids.projectID, ids.clusterID, queryRes.ObjectID)
  186. if err != nil {
  187. if errors.Is(err, gorm.ErrRecordNotFound) {
  188. monitor, err = n.repo.MonitorTestResult().CreateMonitorTestResult(n.getMonitorTestResultFromQueryResult(cluster, queryRes, n.runRecommenderID))
  189. } else {
  190. continue
  191. }
  192. } else {
  193. monitor, err = n.repo.MonitorTestResult().UpdateMonitorTestResult(mergeMonitorTestResultFromQueryResult(monitor, queryRes, n.runRecommenderID))
  194. }
  195. if err != nil {
  196. continue
  197. }
  198. }
  199. err = n.repo.MonitorTestResult().ArchiveMonitorTestResults(ids.projectID, ids.clusterID, n.runRecommenderID)
  200. if err != nil {
  201. log.Printf("error archiving test results for cluster ID %d: %v", ids.clusterID, err)
  202. continue
  203. }
  204. err = n.repo.MonitorTestResult().DeleteOldMonitorTestResults(ids.projectID, ids.clusterID, n.runRecommenderID)
  205. if err != nil {
  206. log.Printf("error deleting old test results for cluster ID %d: %v", ids.clusterID, err)
  207. continue
  208. }
  209. }
  210. return nil
  211. }
  212. func (n *recommender) getMonitorTestResultFromQueryResult(cluster *models.Cluster, queryRes *opa.OPARecommenderQueryResult, recommenderID string) *models.MonitorTestResult {
  213. runResult := types.MonitorTestStatusSuccess
  214. if !queryRes.Allow {
  215. runResult = types.MonitorTestStatusFailed
  216. }
  217. currTime := time.Now()
  218. return &models.MonitorTestResult{
  219. ProjectID: cluster.ProjectID,
  220. ClusterID: cluster.ID,
  221. Category: queryRes.CategoryName,
  222. ObjectID: queryRes.ObjectID,
  223. LastStatusChange: &currTime,
  224. LastTested: &currTime,
  225. LastRunResult: string(runResult),
  226. LastRunResultEnum: models.GetLastRunResultEnum(string(runResult)),
  227. LastRecommenderRunID: recommenderID,
  228. Title: queryRes.PolicyTitle,
  229. Message: queryRes.PolicyMessage,
  230. Severity: queryRes.PolicySeverity,
  231. SeverityEnum: models.GetSeverityEnum(queryRes.PolicySeverity),
  232. Archived: false,
  233. }
  234. }
  235. func mergeMonitorTestResultFromQueryResult(monitor *models.MonitorTestResult, queryRes *opa.OPARecommenderQueryResult, recommenderID string) *models.MonitorTestResult {
  236. runResult := types.MonitorTestStatusSuccess
  237. if !queryRes.Allow {
  238. runResult = types.MonitorTestStatusFailed
  239. }
  240. currTime := time.Now()
  241. if isStatusChange := monitor.LastRunResult == string(runResult); isStatusChange {
  242. monitor.LastStatusChange = &currTime
  243. }
  244. monitor.LastTested = &currTime
  245. monitor.LastRunResult = string(runResult)
  246. monitor.Title = queryRes.PolicyTitle
  247. monitor.Message = queryRes.PolicyMessage
  248. monitor.Severity = queryRes.PolicySeverity
  249. monitor.SeverityEnum = models.GetSeverityEnum(queryRes.PolicySeverity)
  250. monitor.LastRunResultEnum = models.GetLastRunResultEnum(string(runResult))
  251. monitor.LastRecommenderRunID = recommenderID
  252. monitor.Archived = false
  253. return monitor
  254. }
  255. func (n *recommender) SetData([]byte) {}