route.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. // Copyright 2019 the Kilo authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package route
  15. import (
  16. "errors"
  17. "fmt"
  18. "sync"
  19. "github.com/vishvananda/netlink"
  20. "golang.org/x/sys/unix"
  21. )
  22. // Table represents a routing table.
  23. // Table can safely be used concurrently.
  24. type Table struct {
  25. errors chan error
  26. mu sync.Mutex
  27. rs map[string]interface{}
  28. subscribed bool
  29. // Make these functions fields to allow
  30. // for testing.
  31. addRoute func(*netlink.Route) error
  32. delRoute func(*netlink.Route) error
  33. addRule func(*netlink.Rule) error
  34. delRule func(*netlink.Rule) error
  35. }
  36. // NewTable generates a new table.
  37. func NewTable() *Table {
  38. return &Table{
  39. errors: make(chan error),
  40. rs: make(map[string]interface{}),
  41. addRoute: netlink.RouteReplace,
  42. delRoute: func(r *netlink.Route) error {
  43. name := routeToString(r)
  44. if name == "" {
  45. return errors.New("attempting to delete invalid route")
  46. }
  47. routes, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
  48. if err != nil {
  49. return fmt.Errorf("failed to list routes before deletion: %v", err)
  50. }
  51. for _, route := range routes {
  52. if routeToString(&route) == name {
  53. return netlink.RouteDel(r)
  54. }
  55. }
  56. return nil
  57. },
  58. addRule: netlink.RuleAdd,
  59. delRule: func(r *netlink.Rule) error {
  60. name := ruleToString(r)
  61. if name == "" {
  62. return errors.New("attempting to delete invalid rule")
  63. }
  64. rules, err := netlink.RuleList(netlink.FAMILY_ALL)
  65. if err != nil {
  66. return fmt.Errorf("failed to list rules before deletion: %v", err)
  67. }
  68. for _, rule := range rules {
  69. if ruleToString(&rule) == name {
  70. return netlink.RuleDel(r)
  71. }
  72. }
  73. return nil
  74. },
  75. }
  76. }
  77. // Run watches for changes to routes and rules in the table and reconciles
  78. // the table against the desired state.
  79. func (t *Table) Run(stop <-chan struct{}) (<-chan error, error) {
  80. t.mu.Lock()
  81. if t.subscribed {
  82. t.mu.Unlock()
  83. return t.errors, nil
  84. }
  85. // Ensure a given instance only subscribes once.
  86. t.subscribed = true
  87. t.mu.Unlock()
  88. events := make(chan netlink.RouteUpdate)
  89. if err := netlink.RouteSubscribe(events, stop); err != nil {
  90. return t.errors, fmt.Errorf("failed to subscribe to route events: %v", err)
  91. }
  92. go func() {
  93. defer close(t.errors)
  94. for {
  95. var e netlink.RouteUpdate
  96. select {
  97. case e = <-events:
  98. case <-stop:
  99. return
  100. }
  101. switch e.Type {
  102. // Watch for deleted routes to reconcile this table's routes.
  103. case unix.RTM_DELROUTE:
  104. // Filter out invalid routes.
  105. if e.Route.Dst == nil {
  106. continue
  107. }
  108. t.mu.Lock()
  109. for k := range t.rs {
  110. switch r := t.rs[k].(type) {
  111. case *netlink.Route:
  112. // If any deleted route's destination matches a destination
  113. // in the table, reset the corresponding route just in case.
  114. if r.Dst.IP.Equal(e.Route.Dst.IP) && r.Dst.Mask.String() == e.Route.Dst.Mask.String() {
  115. if err := t.addRoute(r); err != nil {
  116. nonBlockingSend(t.errors, fmt.Errorf("failed add route: %v", err))
  117. }
  118. }
  119. }
  120. }
  121. t.mu.Unlock()
  122. }
  123. }
  124. }()
  125. return t.errors, nil
  126. }
  127. // CleanUp will clean up any routes and rules created by the instance.
  128. func (t *Table) CleanUp() error {
  129. t.mu.Lock()
  130. defer t.mu.Unlock()
  131. for k := range t.rs {
  132. switch r := t.rs[k].(type) {
  133. case *netlink.Route:
  134. if err := t.delRoute(r); err != nil {
  135. return fmt.Errorf("failed to delete route: %v", err)
  136. }
  137. case *netlink.Rule:
  138. if err := t.delRule(r); err != nil {
  139. return fmt.Errorf("failed to delete rule: %v", err)
  140. }
  141. }
  142. delete(t.rs, k)
  143. }
  144. return nil
  145. }
  146. // Set idempotently overwrites any routes and rules previously defined
  147. // for the table with the given set of routes and rules.
  148. func (t *Table) Set(routes []*netlink.Route, rules []*netlink.Rule) error {
  149. rs := make(map[string]interface{})
  150. for _, route := range routes {
  151. if route == nil {
  152. continue
  153. }
  154. rs[routeToString(route)] = route
  155. }
  156. for _, rule := range rules {
  157. if rule == nil {
  158. continue
  159. }
  160. rs[ruleToString(rule)] = rule
  161. }
  162. t.mu.Lock()
  163. defer t.mu.Unlock()
  164. for k := range t.rs {
  165. if _, ok := rs[k]; !ok {
  166. switch r := t.rs[k].(type) {
  167. case *netlink.Route:
  168. if err := t.delRoute(r); err != nil {
  169. return fmt.Errorf("failed to delete route: %v", err)
  170. }
  171. case *netlink.Rule:
  172. if err := t.delRule(r); err != nil {
  173. return fmt.Errorf("failed to delete rule: %v", err)
  174. }
  175. }
  176. delete(t.rs, k)
  177. }
  178. }
  179. // When adding routes/rules, we need to compare against what is
  180. // actually on the Linux routing table. This is because
  181. // routes/rules can be deleted by the kernel due to interface churn
  182. // causing a situation where the controller thinks it has an item
  183. // that is not actually there.
  184. existing := make(map[string]interface{})
  185. existingRoutes, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
  186. if err != nil {
  187. return fmt.Errorf("failed to list existing routes: %v", err)
  188. }
  189. for k := range existingRoutes {
  190. existing[routeToString(&existingRoutes[k])] = &existingRoutes[k]
  191. }
  192. existingRules, err := netlink.RuleList(netlink.FAMILY_ALL)
  193. if err != nil {
  194. return fmt.Errorf("failed to list existing rules: %v", err)
  195. }
  196. for k := range existingRules {
  197. existing[ruleToString(&existingRules[k])] = &existingRules[k]
  198. }
  199. for k := range rs {
  200. if _, ok := existing[k]; !ok {
  201. switch r := rs[k].(type) {
  202. case *netlink.Route:
  203. if err := t.addRoute(r); err != nil {
  204. return fmt.Errorf("failed to add route %q: %v", k, err)
  205. }
  206. case *netlink.Rule:
  207. if err := t.addRule(r); err != nil {
  208. return fmt.Errorf("failed to add rule %q: %v", k, err)
  209. }
  210. }
  211. t.rs[k] = rs[k]
  212. }
  213. }
  214. return nil
  215. }
  216. func nonBlockingSend(errors chan<- error, err error) {
  217. select {
  218. case errors <- err:
  219. default:
  220. }
  221. }
  222. func routeToString(route *netlink.Route) string {
  223. if route == nil || route.Dst == nil {
  224. return ""
  225. }
  226. src := "-"
  227. if route.Src != nil {
  228. src = route.Src.String()
  229. }
  230. gw := "-"
  231. if route.Gw != nil {
  232. gw = route.Gw.String()
  233. }
  234. return fmt.Sprintf("dst: %s, via: %s, src: %s, dev: %d", route.Dst.String(), gw, src, route.LinkIndex)
  235. }
  236. func ruleToString(rule *netlink.Rule) string {
  237. if rule == nil || (rule.Src == nil && rule.Dst == nil) {
  238. return ""
  239. }
  240. src := "-"
  241. if rule.Src != nil {
  242. src = rule.Src.String()
  243. }
  244. dst := "-"
  245. if rule.Dst != nil {
  246. dst = rule.Dst.String()
  247. }
  248. return fmt.Sprintf("src: %s, dst: %s, table: %d, input: %s", src, dst, rule.Table, rule.IifName)
  249. }