agent.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. package docker
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "os"
  10. "strings"
  11. "time"
  12. "github.com/digitalocean/godo"
  13. "github.com/docker/distribution/reference"
  14. "github.com/docker/docker/api/types"
  15. "github.com/docker/docker/api/types/container"
  16. "github.com/docker/docker/api/types/filters"
  17. "github.com/docker/docker/api/types/network"
  18. "github.com/docker/docker/api/types/volume"
  19. "github.com/docker/docker/client"
  20. "github.com/moby/moby/pkg/jsonmessage"
  21. "github.com/moby/term"
  22. )
  23. // Agent is a Docker client for performing operations that interact
  24. // with the Docker engine over REST
  25. type Agent struct {
  26. *client.Client
  27. authGetter *AuthGetter
  28. ctx context.Context
  29. label string
  30. }
  31. // CreateLocalVolumeIfNotExist creates a volume using driver type "local" with the
  32. // given name if it does not exist. If the volume does exist but does not contain
  33. // the required label (a.label), an error is thrown.
  34. func (a *Agent) CreateLocalVolumeIfNotExist(name string) (*types.Volume, error) {
  35. volListBody, err := a.VolumeList(a.ctx, filters.Args{})
  36. if err != nil {
  37. return nil, a.handleDockerClientErr(err, "Could not list volumes")
  38. }
  39. for _, _vol := range volListBody.Volumes {
  40. if contains, ok := _vol.Labels[a.label]; ok && contains == "true" && _vol.Name == name {
  41. return _vol, nil
  42. } else if !ok && _vol.Name == name {
  43. return nil, fmt.Errorf("volume conflict for %s: please remove existing volume and try again", name)
  44. }
  45. }
  46. return a.CreateLocalVolume(name)
  47. }
  48. // CreateLocalVolume creates a volume using driver type "local" with no
  49. // configured options. The equivalent of:
  50. //
  51. // docker volume create --driver local [name]
  52. func (a *Agent) CreateLocalVolume(name string) (*types.Volume, error) {
  53. labels := make(map[string]string)
  54. labels[a.label] = "true"
  55. opts := volume.VolumeCreateBody{
  56. Name: name,
  57. Driver: "local",
  58. Labels: labels,
  59. }
  60. vol, err := a.VolumeCreate(a.ctx, opts)
  61. if err != nil {
  62. return nil, a.handleDockerClientErr(err, "Could not create volume "+name)
  63. }
  64. return &vol, nil
  65. }
  66. // RemoveLocalVolume removes a volume by name
  67. func (a *Agent) RemoveLocalVolume(name string) error {
  68. return a.VolumeRemove(a.ctx, name, true)
  69. }
  70. // CreateBridgeNetworkIfNotExist creates a volume using driver type "local" with the
  71. // given name if it does not exist. If the volume does exist but does not contain
  72. // the required label (a.label), an error is thrown.
  73. func (a *Agent) CreateBridgeNetworkIfNotExist(name string) (id string, err error) {
  74. networks, err := a.NetworkList(a.ctx, types.NetworkListOptions{})
  75. if err != nil {
  76. return "", a.handleDockerClientErr(err, "Could not list volumes")
  77. }
  78. for _, net := range networks {
  79. if contains, ok := net.Labels[a.label]; ok && contains == "true" && net.Name == name {
  80. return net.ID, nil
  81. } else if !ok && net.Name == name {
  82. return "", fmt.Errorf("network conflict for %s: please remove existing network and try again", name)
  83. }
  84. }
  85. return a.CreateBridgeNetwork(name)
  86. }
  87. // CreateBridgeNetwork creates a volume using the default driver type (bridge)
  88. // with the CLI label attached
  89. func (a *Agent) CreateBridgeNetwork(name string) (id string, err error) {
  90. labels := make(map[string]string)
  91. labels[a.label] = "true"
  92. opts := types.NetworkCreate{
  93. Labels: labels,
  94. Attachable: true,
  95. }
  96. net, err := a.NetworkCreate(a.ctx, name, opts)
  97. if err != nil {
  98. return "", a.handleDockerClientErr(err, "Could not create network "+name)
  99. }
  100. return net.ID, nil
  101. }
  102. // ConnectContainerToNetwork attaches a container to a specified network
  103. func (a *Agent) ConnectContainerToNetwork(networkID, containerID, containerName string) error {
  104. // check if the container is connected already
  105. net, err := a.NetworkInspect(a.ctx, networkID, types.NetworkInspectOptions{})
  106. if err != nil {
  107. return a.handleDockerClientErr(err, "Could not inspect network"+networkID)
  108. }
  109. for _, cont := range net.Containers {
  110. // if container is connected, just return
  111. if cont.Name == containerName {
  112. return nil
  113. }
  114. }
  115. return a.NetworkConnect(a.ctx, networkID, containerID, &network.EndpointSettings{})
  116. }
  117. func (a *Agent) TagImage(old, new string) error {
  118. return a.ImageTag(a.ctx, old, new)
  119. }
  120. // PullImageEvent represents a response from the Docker API with an image pull event
  121. type PullImageEvent struct {
  122. Status string `json:"status"`
  123. Error string `json:"error"`
  124. Progress string `json:"progress"`
  125. ProgressDetail struct {
  126. Current int `json:"current"`
  127. Total int `json:"total"`
  128. } `json:"progressDetail"`
  129. }
  130. var PullImageErrNotFound = fmt.Errorf("Requested image not found")
  131. var PullImageErrUnauthorized = fmt.Errorf("Could not pull image: unauthorized")
  132. func getRegistryRepositoryPair(imageRepo string) ([]string, error) {
  133. named, err := reference.ParseNamed(imageRepo)
  134. if err != nil {
  135. return nil, err
  136. }
  137. path := reference.Path(named)
  138. return strings.SplitN(path, "/", 2), nil
  139. }
  140. // CheckIfImageExists checks if the image exists in the registry
  141. func (a *Agent) CheckIfImageExists(imageRepo, imageTag string) bool {
  142. registryToken, err := a.getContainerRegistryToken(imageRepo)
  143. if err != nil {
  144. return false
  145. }
  146. ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
  147. defer cancel()
  148. if strings.Contains(imageRepo, "gcr.io") {
  149. gcrRegRepo, err := getRegistryRepositoryPair(imageRepo)
  150. if err != nil {
  151. return false
  152. }
  153. named, err := reference.ParseNamed(imageRepo)
  154. if err != nil {
  155. return false
  156. }
  157. req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf(
  158. "https://%s/v2/%s/%s/tags/list", reference.Domain(named), gcrRegRepo[0], gcrRegRepo[1],
  159. ), nil)
  160. if err != nil {
  161. return false
  162. }
  163. req.Header.Add("Content-Type", "application/json")
  164. req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", registryToken))
  165. resp, err := http.DefaultClient.Do(req)
  166. if err != nil {
  167. return false
  168. }
  169. defer resp.Body.Close()
  170. var tags struct {
  171. Tags []string `json:"tags,omitempty"`
  172. }
  173. err = json.NewDecoder(resp.Body).Decode(&tags)
  174. if err != nil {
  175. return false
  176. }
  177. for _, tag := range tags.Tags {
  178. if tag == imageTag {
  179. return true
  180. }
  181. }
  182. return false
  183. } else if strings.Contains(imageRepo, "registry.digitalocean.com") {
  184. doRegRepo, err := getRegistryRepositoryPair(imageRepo)
  185. if err != nil {
  186. return false
  187. }
  188. doClient := godo.NewFromToken(registryToken)
  189. manifests, _, err := doClient.Registry.ListRepositoryManifests(
  190. ctx, doRegRepo[0], doRegRepo[1], &godo.ListOptions{},
  191. )
  192. if err != nil {
  193. return false
  194. }
  195. for _, manifest := range manifests {
  196. for _, tag := range manifest.Tags {
  197. if tag == imageTag {
  198. return true
  199. }
  200. }
  201. }
  202. return false
  203. }
  204. image := imageRepo + ":" + imageTag
  205. encodedRegistryAuth, err := a.getEncodedRegistryAuth(image)
  206. if err != nil {
  207. return false
  208. }
  209. _, err = a.DistributionInspect(context.Background(), image, encodedRegistryAuth)
  210. if err == nil {
  211. return true
  212. } else if strings.Contains(err.Error(), "image not found") ||
  213. strings.Contains(err.Error(), "does not exist in the registry") {
  214. return false
  215. }
  216. return false
  217. }
  218. // PullImage pulls an image specified by the image string
  219. func (a *Agent) PullImage(image string) error {
  220. opts, err := a.getPullOptions(image)
  221. if err != nil {
  222. return err
  223. }
  224. // pull the specified image
  225. out, err := a.ImagePull(a.ctx, image, opts)
  226. if err != nil {
  227. if client.IsErrNotFound(err) ||
  228. (strings.Contains(image, "gcr.io") && strings.Contains(err.Error(), "or it may not exist")) {
  229. return PullImageErrNotFound
  230. } else if client.IsErrUnauthorized(err) {
  231. return PullImageErrUnauthorized
  232. } else {
  233. return a.handleDockerClientErr(err, "Could not pull image "+image)
  234. }
  235. }
  236. defer out.Close()
  237. termFd, isTerm := term.GetFdInfo(os.Stderr)
  238. return jsonmessage.DisplayJSONMessagesStream(out, os.Stderr, termFd, isTerm, nil)
  239. }
  240. // PushImage pushes an image specified by the image string
  241. func (a *Agent) PushImage(image string) error {
  242. opts, err := a.getPushOptions(image)
  243. if err != nil {
  244. return err
  245. }
  246. out, err := a.ImagePush(
  247. context.Background(),
  248. image,
  249. opts,
  250. )
  251. if out != nil {
  252. defer out.Close()
  253. }
  254. if err != nil {
  255. return err
  256. }
  257. termFd, isTerm := term.GetFdInfo(os.Stderr)
  258. err = jsonmessage.DisplayJSONMessagesStream(out, os.Stderr, termFd, isTerm, nil)
  259. if err != nil {
  260. return err
  261. }
  262. return nil
  263. }
  264. func (a *Agent) getPullOptions(image string) (types.ImagePullOptions, error) {
  265. // check if agent has an auth getter; otherwise, assume public usage
  266. if a.authGetter == nil {
  267. return types.ImagePullOptions{}, nil
  268. }
  269. authConfigEncoded, err := a.getEncodedRegistryAuth(image)
  270. if err != nil {
  271. return types.ImagePullOptions{}, err
  272. }
  273. fmt.Println(authConfigEncoded)
  274. return types.ImagePullOptions{
  275. RegistryAuth: authConfigEncoded,
  276. Platform: "linux/amd64",
  277. }, nil
  278. }
  279. func (a *Agent) getContainerRegistryToken(image string) (string, error) {
  280. serverURL, err := GetServerURLFromTag(image)
  281. if err != nil {
  282. return "", err
  283. }
  284. _, secret, err := a.authGetter.GetCredentials(serverURL)
  285. if err != nil {
  286. return "", err
  287. }
  288. return secret, nil
  289. }
  290. func (a *Agent) getEncodedRegistryAuth(image string) (string, error) {
  291. // get using server url
  292. serverURL, err := GetServerURLFromTag(image)
  293. if err != nil {
  294. return "", err
  295. }
  296. user, secret, err := a.authGetter.GetCredentials(serverURL)
  297. if err != nil {
  298. return "", err
  299. }
  300. serverAddress := serverURL
  301. if !strings.Contains(serverURL, "https://") {
  302. serverAddress = fmt.Sprintf("https://%s", serverURL)
  303. }
  304. authConfig := types.AuthConfig{
  305. Username: user,
  306. Password: secret,
  307. ServerAddress: serverAddress,
  308. }
  309. authConfigBytes, err := json.Marshal(authConfig)
  310. if err != nil {
  311. return "", fmt.Errorf("unable to marshal docker auth config: %w", err)
  312. }
  313. return base64.URLEncoding.EncodeToString(authConfigBytes), nil
  314. }
  315. func (a *Agent) getPushOptions(image string) (types.ImagePushOptions, error) {
  316. pullOpts, err := a.getPullOptions(image)
  317. return types.ImagePushOptions(pullOpts), err
  318. }
  319. func GetServerURLFromTag(image string) (string, error) {
  320. named, err := reference.ParseNormalizedNamed(image)
  321. if err != nil {
  322. return "", err
  323. }
  324. domain := reference.Domain(named)
  325. if domain == "" {
  326. // if domain name is empty, use index.docker.io/v1
  327. return "index.docker.io/v1", nil
  328. } else if matches := ecrPattern.FindStringSubmatch(image); len(matches) >= 3 {
  329. // if this matches ECR, just use the domain name
  330. return domain, nil
  331. } else if strings.Contains(image, "gcr.io") || strings.Contains(image, "registry.digitalocean.com") {
  332. // if this matches GCR or DOCR, use the first path component
  333. return fmt.Sprintf("%s/%s", domain, strings.Split(reference.Path(named), "/")[0]), nil
  334. }
  335. // otherwise, best-guess is to get components of path that aren't the image name
  336. pathParts := strings.Split(reference.Path(named), "/")
  337. nonImagePath := ""
  338. if len(pathParts) > 1 {
  339. nonImagePath = strings.Join(pathParts[0:len(pathParts)-1], "/")
  340. }
  341. if err != nil {
  342. return "", err
  343. }
  344. if domain == "docker.io" {
  345. domain = "index.docker.io"
  346. }
  347. return fmt.Sprintf("%s/%s", domain, nonImagePath), nil
  348. }
  349. // WaitForContainerStop waits until a container has stopped to exit
  350. func (a *Agent) WaitForContainerStop(id string) error {
  351. // wait for container to stop before exit
  352. statusCh, errCh := a.ContainerWait(a.ctx, id, container.WaitConditionNotRunning)
  353. select {
  354. case err := <-errCh:
  355. if err != nil {
  356. return a.handleDockerClientErr(err, "Error waiting for stopped container")
  357. }
  358. case <-statusCh:
  359. }
  360. return nil
  361. }
  362. // WaitForContainerHealthy waits until a container is returning a healthy status. Streak
  363. // is the maximum number of failures in a row, while timeout is the length of time between
  364. // checks.
  365. func (a *Agent) WaitForContainerHealthy(id string, streak int) error {
  366. for {
  367. cont, err := a.ContainerInspect(a.ctx, id)
  368. if err != nil {
  369. return a.handleDockerClientErr(err, "Error waiting for stopped container")
  370. }
  371. health := cont.State.Health
  372. if health == nil || health.Status == "healthy" {
  373. return nil
  374. } else if health.FailingStreak >= streak {
  375. break
  376. }
  377. time.Sleep(time.Second)
  378. }
  379. return errors.New("container not healthy")
  380. }
  381. // ------------------------- AGENT HELPER FUNCTIONS ------------------------- //
  382. func (a *Agent) handleDockerClientErr(err error, errPrefix string) error {
  383. if strings.Contains(err.Error(), "Cannot connect to the Docker daemon") {
  384. return fmt.Errorf("The Docker daemon must be running in order to start Porter: connection to %s failed", a.DaemonHost())
  385. }
  386. return fmt.Errorf("%s:%s", errPrefix, err.Error())
  387. }