2
0

webhook.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
// Copyright 2021 the Kilo authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package main
  15. import (
  16. "context"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io/ioutil"
  21. "net/http"
  22. "os"
  23. "syscall"
  24. "time"
  25. "github.com/go-kit/kit/log/level"
  26. "github.com/oklog/run"
  27. "github.com/prometheus/client_golang/prometheus"
  28. "github.com/prometheus/client_golang/prometheus/promhttp"
  29. "github.com/spf13/cobra"
  30. v1 "k8s.io/api/admission/v1"
  31. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  32. "k8s.io/apimachinery/pkg/runtime"
  33. "k8s.io/apimachinery/pkg/runtime/serializer"
  34. kilo "github.com/squat/kilo/pkg/k8s/apis/kilo/v1alpha1"
  35. "github.com/squat/kilo/pkg/version"
  36. )
  37. var webhookCmd = &cobra.Command{
  38. Use: "webhook",
  39. PreRunE: func(c *cobra.Command, a []string) error {
  40. if c.HasParent() {
  41. return c.Parent().PreRunE(c, a)
  42. }
  43. return nil
  44. },
  45. Short: "webhook starts a HTTPS server to validate updates and creations of Kilo peers.",
  46. RunE: webhook,
  47. }
  48. var (
  49. certPath string
  50. keyPath string
  51. metricsAddr string
  52. listenAddr string
  53. )
  54. func init() {
  55. webhookCmd.Flags().StringVar(&certPath, "cert-file", "", "The path to a certificate file")
  56. webhookCmd.Flags().StringVar(&keyPath, "key-file", "", "The path to a key file")
  57. webhookCmd.Flags().StringVar(&metricsAddr, "listen-metrics", ":1107", "The metrics server will be listening to that address")
  58. webhookCmd.Flags().StringVar(&listenAddr, "listen", ":8443", "The webhook server will be listening to that address")
  59. }
  60. var deserializer = serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()
  61. var (
  62. validationCounter = prometheus.NewCounterVec(
  63. prometheus.CounterOpts{
  64. Name: "admission_requests_total",
  65. Help: "The number of received admission reviews requests",
  66. },
  67. []string{"operation", "response"},
  68. )
  69. requestCounter = prometheus.NewCounterVec(
  70. prometheus.CounterOpts{
  71. Name: "http_requests_total",
  72. Help: "The number of received http requests",
  73. },
  74. []string{"handler", "method"},
  75. )
  76. errorCounter = prometheus.NewCounter(
  77. prometheus.CounterOpts{
  78. Name: "errors_total",
  79. Help: "The total number of errors",
  80. },
  81. )
  82. )
  83. func validationHandler(w http.ResponseWriter, r *http.Request) {
  84. level.Debug(logger).Log("msg", "handling request", "source", r.RemoteAddr)
  85. body, err := ioutil.ReadAll(r.Body)
  86. if err != nil {
  87. errorCounter.Inc()
  88. level.Error(logger).Log("err", "failed to parse body from incoming request", "source", r.RemoteAddr)
  89. http.Error(w, err.Error(), http.StatusBadRequest)
  90. return
  91. }
  92. var admissionReview v1.AdmissionReview
  93. contentType := r.Header.Get("Content-Type")
  94. if contentType != "application/json" {
  95. errorCounter.Inc()
  96. msg := fmt.Sprintf("received Content-Type=%s, expected application/json", contentType)
  97. level.Error(logger).Log("err", msg)
  98. http.Error(w, msg, http.StatusBadRequest)
  99. return
  100. }
  101. response := v1.AdmissionReview{}
  102. _, gvk, err := deserializer.Decode(body, nil, &admissionReview)
  103. if err != nil {
  104. errorCounter.Inc()
  105. msg := fmt.Sprintf("Request could not be decoded: %v", err)
  106. level.Error(logger).Log("err", msg)
  107. http.Error(w, msg, http.StatusBadRequest)
  108. return
  109. }
  110. if *gvk != v1.SchemeGroupVersion.WithKind("AdmissionReview") {
  111. errorCounter.Inc()
  112. msg := "only API v1 is supported"
  113. level.Error(logger).Log("err", msg)
  114. http.Error(w, msg, http.StatusBadRequest)
  115. return
  116. }
  117. response.SetGroupVersionKind(*gvk)
  118. response.Response = &v1.AdmissionResponse{
  119. UID: admissionReview.Request.UID,
  120. }
  121. rawExtension := admissionReview.Request.Object
  122. var peer kilo.Peer
  123. if err := json.Unmarshal(rawExtension.Raw, &peer); err != nil {
  124. errorCounter.Inc()
  125. msg := fmt.Sprintf("could not unmarshal extension to peer spec: %v:", err)
  126. level.Error(logger).Log("err", msg)
  127. http.Error(w, msg, http.StatusBadRequest)
  128. return
  129. }
  130. if err := peer.Validate(); err == nil {
  131. level.Debug(logger).Log("msg", "got valid peer spec", "spec", peer.Spec, "name", peer.ObjectMeta.Name)
  132. validationCounter.With(prometheus.Labels{"operation": string(admissionReview.Request.Operation), "response": "allowed"}).Inc()
  133. response.Response.Allowed = true
  134. } else {
  135. level.Debug(logger).Log("msg", "got invalid peer spec", "spec", peer.Spec, "name", peer.ObjectMeta.Name)
  136. validationCounter.With(prometheus.Labels{"operation": string(admissionReview.Request.Operation), "response": "denied"}).Inc()
  137. response.Response.Result = &metav1.Status{
  138. Message: err.Error(),
  139. }
  140. }
  141. res, err := json.Marshal(response)
  142. if err != nil {
  143. errorCounter.Inc()
  144. msg := fmt.Sprintf("failed to marshal response: %v", err)
  145. level.Error(logger).Log("err", msg)
  146. http.Error(w, msg, http.StatusInternalServerError)
  147. return
  148. }
  149. w.Header().Set("Content-Type", "application/json")
  150. if _, err := w.Write(res); err != nil {
  151. level.Error(logger).Log("err", err, "msg", "failed to write response")
  152. }
  153. }
  154. func metricsMiddleWare(path string, next func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
  155. return func(w http.ResponseWriter, r *http.Request) {
  156. requestCounter.With(prometheus.Labels{"method": r.Method, "handler": path}).Inc()
  157. next(w, r)
  158. }
  159. }
  160. func webhook(_ *cobra.Command, _ []string) error {
  161. if printVersion {
  162. fmt.Println(version.Version)
  163. os.Exit(0)
  164. }
  165. registry.MustRegister(
  166. errorCounter,
  167. validationCounter,
  168. requestCounter,
  169. )
  170. ctx, cancel := context.WithCancel(context.Background())
  171. defer func() {
  172. cancel()
  173. }()
  174. var g run.Group
  175. g.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM))
  176. {
  177. mm := http.NewServeMux()
  178. mm.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
  179. msrv := &http.Server{
  180. Addr: metricsAddr,
  181. Handler: mm,
  182. }
  183. g.Add(
  184. func() error {
  185. level.Info(logger).Log("msg", "starting metrics server", "address", msrv.Addr)
  186. err := msrv.ListenAndServe()
  187. level.Info(logger).Log("msg", "metrics server exited", "err", err)
  188. return err
  189. },
  190. func(err error) {
  191. var serr run.SignalError
  192. if ok := errors.As(err, &serr); ok {
  193. level.Info(logger).Log("msg", "received signal", "signal", serr.Signal.String(), "err", err.Error())
  194. } else {
  195. level.Error(logger).Log("msg", "received error", "err", err.Error())
  196. }
  197. level.Info(logger).Log("msg", "shutting down metrics server gracefully")
  198. ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
  199. defer func() {
  200. cancel()
  201. }()
  202. if err := msrv.Shutdown(ctx); err != nil {
  203. level.Error(logger).Log("msg", "failed to shut down metrics server gracefully", "err", err.Error())
  204. msrv.Close()
  205. }
  206. },
  207. )
  208. }
  209. {
  210. mux := http.NewServeMux()
  211. mux.HandleFunc("/validate", metricsMiddleWare("/validate", validationHandler))
  212. srv := &http.Server{
  213. Addr: listenAddr,
  214. Handler: mux,
  215. }
  216. g.Add(
  217. func() error {
  218. level.Info(logger).Log("msg", "starting webhook server", "address", srv.Addr)
  219. err := srv.ListenAndServeTLS(certPath, keyPath)
  220. level.Info(logger).Log("msg", "webhook server exited", "err", err)
  221. return err
  222. },
  223. func(err error) {
  224. var serr run.SignalError
  225. if ok := errors.As(err, &serr); ok {
  226. level.Info(logger).Log("msg", "received signal", "signal", serr.Signal.String(), "err", err.Error())
  227. } else {
  228. level.Error(logger).Log("msg", "received error", "err", err.Error())
  229. }
  230. level.Info(logger).Log("msg", "shutting down webhook server gracefully")
  231. ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
  232. defer func() {
  233. cancel()
  234. }()
  235. if err := srv.Shutdown(ctx); err != nil {
  236. level.Error(logger).Log("msg", "failed to shut down webhook server gracefully", "err", err.Error())
  237. srv.Close()
  238. }
  239. },
  240. )
  241. }
  242. err := g.Run()
  243. var serr run.SignalError
  244. if ok := errors.As(err, &serr); ok {
  245. return nil
  246. }
  247. return err
  248. }