Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print combined progress when pulling #31

Merged
merged 2 commits into from
Oct 4, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
distribution: calculate combined progress when pulling
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
  • Loading branch information
petrosagg committed Oct 3, 2017
commit 36ea648767681f57e2717f39ce09bc94cf4759a6
4 changes: 4 additions & 0 deletions distribution/pull_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,7 @@ func (ld *v1LayerDescriptor) Registered(diffID layer.DiffID) {
// Cache mapping from this layer's DiffID to the blobsum
ld.v1IDService.Set(ld.v1LayerID, ld.indexName, diffID)
}

func (ld *v1LayerDescriptor) Size() int64 {
return ld.layerSize
}
4 changes: 4 additions & 0 deletions distribution/pull_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
ld.V2MetadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.Name.Name()})
}

func (ld *v2LayerDescriptor) Size() int64 {
return ld.src.Size
}

func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdated bool, err error) {
manSvc, err := p.repo.Manifests(ctx)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions distribution/xfer/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type DownloadDescriptor interface {
// if it is unknown (for example, if it has not been downloaded
// before).
DiffID() (layer.DiffID, error)
Size() int64
// Download is called to perform the download.
Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error)
// Return the DeltaBase if any
Expand Down Expand Up @@ -114,6 +115,8 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
platform = layer.Platform(runtime.GOOS)
}

totalProgress := progress.NewProgressSink(progressOutput, 0, "Total", "")

rootFS := initialRootFS
for _, descriptor := range layers {
key := descriptor.Key()
Expand Down Expand Up @@ -159,13 +162,14 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima

// Layer is not known to exist - download and register it.
progress.Update(progressOutput, descriptor.ID(), "Pulling fs layer")
totalProgress.Size += descriptor.Size()

var xferFunc DoFunc
if topDownload != nil {
xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload, platform)
xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload, platform, totalProgress)
defer topDownload.Transfer.Release(watcher)
} else {
xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil, platform)
xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil, platform, totalProgress)
}
topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
topDownload = topDownloadUncasted.(*downloadTransfer)
Expand Down Expand Up @@ -222,7 +226,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
// complete before the registration step, and registers the downloaded data
// on top of parentDownload's resulting layer. Otherwise, it registers the
// layer on top of the ChainID given by parentLayer.
func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer, platform layer.Platform) DoFunc {
func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer, platform layer.Platform, totalProgress io.Writer) DoFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
d := &downloadTransfer{
Transfer: NewTransfer(),
Expand Down Expand Up @@ -334,7 +338,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.Transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
defer reader.Close()

inflatedLayerData, err := archive.DecompressStream(reader)
inflatedLayerData, err := archive.DecompressStream(io.TeeReader(reader, totalProgress))
if err != nil {
d.err = fmt.Errorf("could not get decompression stream: %v", err)
return
Expand Down