agent.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. package docker
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "os"
  11. "strings"
  12. "time"
  13. "github.com/digitalocean/godo"
  14. "github.com/docker/distribution/reference"
  15. "github.com/docker/docker/api/types"
  16. "github.com/docker/docker/api/types/container"
  17. "github.com/docker/docker/api/types/filters"
  18. "github.com/docker/docker/api/types/network"
  19. "github.com/docker/docker/api/types/volume"
  20. "github.com/docker/docker/client"
  21. "github.com/moby/moby/pkg/jsonmessage"
  22. "github.com/moby/term"
  23. )
  24. // Agent is a Docker client for performing operations that interact
  25. // with the Docker engine over REST
  26. type Agent struct {
  27. *client.Client
  28. authGetter *AuthGetter
  29. ctx context.Context
  30. label string
  31. }
  32. // ImagePull overrides the default docker client ImagePull to inject registry credentials
  33. func (a *Agent) ImagePull(ctx context.Context, refStr string, options types.ImagePullOptions) (io.ReadCloser, error) {
  34. opts, err := a.getPullOptions(refStr)
  35. if err != nil {
  36. return a.Client.ImagePull(ctx, refStr, options)
  37. }
  38. return a.Client.ImagePull(ctx, refStr, opts)
  39. }
  40. // ImagePush overrides the default docker client ImagePush to inject registry credentials
  41. func (a *Agent) ImagePush(ctx context.Context, image string, options types.ImagePushOptions) (io.ReadCloser, error) {
  42. opts, err := a.getPushOptions(image)
  43. if err != nil {
  44. return a.Client.ImagePush(ctx, image, options)
  45. }
  46. return a.Client.ImagePush(ctx, image, opts)
  47. }
  48. // CreateLocalVolumeIfNotExist creates a volume using driver type "local" with the
  49. // given name if it does not exist. If the volume does exist but does not contain
  50. // the required label (a.label), an error is thrown.
  51. func (a *Agent) CreateLocalVolumeIfNotExist(name string) (*types.Volume, error) {
  52. volListBody, err := a.VolumeList(a.ctx, filters.Args{})
  53. if err != nil {
  54. return nil, a.handleDockerClientErr(err, "Could not list volumes")
  55. }
  56. for _, _vol := range volListBody.Volumes {
  57. if contains, ok := _vol.Labels[a.label]; ok && contains == "true" && _vol.Name == name {
  58. return _vol, nil
  59. } else if !ok && _vol.Name == name {
  60. return nil, fmt.Errorf("volume conflict for %s: please remove existing volume and try again", name)
  61. }
  62. }
  63. return a.CreateLocalVolume(name)
  64. }
  65. // CreateLocalVolume creates a volume using driver type "local" with no
  66. // configured options. The equivalent of:
  67. //
  68. // docker volume create --driver local [name]
  69. func (a *Agent) CreateLocalVolume(name string) (*types.Volume, error) {
  70. labels := make(map[string]string)
  71. labels[a.label] = "true"
  72. opts := volume.VolumeCreateBody{
  73. Name: name,
  74. Driver: "local",
  75. Labels: labels,
  76. }
  77. vol, err := a.VolumeCreate(a.ctx, opts)
  78. if err != nil {
  79. return nil, a.handleDockerClientErr(err, "Could not create volume "+name)
  80. }
  81. return &vol, nil
  82. }
  83. // RemoveLocalVolume removes a volume by name
  84. func (a *Agent) RemoveLocalVolume(name string) error {
  85. return a.VolumeRemove(a.ctx, name, true)
  86. }
  87. // CreateBridgeNetworkIfNotExist creates a volume using driver type "local" with the
  88. // given name if it does not exist. If the volume does exist but does not contain
  89. // the required label (a.label), an error is thrown.
  90. func (a *Agent) CreateBridgeNetworkIfNotExist(name string) (id string, err error) {
  91. networks, err := a.NetworkList(a.ctx, types.NetworkListOptions{})
  92. if err != nil {
  93. return "", a.handleDockerClientErr(err, "Could not list volumes")
  94. }
  95. for _, net := range networks {
  96. if contains, ok := net.Labels[a.label]; ok && contains == "true" && net.Name == name {
  97. return net.ID, nil
  98. } else if !ok && net.Name == name {
  99. return "", fmt.Errorf("network conflict for %s: please remove existing network and try again", name)
  100. }
  101. }
  102. return a.CreateBridgeNetwork(name)
  103. }
  104. // CreateBridgeNetwork creates a volume using the default driver type (bridge)
  105. // with the CLI label attached
  106. func (a *Agent) CreateBridgeNetwork(name string) (id string, err error) {
  107. labels := make(map[string]string)
  108. labels[a.label] = "true"
  109. opts := types.NetworkCreate{
  110. Labels: labels,
  111. Attachable: true,
  112. }
  113. net, err := a.NetworkCreate(a.ctx, name, opts)
  114. if err != nil {
  115. return "", a.handleDockerClientErr(err, "Could not create network "+name)
  116. }
  117. return net.ID, nil
  118. }
  119. // ConnectContainerToNetwork attaches a container to a specified network
  120. func (a *Agent) ConnectContainerToNetwork(networkID, containerID, containerName string) error {
  121. // check if the container is connected already
  122. net, err := a.NetworkInspect(a.ctx, networkID, types.NetworkInspectOptions{})
  123. if err != nil {
  124. return a.handleDockerClientErr(err, "Could not inspect network"+networkID)
  125. }
  126. for _, cont := range net.Containers {
  127. // if container is connected, just return
  128. if cont.Name == containerName {
  129. return nil
  130. }
  131. }
  132. return a.NetworkConnect(a.ctx, networkID, containerID, &network.EndpointSettings{})
  133. }
  134. func (a *Agent) TagImage(old, new string) error {
  135. return a.ImageTag(a.ctx, old, new)
  136. }
  // PullImageEvent is the JSON shape of a single image pull event as returned
  // by the Docker API.
  type PullImageEvent struct {
  	// Status is decoded from the "status" key.
  	Status string `json:"status"`
  	// Error is decoded from the "error" key.
  	Error string `json:"error"`
  	// Progress is decoded from the "progress" key.
  	Progress string `json:"progress"`
  	// ProgressDetail holds the numeric counters found under "progressDetail".
  	ProgressDetail struct {
  		Current int `json:"current"`
  		Total   int `json:"total"`
  	} `json:"progressDetail"`
  }
  // Sentinel errors returned by PullImage.
  var (
  	// PullImageErrNotFound is returned when the Docker client reports the
  	// image as not found.
  	PullImageErrNotFound = fmt.Errorf("Requested image not found")

  	// PullImageErrUnauthorized is returned when the Docker client reports the
  	// pull as unauthorized.
  	PullImageErrUnauthorized = fmt.Errorf("Could not pull image: unauthorized")
  )
  149. func getRegistryRepositoryPair(imageRepo string) ([]string, error) {
  150. named, err := reference.ParseNamed(imageRepo)
  151. if err != nil {
  152. return nil, err
  153. }
  154. path := reference.Path(named)
  155. return strings.SplitN(path, "/", 2), nil
  156. }
  157. // CheckIfImageExists checks if the image exists in the registry
  158. func (a *Agent) CheckIfImageExists(imageRepo, imageTag string) bool {
  159. registryToken, err := a.getContainerRegistryToken(imageRepo)
  160. if err != nil {
  161. return false
  162. }
  163. ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
  164. defer cancel()
  165. if strings.Contains(imageRepo, "gcr.io") {
  166. gcrRegRepo, err := getRegistryRepositoryPair(imageRepo)
  167. if err != nil {
  168. return false
  169. }
  170. named, err := reference.ParseNamed(imageRepo)
  171. if err != nil {
  172. return false
  173. }
  174. req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf(
  175. "https://%s/v2/%s/%s/tags/list", reference.Domain(named), gcrRegRepo[0], gcrRegRepo[1],
  176. ), nil)
  177. if err != nil {
  178. return false
  179. }
  180. req.Header.Add("Content-Type", "application/json")
  181. req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", registryToken))
  182. resp, err := http.DefaultClient.Do(req)
  183. if err != nil {
  184. return false
  185. }
  186. defer resp.Body.Close()
  187. var tags struct {
  188. Tags []string `json:"tags,omitempty"`
  189. }
  190. err = json.NewDecoder(resp.Body).Decode(&tags)
  191. if err != nil {
  192. return false
  193. }
  194. for _, tag := range tags.Tags {
  195. if tag == imageTag {
  196. return true
  197. }
  198. }
  199. return false
  200. } else if strings.Contains(imageRepo, "registry.digitalocean.com") {
  201. doRegRepo, err := getRegistryRepositoryPair(imageRepo)
  202. if err != nil {
  203. return false
  204. }
  205. doClient := godo.NewFromToken(registryToken)
  206. manifests, _, err := doClient.Registry.ListRepositoryManifests(
  207. ctx, doRegRepo[0], doRegRepo[1], &godo.ListOptions{},
  208. )
  209. if err != nil {
  210. return false
  211. }
  212. for _, manifest := range manifests {
  213. for _, tag := range manifest.Tags {
  214. if tag == imageTag {
  215. return true
  216. }
  217. }
  218. }
  219. return false
  220. }
  221. image := imageRepo + ":" + imageTag
  222. encodedRegistryAuth, err := a.getEncodedRegistryAuth(image)
  223. if err != nil {
  224. return false
  225. }
  226. _, err = a.DistributionInspect(context.Background(), image, encodedRegistryAuth)
  227. if err == nil {
  228. return true
  229. } else if strings.Contains(err.Error(), "image not found") ||
  230. strings.Contains(err.Error(), "does not exist in the registry") {
  231. return false
  232. }
  233. return false
  234. }
  235. // PullImage pulls an image specified by the image string
  236. func (a *Agent) PullImage(image string) error {
  237. opts, err := a.getPullOptions(image)
  238. if err != nil {
  239. return err
  240. }
  241. // pull the specified image
  242. out, err := a.ImagePull(a.ctx, image, opts)
  243. if err != nil {
  244. if client.IsErrNotFound(err) {
  245. return PullImageErrNotFound
  246. } else if client.IsErrUnauthorized(err) {
  247. return PullImageErrUnauthorized
  248. } else {
  249. return a.handleDockerClientErr(err, "Could not pull image "+image)
  250. }
  251. }
  252. defer out.Close()
  253. termFd, isTerm := term.GetFdInfo(os.Stderr)
  254. return jsonmessage.DisplayJSONMessagesStream(out, os.Stderr, termFd, isTerm, nil)
  255. }
  256. // PushImage pushes an image specified by the image string
  257. func (a *Agent) PushImage(image string) error {
  258. opts, err := a.getPushOptions(image)
  259. if err != nil {
  260. return err
  261. }
  262. out, err := a.ImagePush(
  263. context.Background(),
  264. image,
  265. opts,
  266. )
  267. if out != nil {
  268. defer out.Close()
  269. }
  270. if err != nil {
  271. return err
  272. }
  273. termFd, isTerm := term.GetFdInfo(os.Stderr)
  274. return jsonmessage.DisplayJSONMessagesStream(out, os.Stderr, termFd, isTerm, nil)
  275. }
  276. func (a *Agent) getPullOptions(image string) (types.ImagePullOptions, error) {
  277. // check if agent has an auth getter; otherwise, assume public usage
  278. if a.authGetter == nil {
  279. return types.ImagePullOptions{}, nil
  280. }
  281. authConfigEncoded, err := a.getEncodedRegistryAuth(image)
  282. if err != nil {
  283. return types.ImagePullOptions{}, err
  284. }
  285. return types.ImagePullOptions{
  286. RegistryAuth: authConfigEncoded,
  287. Platform: "linux/amd64",
  288. }, nil
  289. }
  290. func (a *Agent) getContainerRegistryToken(image string) (string, error) {
  291. serverURL, err := GetServerURLFromTag(image)
  292. if err != nil {
  293. return "", err
  294. }
  295. _, secret, err := a.authGetter.GetCredentials(serverURL)
  296. if err != nil {
  297. return "", err
  298. }
  299. return secret, nil
  300. }
  301. func (a *Agent) getEncodedRegistryAuth(image string) (string, error) {
  302. // get using server url
  303. serverURL, err := GetServerURLFromTag(image)
  304. if err != nil {
  305. return "", err
  306. }
  307. user, secret, err := a.authGetter.GetCredentials(serverURL)
  308. if err != nil {
  309. return "", err
  310. }
  311. var authConfig = types.AuthConfig{
  312. Username: user,
  313. Password: secret,
  314. ServerAddress: "https://" + serverURL,
  315. }
  316. authConfigBytes, _ := json.Marshal(authConfig)
  317. return base64.URLEncoding.EncodeToString(authConfigBytes), nil
  318. }
  319. func (a *Agent) getPushOptions(image string) (types.ImagePushOptions, error) {
  320. pullOpts, err := a.getPullOptions(image)
  321. return types.ImagePushOptions(pullOpts), err
  322. }
  323. func GetServerURLFromTag(image string) (string, error) {
  324. named, err := reference.ParseNamed(image)
  325. if err != nil {
  326. return "", err
  327. }
  328. domain := reference.Domain(named)
  329. if domain == "" {
  330. // if domain name is empty, use index.docker.io/v1
  331. return "index.docker.io/v1", nil
  332. } else if matches := ecrPattern.FindStringSubmatch(image); len(matches) >= 3 {
  333. // if this matches ECR, just use the domain name
  334. return domain, nil
  335. } else if strings.Contains(image, "gcr.io") || strings.Contains(image, "registry.digitalocean.com") {
  336. // if this matches GCR or DOCR, use the first path component
  337. return fmt.Sprintf("%s/%s", domain, strings.Split(reference.Path(named), "/")[0]), nil
  338. }
  339. // otherwise, best-guess is to get components of path that aren't the image name
  340. pathParts := strings.Split(reference.Path(named), "/")
  341. nonImagePath := ""
  342. if len(pathParts) > 1 {
  343. nonImagePath = strings.Join(pathParts[0:len(pathParts)-1], "/")
  344. }
  345. if err != nil {
  346. return "", err
  347. }
  348. return fmt.Sprintf("%s/%s", domain, nonImagePath), nil
  349. }
  350. // WaitForContainerStop waits until a container has stopped to exit
  351. func (a *Agent) WaitForContainerStop(id string) error {
  352. // wait for container to stop before exit
  353. statusCh, errCh := a.ContainerWait(a.ctx, id, container.WaitConditionNotRunning)
  354. select {
  355. case err := <-errCh:
  356. if err != nil {
  357. return a.handleDockerClientErr(err, "Error waiting for stopped container")
  358. }
  359. case <-statusCh:
  360. }
  361. return nil
  362. }
  363. // WaitForContainerHealthy waits until a container is returning a healthy status. Streak
  364. // is the maximum number of failures in a row, while timeout is the length of time between
  365. // checks.
  366. func (a *Agent) WaitForContainerHealthy(id string, streak int) error {
  367. for {
  368. cont, err := a.ContainerInspect(a.ctx, id)
  369. if err != nil {
  370. return a.handleDockerClientErr(err, "Error waiting for stopped container")
  371. }
  372. health := cont.State.Health
  373. if health == nil || health.Status == "healthy" {
  374. return nil
  375. } else if health.FailingStreak >= streak {
  376. break
  377. }
  378. time.Sleep(time.Second)
  379. }
  380. return errors.New("container not healthy")
  381. }
  382. // ------------------------- AGENT HELPER FUNCTIONS ------------------------- //
  383. func (a *Agent) handleDockerClientErr(err error, errPrefix string) error {
  384. if strings.Contains(err.Error(), "Cannot connect to the Docker daemon") {
  385. return fmt.Errorf("The Docker daemon must be running in order to start Porter: connection to %s failed", a.DaemonHost())
  386. }
  387. return fmt.Errorf("%s:%s", errPrefix, err.Error())
  388. }