Skip to content

Commit

Permalink
tarfs: remount EROFS for existing tarfs instances on startup
Browse files Browse the repository at this point in the history
On startup, we need to recover information for all tarfs related
snapshots, and remount EROFS filesystems.

Signed-off-by: Jiang Liu <[email protected]>
  • Loading branch information
jiangliu committed Sep 19, 2023
1 parent 64fcc6b commit c001e9e
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
err = errors.Wrapf(err, "mount file system by daemon %s, snapshot %s", d.ID(), snapshotID)
}
case config.FsDriverBlockdev:
err = fs.tarfsMgr.MountTarErofs(snapshotID, s, labels, rafs)
err = fs.tarfsMgr.MountErofs(snapshotID, s, labels, rafs)
if err != nil {
err = errors.Wrapf(err, "mount tarfs for snapshot %s", snapshotID)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/label/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
NydusRefLayer = "containerd.io/snapshot/nydus-ref"
// The blobID of associated layer, also marking the layer as a nydus tarfs, set by the snapshotter
NydusTarfsLayer = "containerd.io/snapshot/nydus-tarfs"
// List of parent snapshot IDs, saved in `Rafs.Annotation`.
NydusTarfsParents = "containerd.io/snapshot/nydus-tarfs-parent-snapshot-list"
// Dm-verity information for image block device
NydusImageBlockInfo = "containerd.io/snapshot/nydus-image-block"
// Dm-verity information for layer block device
Expand Down
133 changes: 127 additions & 6 deletions pkg/tarfs/tarfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
)

type Manager struct {
RemountMap map[string]*rafs.Rafs // Scratch space to store rafs instances needing remount on startup
snapshotMap map[string]*snapshotStatus // tarfs snapshots status, indexed by snapshot ID
mutex sync.Mutex
mutexLoopDev sync.Mutex
Expand Down Expand Up @@ -87,6 +88,7 @@ type snapshotStatus struct {
func NewManager(insecure, checkTarfsHint bool, cacheDirPath, nydusImagePath string, maxConcurrentProcess int64) *Manager {
return &Manager{
snapshotMap: map[string]*snapshotStatus{},
RemountMap: map[string]*rafs.Rafs{},
cacheDirPath: cacheDirPath,
nydusImagePath: nydusImagePath,
insecure: insecure,
Expand Down Expand Up @@ -363,10 +365,9 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string,
err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath)
if err != nil && !errdefs.IsAlreadyExists(err) {
return epilog(err, "generate tarfs data from image layer blob")
} else {
msg := fmt.Sprintf("Nydus tarfs for snapshot %s is ready", snapshotID)
return epilog(nil, msg)
}
msg := fmt.Sprintf("Nydus tarfs for snapshot %s is ready", snapshotID)
return epilog(nil, msg)
}
}

Expand All @@ -389,7 +390,9 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string,
} else {
// Download and convert layer content in background.
// Will retry when the content is actually needed if the background process failed.
go process(rc, remote)
go func() {
_ = process(rc, remote)
}()
}

return err
Expand Down Expand Up @@ -418,7 +421,7 @@ func (t *Manager) retryPrepareLayer(snapshotID, upperDirPath string, labels map[
case TarfsStatusPrepare:
log.L.Infof("Another thread is retrying snapshot %s, wait for the result", snapshotID)
st.mutex.Unlock()
st, err = t.waitLayerReady(snapshotID, false)
_, err = t.waitLayerReady(snapshotID, false)
return err
case TarfsStatusReady:
log.L.Infof("Another thread has retried snapshot %s and succeed", snapshotID)
Expand Down Expand Up @@ -622,7 +625,7 @@ func (t *Manager) ExportBlockData(s storage.Snapshot, perLayer bool, labels map[
return updateFields, nil
}

func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels map[string]string, rafs *rafs.Rafs) error {
func (t *Manager) MountErofs(snapshotID string, s *storage.Snapshot, labels map[string]string, rafs *rafs.Rafs) error {
if s == nil {
return errors.New("snapshot object for MountTarErofs() is nil")
}
Expand All @@ -643,6 +646,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m
}

var devices []string
var parents []string
// When merging bootstrap, we need to arrange layer bootstrap in order from low to high
for idx := len(s.ParentIDs) - 1; idx >= 0; idx-- {
snapshotID := s.ParentIDs[idx]
Expand All @@ -663,10 +667,89 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m
st.dataLoopdev = loopdev
}
devices = append(devices, "device="+st.dataLoopdev.Name())
parents = append(parents, snapshotID)
}

st.mutex.Unlock()
}
parentList := strings.Join(parents, ",")
devices = append(devices, "ro")
mountOpts := strings.Join(devices, ",")

st, err := t.getSnapshotStatus(snapshotID, true)
if err != nil {
return err
}
defer st.mutex.Unlock()

mountPoint := path.Join(rafs.GetSnapshotDir(), "mnt")
if len(st.erofsMountPoint) > 0 {
if st.erofsMountPoint == mountPoint {
log.L.Debugf("tarfs for snapshot %s has already been mounted at %s", snapshotID, mountPoint)
return nil
}
return errors.Errorf("tarfs for snapshot %s has already been mounted at %s", snapshotID, st.erofsMountPoint)
}

if st.metaLoopdev == nil {
loopdev, err := t.attachLoopdev(mergedBootstrap)
if err != nil {
return errors.Wrapf(err, "attach merged bootstrap %s to loopdev", mergedBootstrap)
}
st.metaLoopdev = loopdev
}
devName := st.metaLoopdev.Name()

if err = os.MkdirAll(mountPoint, 0750); err != nil {
return errors.Wrapf(err, "create tarfs mount dir %s", mountPoint)
}

err = unix.Mount(devName, mountPoint, "erofs", 0, mountOpts)
if err != nil {
return errors.Wrapf(err, "mount erofs at %s with opts %s", mountPoint, mountOpts)
}
st.erofsMountPoint = mountPoint
rafs.SetMountpoint(mountPoint)
rafs.AddAnnotation(label.NydusTarfsParents, parentList)
return nil
}

