[lxc-devel] [lxd/master] lxd: Send operation progress when creating image or container.
joelhockey on Github
lxc-bot at linuxcontainers.org
Wed Jan 30 11:36:12 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 1233 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190130/f83dbe2e/attachment-0001.bin>
-------------- next part --------------
From c32ec5da54ab5379a3357c623d92bb917982f032 Mon Sep 17 00:00:00 2001
From: Joel Hockey <joelhockey at chromium.org>
Date: Thu, 24 Jan 2019 01:55:10 -0800
Subject: [PATCH] lxd: Send operation progress when creating image or
container.
Added metadata fields 'stage', 'percent', and 'speed' to track
long-running tasks in some operations.
In CreateImage, added stages 'create_image_from_container_tar'
and 'create_image_from_container_compress'.
In CreateContainerFromImage, added stage
'create_container_from_image_unpack'.
Modified images.go compressFile to take an io.Reader and an io.Writer
rather than a path name, to allow imgPostContInfo to
supply an ioprogress.ProgressReader that can track the progress
of image compression.
Refactored imgPostContInfo to not require a separate pass
to determine the fingerprint when publishing an image from
a container. The sha256 calculation is either done during
tar, or during compression.
Added a tracker field in storageShared where callers can set
a ProgressTracker, which is used when unpacking an image to
create a new container in unpackImage and Unpack.
Signed-off-by: Joel Hockey <joelhockey at chromium.org>
---
lxd/backup.go | 14 ++++-
lxd/container.go | 4 +-
lxd/containers_post.go | 13 ++++-
lxd/images.go | 116 +++++++++++++++++++++++++++------------
lxd/patches.go | 14 ++++-
lxd/storage.go | 3 +
lxd/storage_btrfs.go | 2 +-
lxd/storage_ceph.go | 2 +-
lxd/storage_dir.go | 2 +-
lxd/storage_lvm.go | 2 +-
lxd/storage_lvm_utils.go | 2 +-
lxd/storage_shared.go | 7 +++
lxd/storage_zfs.go | 2 +-
shared/archive_linux.go | 37 +++++++++----
14 files changed, 161 insertions(+), 59 deletions(-)
diff --git a/lxd/backup.go b/lxd/backup.go
index ec1e4959c5..85ff67d413 100644
--- a/lxd/backup.go
+++ b/lxd/backup.go
@@ -353,7 +353,17 @@ func backupCreateTarball(s *state.State, path string, backup backup) error {
}
if compress != "none" {
- compressedPath, err := compressFile(backupPath, compress)
+ infile, err := os.Open(backupPath)
+ if err != nil {
+ return err
+ }
+ defer infile.Close()
+ compressed, err := os.Create(backupPath + ".compressed")
+ if err != nil {
+ return err
+ }
+ defer compressed.Close()
+ err = compressFile(compress, infile, compressed)
if err != nil {
return err
}
@@ -363,7 +373,7 @@ func backupCreateTarball(s *state.State, path string, backup backup) error {
return err
}
- err = os.Rename(compressedPath, backupPath)
+ err = os.Rename(compressed.Name(), backupPath)
if err != nil {
return err
}
diff --git a/lxd/container.go b/lxd/container.go
index 533f5d7c93..04920d78ac 100644
--- a/lxd/container.go
+++ b/lxd/container.go
@@ -25,6 +25,7 @@ import (
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
"github.com/lxc/lxd/shared/idmap"
+ "github.com/lxc/lxd/shared/ioprogress"
log "github.com/lxc/lxd/shared/log15"
"github.com/lxc/lxd/shared/logger"
"github.com/lxc/lxd/shared/osarch"
@@ -767,7 +768,7 @@ func containerCreateEmptySnapshot(s *state.State, args db.ContainerArgs) (contai
return c, nil
}
-func containerCreateFromImage(d *Daemon, args db.ContainerArgs, hash string) (container, error) {
+func containerCreateFromImage(d *Daemon, args db.ContainerArgs, hash string, tracker *ioprogress.ProgressTracker) (container, error) {
s := d.State()
// Get the image properties
@@ -826,6 +827,7 @@ func containerCreateFromImage(d *Daemon, args db.ContainerArgs, hash string) (co
}
// Now create the storage from an image
+ c.Storage().SetProgressTracker(tracker)
err = c.Storage().ContainerCreateFromImage(c, hash)
if err != nil {
c.Delete()
diff --git a/lxd/containers_post.go b/lxd/containers_post.go
index 1ec8a6b3c6..d632eaa388 100644
--- a/lxd/containers_post.go
+++ b/lxd/containers_post.go
@@ -11,6 +11,7 @@ import (
"net/http"
"net/url"
"os"
+ "strconv"
"strings"
"github.com/dustinkirkland/golang-petname"
@@ -23,6 +24,7 @@ import (
"github.com/lxc/lxd/lxd/types"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/ioprogress"
"github.com/lxc/lxd/shared/logger"
"github.com/lxc/lxd/shared/osarch"
@@ -128,7 +130,14 @@ func createFromImage(d *Daemon, project string, req *api.ContainersPost) Respons
return err
}
- _, err = containerCreateFromImage(d, args, info.Fingerprint)
+ metadata := make(map[string]string)
+ _, err = containerCreateFromImage(d, args, info.Fingerprint, &ioprogress.ProgressTracker{
+ Handler: func(percent, speed int64) {
+ metadata["stage"] = "create_container_from_image_unpack"
+ metadata["percent"] = strconv.FormatInt(percent, 10)
+ metadata["speed"] = strconv.FormatInt(speed, 10)
+ op.UpdateMetadata(metadata)
+ }})
return err
}
@@ -356,7 +365,7 @@ func createFromMigration(d *Daemon, project string, req *api.ContainersPost) Res
}
if ps.MigrationType() == migration.MigrationFSType_RSYNC {
- c, err = containerCreateFromImage(d, args, req.Source.BaseImage)
+ c, err = containerCreateFromImage(d, args, req.Source.BaseImage, nil)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/images.go b/lxd/images.go
index 59dadf433a..d5f0a7eac0 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -32,6 +32,7 @@ import (
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/ioprogress"
"github.com/lxc/lxd/shared/logger"
"github.com/lxc/lxd/shared/logging"
"github.com/lxc/lxd/shared/osarch"
@@ -97,14 +98,14 @@ var aliasCmd = Command{
end for whichever finishes last. */
var imagePublishLock sync.Mutex
-func unpackImage(imagefname string, destpath string, sType storageType, runningInUserns bool) error {
+func unpackImage(imagefname string, destpath string, sType storageType, runningInUserns bool, tracker *ioprogress.ProgressTracker) error {
blockBackend := false
if sType == storageTypeLvm || sType == storageTypeCeph {
blockBackend = true
}
- err := shared.Unpack(imagefname, destpath, blockBackend, runningInUserns)
+ err := shared.Unpack(imagefname, destpath, blockBackend, runningInUserns, tracker)
if err != nil {
return err
}
@@ -116,7 +117,7 @@ func unpackImage(imagefname string, destpath string, sType storageType, runningI
return fmt.Errorf("Error creating rootfs directory")
}
- err = shared.Unpack(imagefname+".rootfs", rootfsPath, blockBackend, runningInUserns)
+ err = shared.Unpack(imagefname+".rootfs", rootfsPath, blockBackend, runningInUserns, tracker)
if err != nil {
return err
}
@@ -129,7 +130,7 @@ func unpackImage(imagefname string, destpath string, sType storageType, runningI
return nil
}
-func compressFile(path string, compress string) (string, error) {
+func compressFile(compress string, infile io.Reader, outfile io.Writer) error {
reproducible := []string{"gzip"}
args := []string{"-c"}
@@ -137,31 +138,18 @@ func compressFile(path string, compress string) (string, error) {
args = append(args, "-n")
}
- args = append(args, path)
cmd := exec.Command(compress, args...)
-
- outfile, err := os.Create(path + ".compressed")
- if err != nil {
- return "", err
- }
-
- defer outfile.Close()
+ cmd.Stdin = infile
cmd.Stdout = outfile
- err = cmd.Run()
- if err != nil {
- os.Remove(outfile.Name())
- return "", err
- }
-
- return outfile.Name(), nil
+ return cmd.Run()
}
/*
* This function takes a container or snapshot from the local image server and
* exports it as an image.
*/
-func imgPostContInfo(d *Daemon, r *http.Request, req api.ImagesPost, builddir string) (*api.Image, error) {
+func imgPostContInfo(d *Daemon, r *http.Request, req api.ImagesPost, op *operation, builddir string) (*api.Image, error) {
info := api.Image{}
info.Properties = map[string]string{}
project := projectParam(r)
@@ -204,14 +192,33 @@ func imgPostContInfo(d *Daemon, r *http.Request, req api.ImagesPost, builddir st
}
defer os.Remove(tarfile.Name())
- if err := c.Export(tarfile, req.Properties); err != nil {
- tarfile.Close()
+ // Track progress writing tarfile
+ metadata := make(map[string]string)
+ tarfileProgressWriter := &ioprogress.ProgressWriter{
+ WriteCloser: tarfile,
+ Tracker: &ioprogress.ProgressTracker{
+ Handler: func(percent, speed int64) {
+ metadata["stage"] = "create_image_from_container_tar"
+ metadata["percent"] = strconv.FormatInt(percent, 10)
+ metadata["speed"] = strconv.FormatInt(speed, 10)
+ op.UpdateMetadata(metadata)
+ },
+ },
+ }
+ // Calculate (close estimate of) total size of tarfile
+ sumSize := func(path string, fi os.FileInfo, err error) error {
+ tarfileProgressWriter.Tracker.Length += fi.Size()
+ return nil
+ }
+ err = filepath.Walk(c.RootfsPath(), sumSize)
+ if err != nil {
return nil, err
}
- tarfile.Close()
+ sha256 := sha256.New()
var compressedPath string
var compress string
+ var writer io.Writer
if req.CompressionAlgorithm != "" {
compress = req.CompressionAlgorithm
@@ -221,29 +228,66 @@ func imgPostContInfo(d *Daemon, r *http.Request, req api.ImagesPost, builddir st
return nil, err
}
}
+ usingCompression := compress != "none"
- if compress != "none" {
- compressedPath, err = compressFile(tarfile.Name(), compress)
- if err != nil {
- return nil, err
- }
+ // If there is no compression, then calculate sha256 on tarfile
+ if usingCompression {
+ writer = tarfileProgressWriter
} else {
+ writer = io.MultiWriter(tarfileProgressWriter, sha256)
compressedPath = tarfile.Name()
}
- defer os.Remove(compressedPath)
- sha256 := sha256.New()
- tarf, err := os.Open(compressedPath)
- if err != nil {
+ if err := c.Export(writer, req.Properties); err != nil {
+ tarfile.Close()
return nil, err
}
+ tarfile.Close()
- info.Size, err = io.Copy(sha256, tarf)
- tarf.Close()
+ if usingCompression {
+ tarfile, err = os.Open(tarfile.Name())
+ if err != nil {
+ return nil, err
+ }
+ defer tarfile.Close()
+ fi, err := tarfile.Stat()
+ if err != nil {
+ return nil, err
+ }
+ // Track progress writing gzipped file
+ metadata = make(map[string]string)
+ tarfileProgressReader := &ioprogress.ProgressReader{
+ ReadCloser: tarfile,
+ Tracker: &ioprogress.ProgressTracker{
+ Length: fi.Size(),
+ Handler: func(percent, speed int64) {
+ metadata["stage"] = "create_image_from_container_compress"
+ metadata["percent"] = strconv.FormatInt(percent, 10)
+ metadata["speed"] = strconv.FormatInt(speed, 10)
+ op.UpdateMetadata(metadata)
+ },
+ },
+ }
+ compressedPath = tarfile.Name() + ".compressed"
+ compressed, err := os.Create(compressedPath)
+ if err != nil {
+ return nil, err
+ }
+ defer compressed.Close()
+ defer os.Remove(compressed.Name())
+ // Calculate sha256 as we compress
+ writer := io.MultiWriter(compressed, sha256)
+ err = compressFile(compress, tarfileProgressReader, writer)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ fi, err := os.Stat(compressedPath)
if err != nil {
return nil, err
}
-
+ info.Size = fi.Size()
info.Fingerprint = fmt.Sprintf("%x", sha256.Sum(nil))
_, _, err = d.cluster.ImageGet(project, info.Fingerprint, false, true)
@@ -714,7 +758,7 @@ func imagesPost(d *Daemon, r *http.Request) Response {
} else {
/* Processing image creation from container */
imagePublishLock.Lock()
- info, err = imgPostContInfo(d, r, req, builddir)
+ info, err = imgPostContInfo(d, r, req, op, builddir)
imagePublishLock.Unlock()
}
}
diff --git a/lxd/patches.go b/lxd/patches.go
index 235410e66c..ba556c24d2 100644
--- a/lxd/patches.go
+++ b/lxd/patches.go
@@ -3154,7 +3154,17 @@ func patchMoveBackups(name string, d *Daemon) error {
}
// Compress it
- compressedPath, err := compressFile(backupPath, "xz")
+ infile, err := os.Open(backupPath)
+ if err != nil {
+ return err
+ }
+ defer infile.Close()
+ compressed, err := os.Create(backupPath + ".compressed")
+ if err != nil {
+ return err
+ }
+ defer compressed.Close()
+ err = compressFile("xz", infile, compressed)
if err != nil {
return err
}
@@ -3164,7 +3174,7 @@ func patchMoveBackups(name string, d *Daemon) error {
return err
}
- err = os.Rename(compressedPath, backupPath)
+ err = os.Rename(compressed.Name(), backupPath)
if err != nil {
return err
}
diff --git a/lxd/storage.go b/lxd/storage.go
index 3b2dca1ddc..11d9306a2f 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -237,6 +237,9 @@ type storage interface {
StorageMigrationSource(args MigrationSourceArgs) (MigrationStorageSourceDriver, error)
StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error
+
+ // For tracking long operations such as unpacking an image.
+ SetProgressTracker(tracker *ioprogress.ProgressTracker)
}
func storageCoreInit(driver string) (storage, error) {
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index 7c423976c3..451c2ee73c 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -2057,7 +2057,7 @@ func (s *storageBtrfs) ImageCreate(fingerprint string) error {
// Unpack the image in imageMntPoint.
imagePath := shared.VarPath("images", fingerprint)
- err = unpackImage(imagePath, tmpImageSubvolumeName, storageTypeBtrfs, s.s.OS.RunningInUserNS)
+ err = unpackImage(imagePath, tmpImageSubvolumeName, storageTypeBtrfs, s.s.OS.RunningInUserNS, s.tracker)
if err != nil {
return err
}
diff --git a/lxd/storage_ceph.go b/lxd/storage_ceph.go
index c141c26974..58cbe09baf 100644
--- a/lxd/storage_ceph.go
+++ b/lxd/storage_ceph.go
@@ -2189,7 +2189,7 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
// rsync contents into image
imagePath := shared.VarPath("images", fingerprint)
- err = unpackImage(imagePath, imageMntPoint, storageTypeCeph, s.s.OS.RunningInUserNS)
+ err = unpackImage(imagePath, imageMntPoint, storageTypeCeph, s.s.OS.RunningInUserNS, s.tracker)
if err != nil {
logger.Errorf(`Failed to unpack image for RBD storage volume for image "%s" on storage pool "%s": %s`, fingerprint, s.pool.Name, err)
diff --git a/lxd/storage_dir.go b/lxd/storage_dir.go
index dd7f7a3396..10ab2b98c2 100644
--- a/lxd/storage_dir.go
+++ b/lxd/storage_dir.go
@@ -542,7 +542,7 @@ func (s *storageDir) ContainerCreateFromImage(container container, imageFingerpr
}()
imagePath := shared.VarPath("images", imageFingerprint)
- err = unpackImage(imagePath, containerMntPoint, storageTypeDir, s.s.OS.RunningInUserNS)
+ err = unpackImage(imagePath, containerMntPoint, storageTypeDir, s.s.OS.RunningInUserNS, s.tracker)
if err != nil {
return errors.Wrap(err, "Unpack image")
}
diff --git a/lxd/storage_lvm.go b/lxd/storage_lvm.go
index d8022512f4..6071460470 100644
--- a/lxd/storage_lvm.go
+++ b/lxd/storage_lvm.go
@@ -1968,7 +1968,7 @@ func (s *storageLvm) ImageCreate(fingerprint string) error {
}
imagePath := shared.VarPath("images", fingerprint)
- err = unpackImage(imagePath, imageMntPoint, storageTypeLvm, s.s.OS.RunningInUserNS)
+ err = unpackImage(imagePath, imageMntPoint, storageTypeLvm, s.s.OS.RunningInUserNS, s.tracker)
if err != nil {
return err
}
diff --git a/lxd/storage_lvm_utils.go b/lxd/storage_lvm_utils.go
index d16175dbbf..f69d1f3597 100644
--- a/lxd/storage_lvm_utils.go
+++ b/lxd/storage_lvm_utils.go
@@ -502,7 +502,7 @@ func (s *storageLvm) containerCreateFromImageLv(c container, fp string) error {
imagePath := shared.VarPath("images", fp)
containerMntPoint := getContainerMountPoint(c.Project(), s.pool.Name, containerName)
- err = unpackImage(imagePath, containerMntPoint, storageTypeLvm, s.s.OS.RunningInUserNS)
+ err = unpackImage(imagePath, containerMntPoint, storageTypeLvm, s.s.OS.RunningInUserNS, s.tracker)
if err != nil {
logger.Errorf(`Failed to unpack image "%s" into non-thinpool LVM storage volume "%s" for container "%s" on storage pool "%s": %s`, imagePath, containerMntPoint, containerName, s.pool.Name, err)
return err
diff --git a/lxd/storage_shared.go b/lxd/storage_shared.go
index 74f8d19c2e..8cf351c234 100644
--- a/lxd/storage_shared.go
+++ b/lxd/storage_shared.go
@@ -7,6 +7,7 @@ import (
"github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/ioprogress"
"github.com/lxc/lxd/shared/logger"
"github.com/pkg/errors"
)
@@ -22,6 +23,8 @@ type storageShared struct {
pool *api.StoragePool
volume *api.StorageVolume
+
+ tracker *ioprogress.ProgressTracker
}
func (s *storageShared) GetStorageType() storageType {
@@ -36,6 +39,10 @@ func (s *storageShared) GetStorageTypeVersion() string {
return s.sTypeVersion
}
+func (s *storageShared) SetProgressTracker(tracker *ioprogress.ProgressTracker) {
+ s.tracker = tracker
+}
+
func (s *storageShared) shiftRootfs(c container, skipper func(dir string, absPath string, fi os.FileInfo) bool) error {
dpath := c.Path()
rpath := c.RootfsPath()
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index 76b347a9d4..0a66792d7c 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -2443,7 +2443,7 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
}
// Unpack the image into the temporary mountpoint.
- err = unpackImage(imagePath, tmpImageDir, storageTypeZfs, s.s.OS.RunningInUserNS)
+ err = unpackImage(imagePath, tmpImageDir, storageTypeZfs, s.s.OS.RunningInUserNS, s.tracker)
if err != nil {
return err
}
diff --git a/shared/archive_linux.go b/shared/archive_linux.go
index ca359763cf..da3c35a0d3 100644
--- a/shared/archive_linux.go
+++ b/shared/archive_linux.go
@@ -8,6 +8,7 @@ import (
"strings"
"syscall"
+ "github.com/lxc/lxd/shared/ioprogress"
"github.com/lxc/lxd/shared/logger"
)
@@ -54,7 +55,7 @@ func DetectCompressionFile(f io.ReadSeeker) ([]string, string, []string, error)
}
}
-func Unpack(file string, path string, blockBackend bool, runningInUserns bool) error {
+func Unpack(file string, path string, blockBackend bool, runningInUserns bool, tracker *ioprogress.ProgressTracker) error {
extractArgs, extension, _, err := DetectCompression(file)
if err != nil {
return err
@@ -62,6 +63,7 @@ func Unpack(file string, path string, blockBackend bool, runningInUserns bool) e
command := ""
args := []string{}
+ var reader io.Reader
if strings.HasPrefix(extension, ".tar") {
command = "tar"
if runningInUserns {
@@ -73,8 +75,28 @@ func Unpack(file string, path string, blockBackend bool, runningInUserns bool) e
}
args = append(args, "-C", path, "--numeric-owner", "--xattrs-include=*")
args = append(args, extractArgs...)
- args = append(args, file)
+ args = append(args, "-")
+ f, err := os.Open(file)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ reader = f
+ // Attach the ProgressTracker if supplied.
+ if tracker != nil {
+ fsinfo, err := f.Stat()
+ if err != nil {
+ return err
+ }
+ tracker.Length = fsinfo.Size()
+ reader = &ioprogress.ProgressReader{
+ ReadCloser: f,
+ Tracker: tracker,
+ }
+ }
} else if strings.HasPrefix(extension, ".squashfs") {
+ // unsquashfs does not support reading from stdin,
+ // so ProgressTracker is not possible.
command = "unsquashfs"
args = append(args, "-f", "-d", path, "-n")
@@ -91,7 +113,7 @@ func Unpack(file string, path string, blockBackend bool, runningInUserns bool) e
return fmt.Errorf("Unsupported image format: %s", extension)
}
- output, err := RunCommand(command, args...)
+ err = RunCommandWithFds(reader, nil, command, args...)
if err != nil {
// Check if we ran out of space
fs := syscall.Statfs_t{}
@@ -110,14 +132,9 @@ func Unpack(file string, path string, blockBackend bool, runningInUserns bool) e
}
}
- co := output
logger.Debugf("Unpacking failed")
- logger.Debugf(co)
-
- // Truncate the output to a single line for inclusion in the error
- // message. The first line isn't guaranteed to pinpoint the issue,
- // but it's better than nothing and better than a multi-line message.
- return fmt.Errorf("Unpack failed, %s. %s", err, strings.SplitN(co, "\n", 2)[0])
+ logger.Debugf(err.Error())
+ return fmt.Errorf("Unpack failed, %s.", err)
}
return nil
More information about the lxc-devel
mailing list