main.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. // Copyright 2021 the Kilo authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "net"
  20. "net/http"
  21. "os"
  22. "os/signal"
  23. "strconv"
  24. "strings"
  25. "syscall"
  26. "time"
  27. "github.com/go-kit/kit/log"
  28. "github.com/go-kit/kit/log/level"
  29. "github.com/metalmatze/signal/internalserver"
  30. "github.com/oklog/run"
  31. "github.com/prometheus/client_golang/prometheus"
  32. "github.com/prometheus/client_golang/prometheus/collectors"
  33. "github.com/spf13/cobra"
  34. apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
  35. "k8s.io/client-go/kubernetes"
  36. "k8s.io/client-go/tools/clientcmd"
  37. "github.com/squat/kilo/pkg/encapsulation"
  38. "github.com/squat/kilo/pkg/k8s"
  39. kiloclient "github.com/squat/kilo/pkg/k8s/clientset/versioned"
  40. "github.com/squat/kilo/pkg/mesh"
  41. "github.com/squat/kilo/pkg/version"
  42. "github.com/squat/kilo/pkg/wireguard"
  43. )
// Values accepted by the --log-level flag. preRun maps each one to the
// like-named go-kit level filter option.
const (
	logLevelAll   = "all"
	logLevelDebug = "debug"
	logLevelInfo  = "info" // default value of --log-level
	logLevelWarn  = "warn"
	logLevelError = "error"
	logLevelNone  = "none"
)
// Compatibility modes accepted by the --compatibility flag.
// runRoot uses them to choose which encapsulator to construct.
const (
	compatFlannel = "flannel"
	compatCilium  = "cilium"
)
// Comma-separated lists of the values accepted by the enumerated flags.
// They are interpolated into flag help text and into the validation errors
// returned by preRun and runRoot.
var (
	// availableBackends lists the supported mesh backends.
	availableBackends = strings.Join([]string{
		k8s.Backend,
	}, ", ")
	// availableCompatibilities lists the supported compatibility modes.
	availableCompatibilities = strings.Join([]string{
		compatFlannel,
		compatCilium,
	}, ", ")
	// availableEncapsulations lists the supported encapsulation strategies.
	availableEncapsulations = strings.Join([]string{
		string(encapsulation.Never),
		string(encapsulation.CrossSubnet),
		string(encapsulation.Always),
	}, ", ")
	// availableGranularities lists the supported mesh granularities.
	availableGranularities = strings.Join([]string{
		string(mesh.LogicalGranularity),
		string(mesh.FullGranularity),
	}, ", ")
	// availableLogLevels lists the supported log levels.
	availableLogLevels = strings.Join([]string{
		logLevelAll,
		logLevelDebug,
		logLevelInfo,
		logLevelWarn,
		logLevelError,
		logLevelNone,
	}, ", ")
)
// cmd is the root command of the kg binary.
// preRun configures the logger and metrics registry, then runRoot starts the
// agent. Cobra's own usage and error output is silenced; main prints the
// returned error itself.
var cmd = &cobra.Command{
	Use:   "kg",
	Short: "kg is the Kilo agent",
	Long: `kg is the Kilo agent.
It runs on every node of a cluster,
setting up the public and private keys for the VPN
as well as the necessary rules to route packets between locations.`,
	PreRunE:       preRun,
	RunE:          runRoot,
	SilenceUsage:  true,
	SilenceErrors: true,
}
// Package-level configuration. Everything up to and including logLevel is
// bound to a command-line flag in init; logger and registry are populated by
// preRun before runRoot executes.
var (
	backend               string // --backend; validated in runRoot
	cleanUp               bool   // --clean-up
	cleanUpIface          bool   // --clean-up-interface
	createIface           bool   // --create-interface
	cni                   bool   // --cni
	cniPath               string // --cni-path
	compatibility         string // --compatibility; selects the encapsulator in runRoot
	encapsulate           string // --encapsulate; validated in runRoot
	granularity           string // --mesh-granularity; validated in runRoot
	hostname              string // --hostname; runRoot falls back to os.Hostname when empty
	kubeconfig            string // --kubeconfig
	iface                 string // --interface
	listen                string // --listen; address of the internal HTTP server
	local                 bool   // --local
	master                string // --master
	mtuFlag               string // --mtu; "auto" or an unsigned integer, parsed in runRoot
	topologyLabel         string // --topology-label
	port                  int    // --port; range-checked in runRoot
	serviceCIDRsRaw       []string // --service-cidr; parsed into networks in runRoot
	internalCIDRsRaw      []string // --internal-cidr; parsed into networks in runRoot
	subnet                string // --subnet; parsed as a CIDR in runRoot
	resyncPeriod          time.Duration // --resync-period
	iptablesForwardRule   bool   // --iptables-forward-rules
	prioritisePrivateAddr bool   // --prioritise-private-addresses
	printVersion          bool   // --version (persistent flag)
	logLevel              string // --log-level (persistent flag); validated in preRun
	// logger is the process-wide structured logger created by preRun.
	logger log.Logger
	// registry is the Prometheus registry created by preRun.
	registry *prometheus.Registry
)
  125. func init() {
  126. cmd.Flags().StringVar(&backend, "backend", k8s.Backend, fmt.Sprintf("The backend for the mesh. Possible values: %s", availableBackends))
  127. cmd.Flags().BoolVar(&cleanUp, "clean-up", true, "Should kilo clean up network modifications on shutdown?")
  128. cmd.Flags().BoolVar(&cleanUpIface, "clean-up-interface", false, "Should Kilo delete its interface when it shuts down?")
  129. cmd.Flags().BoolVar(&createIface, "create-interface", true, "Should kilo create an interface on startup?")
  130. cmd.Flags().BoolVar(&cni, "cni", true, "Should Kilo manage the node's CNI configuration?")
  131. cmd.Flags().StringVar(&cniPath, "cni-path", mesh.DefaultCNIPath, "Path to CNI config.")
  132. cmd.Flags().StringVar(&compatibility, "compatibility", "", fmt.Sprintf("Should Kilo run in compatibility mode? Possible values: %s", availableCompatibilities))
  133. cmd.Flags().StringVar(&encapsulate, "encapsulate", string(encapsulation.Always), fmt.Sprintf("When should Kilo encapsulate packets within a location? Possible values: %s", availableEncapsulations))
  134. cmd.Flags().StringVar(&granularity, "mesh-granularity", string(mesh.LogicalGranularity), fmt.Sprintf("The granularity of the network mesh to create. Possible values: %s", availableGranularities))
  135. cmd.Flags().StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig.")
  136. cmd.Flags().StringVar(&hostname, "hostname", "", "Hostname of the node on which this process is running.")
  137. cmd.Flags().StringVar(&iface, "interface", mesh.DefaultKiloInterface, "Name of the Kilo interface to use; if it does not exist, it will be created.")
  138. cmd.Flags().StringVar(&listen, "listen", ":1107", "The address at which to listen for health and metrics.")
  139. cmd.Flags().BoolVar(&local, "local", true, "Should Kilo manage routes within a location?")
  140. cmd.Flags().StringVar(&master, "master", "", "The address of the Kubernetes API server (overrides any value in kubeconfig).")
  141. cmd.Flags().StringVar(&mtuFlag, "mtu", "auto", "The MTU of the WireGuard interface created by Kilo. Set to 'auto' to detect from the underlay interface.")
  142. cmd.Flags().StringVar(&topologyLabel, "topology-label", k8s.RegionLabelKey, "Kubernetes node label used to group nodes into logical locations.")
  143. cmd.Flags().IntVar(&port, "port", mesh.DefaultKiloPort, "The port over which WireGuard peers should communicate.")
  144. cmd.Flags().StringSliceVar(&serviceCIDRsRaw, "service-cidr", nil, "The service CIDR for the Kubernetes cluster. Can be provided optionally to avoid masquerading packets sent to service IPs. Can be specified multiple times.")
  145. cmd.Flags().StringSliceVar(&internalCIDRsRaw, "internal-cidr", nil, "CIDRs to consider for internal IP auto-detection. If specified, only IPs within these CIDRs will be used. Can be specified multiple times.")
  146. cmd.Flags().StringVar(&subnet, "subnet", mesh.DefaultKiloSubnet.String(), "CIDR from which to allocate addresses for WireGuard interfaces.")
  147. cmd.Flags().DurationVar(&resyncPeriod, "resync-period", 30*time.Second, "How often should the Kilo controllers reconcile?")
  148. cmd.Flags().BoolVar(&iptablesForwardRule, "iptables-forward-rules", false, "Add default accept rules to the FORWARD chain in iptables. Warning: this may break firewalls with a deny all policy and is potentially insecure!")
  149. cmd.Flags().BoolVar(&prioritisePrivateAddr, "prioritise-private-addresses", false, "Prefer to assign a private IP address to the node's endpoint.")
  150. cmd.PersistentFlags().BoolVar(&printVersion, "version", false, "Print version and exit.")
  151. cmd.PersistentFlags().StringVar(&logLevel, "log-level", logLevelInfo, fmt.Sprintf("Log level to use. Possible values: %s", availableLogLevels))
  152. }
  153. func preRun(_ *cobra.Command, _ []string) error {
  154. logger = log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
  155. switch logLevel {
  156. case logLevelAll:
  157. logger = level.NewFilter(logger, level.AllowAll())
  158. case logLevelDebug:
  159. logger = level.NewFilter(logger, level.AllowDebug())
  160. case logLevelInfo:
  161. logger = level.NewFilter(logger, level.AllowInfo())
  162. case logLevelWarn:
  163. logger = level.NewFilter(logger, level.AllowWarn())
  164. case logLevelError:
  165. logger = level.NewFilter(logger, level.AllowError())
  166. case logLevelNone:
  167. logger = level.NewFilter(logger, level.AllowNone())
  168. default:
  169. return fmt.Errorf("log level %v unknown; possible values are: %s", logLevel, availableLogLevels)
  170. }
  171. logger = log.With(logger, "ts", log.DefaultTimestampUTC)
  172. logger = log.With(logger, "caller", log.DefaultCaller)
  173. registry = prometheus.NewRegistry()
  174. registry.MustRegister(
  175. collectors.NewGoCollector(),
  176. collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
  177. )
  178. return nil
  179. }
  180. // runRoot is the principal function for the binary.
  181. func runRoot(_ *cobra.Command, _ []string) error {
  182. if printVersion {
  183. fmt.Println(version.Version)
  184. return nil
  185. }
  186. _, s, err := net.ParseCIDR(subnet)
  187. if err != nil {
  188. return fmt.Errorf("failed to parse %q as CIDR: %v", subnet, err)
  189. }
  190. if hostname == "" {
  191. var err error
  192. hostname, err = os.Hostname()
  193. if hostname == "" || err != nil {
  194. return errors.New("failed to determine hostname")
  195. }
  196. }
  197. e := encapsulation.Strategy(encapsulate)
  198. switch e {
  199. case encapsulation.Never:
  200. case encapsulation.CrossSubnet:
  201. case encapsulation.Always:
  202. default:
  203. return fmt.Errorf("encapsulation %v unknown; possible values are: %s", encapsulate, availableEncapsulations)
  204. }
  205. var enc encapsulation.Encapsulator
  206. switch compatibility {
  207. case compatFlannel:
  208. enc = encapsulation.NewFlannel(e)
  209. case compatCilium:
  210. enc = encapsulation.NewCilium(e)
  211. default:
  212. enc = encapsulation.NewIPIP(e)
  213. }
  214. gr := mesh.Granularity(granularity)
  215. switch gr {
  216. case mesh.LogicalGranularity:
  217. case mesh.FullGranularity:
  218. default:
  219. return fmt.Errorf("mesh granularity %v unknown; possible values are: %s", granularity, availableGranularities)
  220. }
  221. var b mesh.Backend
  222. switch backend {
  223. case k8s.Backend:
  224. config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
  225. if err != nil {
  226. return fmt.Errorf("failed to create Kubernetes config: %v", err)
  227. }
  228. c := kubernetes.NewForConfigOrDie(config)
  229. kc := kiloclient.NewForConfigOrDie(config)
  230. ec := apiextensions.NewForConfigOrDie(config)
  231. b = k8s.New(c, kc, ec, topologyLabel, log.With(logger, "component", "k8s backend"))
  232. default:
  233. return fmt.Errorf("backend %v unknown; possible values are: %s", backend, availableBackends)
  234. }
  235. if port < 1 || port > 1<<16-1 {
  236. return fmt.Errorf("invalid port: port mus be in range [%d:%d], but got %d", 1, 1<<16-1, port)
  237. }
  238. var serviceCIDRs []*net.IPNet
  239. for _, serviceCIDR := range serviceCIDRsRaw {
  240. _, s, err := net.ParseCIDR(serviceCIDR)
  241. if err != nil {
  242. return fmt.Errorf("failed to parse %q as CIDR: %v", serviceCIDR, err)
  243. }
  244. serviceCIDRs = append(serviceCIDRs, s)
  245. }
  246. var internalCIDRs []*net.IPNet
  247. for _, internalCIDR := range internalCIDRsRaw {
  248. _, s, err := net.ParseCIDR(internalCIDR)
  249. if err != nil {
  250. return fmt.Errorf("failed to parse %q as CIDR: %v", internalCIDR, err)
  251. }
  252. internalCIDRs = append(internalCIDRs, s)
  253. }
  254. var mtu uint
  255. var autoMTU bool
  256. if mtuFlag == "auto" {
  257. autoMTU = true
  258. mtu = wireguard.DefaultMTU
  259. } else {
  260. v, err := strconv.ParseUint(mtuFlag, 10, 32)
  261. if err != nil {
  262. return fmt.Errorf("failed to parse MTU %q: %v", mtuFlag, err)
  263. }
  264. mtu = uint(v)
  265. }
  266. m, err := mesh.New(b, enc, gr, hostname, port, s, local, cni, cniPath, iface, cleanUp, cleanUpIface, createIface, mtu, autoMTU, resyncPeriod, prioritisePrivateAddr, iptablesForwardRule, internalCIDRs, serviceCIDRs, log.With(logger, "component", "kilo"), registry)
  267. if err != nil {
  268. return fmt.Errorf("failed to create Kilo mesh: %v", err)
  269. }
  270. var g run.Group
  271. {
  272. h := internalserver.NewHandler(
  273. internalserver.WithName("Internal Kilo API"),
  274. internalserver.WithPrometheusRegistry(registry),
  275. internalserver.WithPProf(),
  276. )
  277. h.AddEndpoint("/health", "Exposes health checks", healthHandler)
  278. h.AddEndpoint("/graph", "Exposes Kilo mesh topology graph", (&graphHandler{m, gr, &hostname, s, serviceCIDRs}).ServeHTTP)
  279. // Run the HTTP server.
  280. l, err := net.Listen("tcp", listen)
  281. if err != nil {
  282. return fmt.Errorf("failed to listen on %s: %v", listen, err)
  283. }
  284. g.Add(func() error {
  285. if err := http.Serve(l, h); err != nil && err != http.ErrServerClosed {
  286. return fmt.Errorf("error: server exited unexpectedly: %v", err)
  287. }
  288. return nil
  289. }, func(error) {
  290. _ = l.Close()
  291. })
  292. }
  293. {
  294. ctx, cancel := context.WithCancel(context.Background())
  295. // Start the mesh.
  296. g.Add(func() error {
  297. _ = logger.Log("msg", fmt.Sprintf("Starting Kilo network mesh '%v'.", version.Version))
  298. if err := m.Run(ctx); err != nil {
  299. return fmt.Errorf("error: Kilo exited unexpectedly: %v", err)
  300. }
  301. return nil
  302. }, func(error) {
  303. cancel()
  304. })
  305. }
  306. {
  307. // Exit gracefully on SIGINT and SIGTERM.
  308. term := make(chan os.Signal, 1)
  309. signal.Notify(term, syscall.SIGINT, syscall.SIGTERM)
  310. cancel := make(chan struct{})
  311. g.Add(func() error {
  312. for {
  313. select {
  314. case <-term:
  315. _ = logger.Log("msg", "caught interrupt; gracefully cleaning up; see you next time!")
  316. return nil
  317. case <-cancel:
  318. return nil
  319. }
  320. }
  321. }, func(error) {
  322. close(cancel)
  323. })
  324. }
  325. return g.Run()
  326. }
// versionCmd is the "version" subcommand; it prints version.Version to
// stdout. main attaches it to the root command.
var versionCmd = &cobra.Command{
	Use:   "version",
	Short: "Print the version and exit.",
	Run:   func(_ *cobra.Command, _ []string) { fmt.Println(version.Version) },
}
  332. func main() {
  333. cmd.AddCommand(webhookCmd, versionCmd)
  334. if err := cmd.Execute(); err != nil {
  335. fmt.Fprintf(os.Stderr, "%v\n", err)
  336. os.Exit(1)
  337. }
  338. }