func (t *Manager) RemountErofs(snapshotID string, rafs *rafs.Rafs) error {
upperDirPath := path.Join(rafs.GetSnapshotDir(), "fs")

log.L.Infof("remount EROFS for tarfs snapshot %s at %s", snapshotID, upperDirPath)
var parents []string
if parentList, ok := rafs.Annotations[label.NydusTarfsParents]; ok {
parents = strings.Split(parentList, ",")
} else {
if !config.GetTarfsMountOnHost() {
rafs.SetMountpoint(upperDirPath)
}
return nil
}

var devices []string
for idx := 0; idx < len(parents); idx++ {
snapshotID := parents[idx]
st, err := t.waitLayerReady(snapshotID, true)
if err != nil {
return errors.Wrapf(err, "wait for tarfs conversion task")
}

if st.dataLoopdev == nil {
blobTarFilePath := t.layerTarFilePath(st.blobID)
loopdev, err := t.attachLoopdev(blobTarFilePath)
if err != nil {
st.mutex.Unlock()
return errors.Wrapf(err, "attach layer tar file %s to loopdev", blobTarFilePath)
}
st.dataLoopdev = loopdev
}
devices = append(devices, "device="+st.dataLoopdev.Name())

st.mutex.Unlock()
}
devices = append(devices, "ro")
mountOpts := strings.Join(devices, ",")

st, err := t.getSnapshotStatus(snapshotID, true)
Expand All @@ -685,6 +768,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m
}

