recommender.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. //go:build ee
  2. /*
  3. === Recommender Job ===
  4. This job checks to see if a cluster matches policies set by the OPA config file.
  5. */
  6. package jobs
  7. import (
  8. "errors"
  9. "fmt"
  10. "log"
  11. "time"
  12. "github.com/mitchellh/mapstructure"
  13. "github.com/porter-dev/porter/api/server/shared/config/env"
  14. "github.com/porter-dev/porter/api/server/shared/requestutils"
  15. "github.com/porter-dev/porter/api/types"
  16. "github.com/porter-dev/porter/ee/integrations/vault"
  17. "github.com/porter-dev/porter/internal/kubernetes"
  18. "github.com/porter-dev/porter/internal/models"
  19. "github.com/porter-dev/porter/internal/oauth"
  20. "github.com/porter-dev/porter/internal/opa"
  21. "github.com/porter-dev/porter/internal/repository"
  22. rcreds "github.com/porter-dev/porter/internal/repository/credentials"
  23. rgorm "github.com/porter-dev/porter/internal/repository/gorm"
  24. "golang.org/x/oauth2"
  25. "gorm.io/gorm"
  26. )
  27. type recommender struct {
  28. enqueueTime time.Time
  29. db *gorm.DB
  30. repo repository.Repository
  31. doConf *oauth2.Config
  32. clusterAndProjectIDs []clusterAndProjectID
  33. categories []string
  34. policies *opa.KubernetesPolicies
  35. }
  36. // RecommenderOpts holds the options required to run this job
  37. type RecommenderOpts struct {
  38. DBConf *env.DBConf
  39. DOClientID string
  40. DOClientSecret string
  41. DOScopes []string
  42. ServerURL string
  43. LegacyProjectIDs []uint
  44. Input map[string]interface{}
  45. }
  46. type recommenderInput struct {
  47. Projects []uint `mapstructure:"projects"`
  48. ClusterID uint `mapstructure:"cluster_id"`
  49. Priority string `mapstructure:"priority"`
  50. Categories []string `mapstructure:"categories"`
  51. }
  52. type clusterAndProjectID struct {
  53. clusterID uint
  54. projectID uint
  55. }
  56. func NewRecommender(
  57. db *gorm.DB,
  58. enqueueTime time.Time,
  59. opts *RecommenderOpts,
  60. opaPolicies *opa.KubernetesPolicies,
  61. ) (*recommender, error) {
  62. var credBackend rcreds.CredentialStorage
  63. if opts.DBConf.VaultAPIKey != "" && opts.DBConf.VaultServerURL != "" && opts.DBConf.VaultPrefix != "" {
  64. credBackend = vault.NewClient(
  65. opts.DBConf.VaultServerURL,
  66. opts.DBConf.VaultAPIKey,
  67. opts.DBConf.VaultPrefix,
  68. )
  69. }
  70. var key [32]byte
  71. for i, b := range []byte(opts.DBConf.EncryptionKey) {
  72. key[i] = b
  73. }
  74. repo := rgorm.NewRepository(db, &key, credBackend)
  75. doConf := oauth.NewDigitalOceanClient(&oauth.Config{
  76. ClientID: opts.DOClientID,
  77. ClientSecret: opts.DOClientSecret,
  78. Scopes: opts.DOScopes,
  79. BaseURL: opts.ServerURL,
  80. })
  81. // parse input
  82. parsedInput := &recommenderInput{}
  83. err := mapstructure.Decode(opts.Input, parsedInput)
  84. if err != nil {
  85. return nil, err
  86. }
  87. // validate
  88. validator := requestutils.NewDefaultValidator()
  89. if requestErr := validator.Validate(parsedInput); requestErr != nil {
  90. return nil, fmt.Errorf(requestErr.Error())
  91. }
  92. clusterIDs, err := getClustersToParse(db, repo.Cluster(), parsedInput, opts.LegacyProjectIDs)
  93. if err != nil {
  94. return nil, err
  95. }
  96. return &recommender{
  97. enqueueTime, db, repo, doConf, clusterIDs, parsedInput.Categories, opaPolicies,
  98. }, nil
  99. }
  100. func getClustersToParse(db *gorm.DB, clusterRepo repository.ClusterRepository, input *recommenderInput, legacyProjects []uint) ([]clusterAndProjectID, error) {
  101. // if the project and cluster ID is set, make sure that the project id matches the cluster's
  102. // project id
  103. if input.ClusterID != 0 {
  104. if len(input.Projects) != 1 {
  105. return nil, fmt.Errorf("if cluster ID is passed, you must pass the matching project ID")
  106. }
  107. _, err := clusterRepo.ReadCluster(input.Projects[0], input.ClusterID)
  108. if err != nil {
  109. return nil, err
  110. }
  111. return []clusterAndProjectID{{
  112. clusterID: input.ClusterID,
  113. projectID: input.Projects[0],
  114. }}, nil
  115. }
  116. // if there are no projects set, query for all clusters within the relevant projects
  117. clusters := make([]*models.Cluster, 0)
  118. query := db.Where(`clusters.project_id IN (?) OR clusters.project_id IN (
  119. SELECT p2.id FROM projects AS p2
  120. INNER JOIN project_usages ON p2.id=project_usages.project_id
  121. WHERE project_usages.resource_cpu != 10 AND project_usages.resource_memory != 20000 AND project_usages.clusters != 1 AND project_usages.users != 1
  122. )`, legacyProjects)
  123. if err := query.Find(&clusters).Error; err != nil {
  124. return nil, err
  125. }
  126. res := make([]clusterAndProjectID, 0)
  127. for _, cluster := range clusters {
  128. res = append(res, clusterAndProjectID{
  129. clusterID: cluster.ID,
  130. projectID: cluster.ProjectID,
  131. })
  132. }
  133. return res, nil
  134. }
  135. func (n *recommender) ID() string {
  136. return "recommender"
  137. }
  138. func (n *recommender) EnqueueTime() time.Time {
  139. return n.enqueueTime
  140. }
  141. func (n *recommender) Run() error {
  142. for _, ids := range n.clusterAndProjectIDs {
  143. fmt.Println(ids.projectID, ids.clusterID)
  144. cluster, err := n.repo.Cluster().ReadCluster(ids.projectID, ids.clusterID)
  145. if err != nil {
  146. log.Printf("error reading cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  147. continue
  148. }
  149. k8sAgent, err := kubernetes.GetAgentOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
  150. Cluster: cluster,
  151. Repo: n.repo,
  152. DigitalOceanOAuth: n.doConf,
  153. AllowInClusterConnections: false,
  154. })
  155. if err != nil {
  156. log.Printf("error getting k8s agent for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  157. continue
  158. }
  159. dynamicClient, err := kubernetes.GetDynamicClientOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
  160. Cluster: cluster,
  161. Repo: n.repo,
  162. DigitalOceanOAuth: n.doConf,
  163. AllowInClusterConnections: false,
  164. })
  165. if err != nil {
  166. log.Printf("error getting dynamic client for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  167. continue
  168. }
  169. runner := opa.NewRunner(n.policies, k8sAgent, dynamicClient)
  170. queryResults, err := runner.GetRecommendations(n.categories)
  171. if err != nil {
  172. log.Printf("error querying opa policies for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  173. continue
  174. }
  175. for _, queryRes := range queryResults {
  176. fmt.Println(queryRes.ObjectID, queryRes.Allow, queryRes.PolicyTitle, queryRes.PolicyMessage)
  177. monitor, err := n.repo.MonitorTestResult().ReadMonitorTestResult(ids.projectID, ids.clusterID, queryRes.ObjectID)
  178. if err != nil {
  179. if errors.Is(err, gorm.ErrRecordNotFound) {
  180. monitor, err = n.repo.MonitorTestResult().CreateMonitorTestResult(n.getMonitorTestResultFromQueryResult(cluster, queryRes))
  181. } else {
  182. continue
  183. }
  184. } else {
  185. monitor, err = n.repo.MonitorTestResult().UpdateMonitorTestResult(mergeMonitorTestResultFromQueryResult(monitor, queryRes))
  186. }
  187. if err != nil {
  188. continue
  189. }
  190. }
  191. }
  192. return nil
  193. }
  194. func (n *recommender) getMonitorTestResultFromQueryResult(cluster *models.Cluster, queryRes *opa.OPARecommenderQueryResult) *models.MonitorTestResult {
  195. runResult := types.MonitorTestStatusSuccess
  196. if !queryRes.Allow {
  197. runResult = types.MonitorTestStatusFailed
  198. }
  199. currTime := time.Now()
  200. return &models.MonitorTestResult{
  201. ProjectID: cluster.ProjectID,
  202. ClusterID: cluster.ID,
  203. Category: queryRes.CategoryName,
  204. ObjectID: queryRes.ObjectID,
  205. LastStatusChange: &currTime,
  206. LastTested: &currTime,
  207. LastRunResult: string(runResult),
  208. LastRunResultEnum: models.GetLastRunResultEnum(string(runResult)),
  209. Title: queryRes.PolicyTitle,
  210. Message: queryRes.PolicyMessage,
  211. Severity: queryRes.PolicySeverity,
  212. SeverityEnum: models.GetSeverityEnum(queryRes.PolicySeverity),
  213. }
  214. }
  215. func mergeMonitorTestResultFromQueryResult(monitor *models.MonitorTestResult, queryRes *opa.OPARecommenderQueryResult) *models.MonitorTestResult {
  216. runResult := types.MonitorTestStatusSuccess
  217. if !queryRes.Allow {
  218. runResult = types.MonitorTestStatusFailed
  219. }
  220. currTime := time.Now()
  221. if isStatusChange := monitor.LastRunResult == string(runResult); isStatusChange {
  222. monitor.LastStatusChange = &currTime
  223. }
  224. monitor.LastTested = &currTime
  225. monitor.LastRunResult = string(runResult)
  226. monitor.Title = queryRes.PolicyTitle
  227. monitor.Message = queryRes.PolicyMessage
  228. monitor.Severity = queryRes.PolicySeverity
  229. monitor.SeverityEnum = models.GetSeverityEnum(queryRes.PolicySeverity)
  230. monitor.LastRunResultEnum = models.GetLastRunResultEnum(string(runResult))
  231. return monitor
  232. }
  233. func (n *recommender) SetData([]byte) {}