if st.metaLoopdev == nil {
mergedBootstrap := t.imageMetaFilePath(upperDirPath)
loopdev, err := t.attachLoopdev(mergedBootstrap)
if err != nil {
return errors.Wrapf(err, "attach merged bootstrap %s to loopdev", mergedBootstrap)
Expand Down Expand Up @@ -767,6 +851,43 @@ func (t *Manager) DetachLayer(snapshotID string) error {
return nil
}

func (t *Manager) RecoverSnapshoInfo(ctx context.Context, id string, info snapshots.Info, upperPath string) error {
t.mutex.Lock()
defer t.mutex.Unlock()
log.L.Infof("recover tarfs snapshot %s with path %s", id, upperPath)

if _, ok := t.snapshotMap[id]; ok {
// RecoverSnapshotInfo() is called after RecoverRafsInstance(), so there may be some snapshots already exist.
return nil
}

layerMetaFilePath := t.layerMetaFilePath(upperPath)
if _, err := os.Stat(layerMetaFilePath); err == nil {
layerDigest := digest.Digest(info.Labels[label.CRILayerDigest])
if layerDigest.Validate() != nil {
return errors.Errorf("not found layer digest label")
}
ctx, cancel := context.WithCancel(context.Background())
t.snapshotMap[id] = &snapshotStatus{
status: TarfsStatusReady,
blobID: layerDigest.Hex(),
cancel: cancel,
ctx: ctx,
}
} else {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
t.snapshotMap[id] = &snapshotStatus{
status: TarfsStatusFailed,
wg: wg,
cancel: cancel,
ctx: ctx,
}
}
return nil
}

// This method is called in single threaded mode during startup, so we do not lock `snapshotStatus` objects.
func (t *Manager) RecoverRafsInstance(r *rafs.Rafs) error {
t.mutex.Lock()
Expand Down
2 changes: 1 addition & 1 deletion snapshot/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func chooseProcessor(ctx context.Context, logger *logrus.Entry,
// which have already been prepared by overlay snapshotter

logger.Infof("Prepare active snapshot %s in Nydus tarfs mode", key)
err = sn.mergeTarfs(ctx, s, parent, pID, pInfo)
err = sn.mergeTarfs(ctx, s, parent, pInfo)
if err != nil {
return nil, "", errors.Wrapf(err, "merge tarfs layers for snapshot %s", pID)
}
Expand Down
46 changes: 41 additions & 5 deletions snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/metrics"
"github.com/containerd/nydus-snapshotter/pkg/metrics/collector"
"github.com/containerd/nydus-snapshotter/pkg/pprof"
"github.com/containerd/nydus-snapshotter/pkg/rafs"
"github.com/containerd/nydus-snapshotter/pkg/referrer"
"github.com/containerd/nydus-snapshotter/pkg/system"
"github.com/containerd/nydus-snapshotter/pkg/tarfs"
Expand Down Expand Up @@ -225,8 +226,9 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho
opts = append(opts, filesystem.WithReferrerManager(referrerMgr))
}

var tarfsMgr *tarfs.Manager
if cfg.Experimental.TarfsConfig.EnableTarfs {
tarfsMgr := tarfs.NewManager(skipSSLVerify, cfg.Experimental.TarfsConfig.TarfsHint,
tarfsMgr = tarfs.NewManager(skipSSLVerify, cfg.Experimental.TarfsConfig.TarfsHint,
cacheConfig.CacheDir, cfg.DaemonConfig.NydusImagePath,
int64(cfg.Experimental.TarfsConfig.MaxConcurrentProc))
opts = append(opts, filesystem.WithTarfsManager(tarfsMgr))
Expand Down Expand Up @@ -284,7 +286,7 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho
syncRemove = true
}

return &snapshotter{
snapshotter := &snapshotter{
root: cfg.Root,
nydusdPath: cfg.DaemonConfig.NydusdPath,
ms: ms,
Expand All @@ -294,7 +296,41 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho
enableNydusOverlayFS: cfg.SnapshotsConfig.EnableNydusOverlayFS,
enableKataVolume: cfg.SnapshotsConfig.EnableKataVolume,
cleanupOnClose: cfg.CleanupOnClose,
}, nil
}

// There's special requirement to recover tarfs instance because it depdens on `snapshotter.ms`
// so it can't be done in `Filesystem.Recover()`
if tarfsMgr != nil {
snapshotter.recoverTarfs(ctx, tarfsMgr)
}

return snapshotter, nil
}

func (o *snapshotter) recoverTarfs(ctx context.Context, tarfsMgr *tarfs.Manager) {
// First recover all snapshot information related to tarfs, mount operation depends on snapshots.
_ = o.Walk(ctx, func(ctx context.Context, i snapshots.Info) error {
if _, ok := i.Labels[label.NydusTarfsLayer]; ok {
id, _, _, err := snapshot.GetSnapshotInfo(ctx, o.ms, i.Name)
if err != nil {
return errors.Wrapf(err, "get id for snapshot %s", i.Name)
}
log.L.Infof("found tarfs snapshot %s with name %s", id, i.Name)
upperPath := o.upperPath(id)
if err = tarfsMgr.RecoverSnapshoInfo(ctx, id, i, upperPath); err != nil {
return errors.Wrapf(err, "get id for snapshot %s", i.Name)
}
}
return nil
})

for id, r := range tarfsMgr.RemountMap {
log.L.Infof("remount tarfs snapshot %s", id)
if err := tarfsMgr.RemountErofs(id, r); err != nil {
log.L.Warnf("failed to remount EROFS filesystem for tarfs, %s", err)
}
}
tarfsMgr.RemountMap = map[string]*rafs.Rafs{}
}

func (o *snapshotter) Cleanup(ctx context.Context) error {
Expand Down Expand Up @@ -519,7 +555,7 @@ func (o *snapshotter) View(ctx context.Context, key, parent string, opts ...snap

if o.fs.TarfsEnabled() && label.IsTarfsDataLayer(pInfo.Labels) {
log.L.Infof("Prepare view snapshot %s in Nydus tarfs mode", pID)
err = o.mergeTarfs(ctx, s, parent, pID, pInfo)
err = o.mergeTarfs(ctx, s, parent, pInfo)
if err != nil {
return nil, errors.Wrapf(err, "merge tarfs layers for snapshot %s", pID)
}
Expand Down Expand Up @@ -796,7 +832,7 @@ func (o *snapshotter) createSnapshot(ctx context.Context, kind snapshots.Kind, k
return &base, s, nil
}

func (o *snapshotter) mergeTarfs(ctx context.Context, s storage.Snapshot, parent, pID string, pInfo snapshots.Info) error {
func (o *snapshotter) mergeTarfs(ctx context.Context, s storage.Snapshot, parent string, pInfo snapshots.Info) error {
infoGetter := func(ctx context.Context, id string) (string, snapshots.Info, error) {
for {
id2, info, _, err := snapshot.GetSnapshotInfo(ctx, o.ms, parent)
Expand Down

0 comments on commit c001e9e

Please sign in to comment.