Skip to content

Commit

Permalink
add chunk-dedup flag to support local cas chunk deduplication.
Browse files Browse the repository at this point in the history
Signed-off-by: xwb1136021767 <[email protected]>
  • Loading branch information
xwb1136021767 committed Aug 16, 2023
1 parent a6f4457 commit eaf7963
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 27 deletions.
19 changes: 12 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,14 @@ type CgroupConfig struct {

// Configure how to start and recover nydusd daemons
type DaemonConfig struct {
NydusdPath string `toml:"nydusd_path"`
NydusdConfigPath string `toml:"nydusd_config"`
NydusImagePath string `toml:"nydusimage_path"`
RecoverPolicy string `toml:"recover_policy"`
FsDriver string `toml:"fs_driver"`
ThreadsNumber int `toml:"threads_number"`
LogRotationSize int `toml:"log_rotation_size"`
NydusdPath string `toml:"nydusd_path"`
NydusdConfigPath string `toml:"nydusd_config"`
NydusImagePath string `toml:"nydusimage_path"`
RecoverPolicy string `toml:"recover_policy"`
FsDriver string `toml:"fs_driver"`
ThreadsNumber int `toml:"threads_number"`
LogRotationSize int `toml:"log_rotation_size"`
EnableChunkDeduplication bool `toml:"enable_chunk_deduplication"`
}

type LoggingConfig struct {
Expand Down Expand Up @@ -326,6 +327,10 @@ func ParseParameters(args *flags.Args, cfg *SnapshotterConfig) error {
daemonConfig.FsDriver = args.FsDriver
}

if args.EnableChunkDeduplicationCount > 0 {
daemonConfig.EnableChunkDeduplication = args.EnableChunkDeduplication
}

// --- cache manager configuration
// empty

Expand Down
8 changes: 7 additions & 1 deletion config/daemonconfig/daemonconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,21 @@ type DaemonConfig interface {
}

// Daemon configurations factory
func NewDaemonConfig(fsDriver, path string) (DaemonConfig, error) {
func NewDaemonConfig(fsDriver, path string, enableDeduplication bool) (DaemonConfig, error) {
switch fsDriver {
case config.FsDriverFscache:
cfg, err := LoadFscacheConfig(path)
if err != nil {
return nil, err
}
cfg.Config.DeduplicationConfig.Enable = enableDeduplication
return cfg, nil
case config.FsDriverFusedev:
cfg, err := LoadFuseConfig(path)
if err != nil {
return nil, err
}
cfg.Device.Deduplication.Enable = enableDeduplication
return cfg, nil
default:
return nil, errors.Errorf("unsupported, fs driver %q", fsDriver)
Expand Down Expand Up @@ -118,6 +120,10 @@ type DeviceConfig struct {
DisableIndexedMap bool `json:"disable_indexed_map"`
} `json:"config"`
} `json:"cache"`
Deduplication struct {
Enable bool `json:"enable"`
WorkDir string `json:"work_dir"`
} `json:"deduplication"`
}

// For nydusd as FUSE daemon. Serialize Daemon info and persist to a json file
Expand Down
4 changes: 4 additions & 0 deletions config/daemonconfig/fscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type FscacheDaemonConfig struct {
CacheConfig struct {
WorkDir string `json:"work_dir"`
} `json:"cache_config"`
DeduplicationConfig struct {
Enable bool `json:"enable"`
WorkDir string `json:"work_dir"`
} `json:"deduplication_config"`
BlobPrefetchConfig BlobPrefetchConfig `json:"prefetch_config"`
MetadataPath string `json:"metadata_path"`
} `json:"config"`
Expand Down
4 changes: 4 additions & 0 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func GetDaemonProfileCPUDuration() int64 {
return globalConfig.origin.SystemControllerConfig.DebugConfig.ProfileDuration
}

func GetEnableChunkDeduplication() bool {
return globalConfig.origin.DaemonConfig.EnableChunkDeduplication
}

func ProcessConfigurations(c *SnapshotterConfig) error {
if c.LoggingConfig.LogDir == "" {
c.LoggingConfig.LogDir = filepath.Join(c.Root, logging.DefaultLogDirName)
Expand Down
33 changes: 21 additions & 12 deletions internal/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@ import (
)

type Args struct {
Address string
NydusdConfigPath string
SnapshotterConfigPath string
RootDir string
NydusdPath string
NydusImagePath string
DaemonMode string
FsDriver string
LogLevel string
LogToStdout bool
LogToStdoutCount int
PrintVersion bool
Address string
NydusdConfigPath string
SnapshotterConfigPath string
RootDir string
NydusdPath string
NydusImagePath string
DaemonMode string
FsDriver string
LogLevel string
LogToStdout bool
LogToStdoutCount int
PrintVersion bool
EnableChunkDeduplication bool
EnableChunkDeduplicationCount int
}

type Flags struct {
Expand Down Expand Up @@ -97,6 +99,13 @@ func buildFlags(args *Args) []cli.Flag {
Usage: "print version and build information",
Destination: &args.PrintVersion,
},
&cli.BoolFlag{
Name: "chunk-deduplication",
Value: false,
Usage: "(experiment) whether to enable local cas chunk deduplication",
Destination: &args.EnableChunkDeduplication,
Count: &args.EnableChunkDeduplicationCount,
},
}
}

Expand Down
25 changes: 23 additions & 2 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,14 @@ func (d *Daemon) sharedFusedevMount(rafs *Rafs) error {
return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID)
}

configPath := d.ConfigFile(rafs.SnapshotID)
enableDeduplication := config.GetEnableChunkDeduplication()
bootstrap, err := rafs.BootstrapFile()
if err != nil {
return err
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID), enableDeduplication)
if err != nil {
return errors.Wrapf(err, "Failed to reload instance configuration %s",
d.ConfigFile(rafs.SnapshotID))
Expand All @@ -260,6 +262,15 @@ func (d *Daemon) sharedFusedevMount(rafs *Rafs) error {
return errors.Wrap(err, "dump instance configuration")
}

// exec static deduplication for bootstrap
if enableDeduplication {
log.L.Debugln("Enable chunk decuplication, sharedFusedevMount, cfg = ", cfg)
bootstrap, err = rafs.DeduplicateBootstrap(bootstrap, configPath)
if err != nil {
return errors.Wrapf(err, "failed to dedup rafs bootstrap")
}
}

err = client.Mount(rafs.RelaMountpoint(), bootstrap, cfg)
if err != nil {
return errors.Wrapf(err, "mount rafs instance")
Expand All @@ -279,7 +290,9 @@ func (d *Daemon) sharedErofsMount(rafs *Rafs) error {
return errors.Wrapf(err, "failed to create fscache work dir %s", rafs.FscacheWorkDir())
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
configPath := d.ConfigFile(rafs.SnapshotID)
enableDeduplication := config.GetEnableChunkDeduplication()
c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, configPath, enableDeduplication)
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(rafs.SnapshotID), err)
return err
Expand Down Expand Up @@ -309,6 +322,14 @@ func (d *Daemon) sharedErofsMount(rafs *Rafs) error {
rafs.AddAnnotation(AnnoFsCacheDomainID, cfg.DomainID)
rafs.AddAnnotation(AnnoFsCacheID, fscacheID)

if enableDeduplication {
log.L.Debugln("Enable chunk deduplication, sharedErofsMount, cfg = ", cfg)
bootstrapPath, err = rafs.DeduplicateBootstrap(bootstrapPath, configPath)
if err != nil {
return errors.Wrapf(err, "dedup rafs bootstrap")
}
}

if err := erofs.Mount(bootstrapPath, cfg.DomainID, fscacheID, mountPoint); err != nil {
if !errdefs.IsErofsMounted(err) {
return errors.Wrapf(err, "mount erofs to %s", mountPoint)
Expand Down
38 changes: 38 additions & 0 deletions pkg/daemon/rafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ package daemon

import (
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"sync"

"github.com/mohae/deepcopy"
"github.com/pkg/errors"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"

"github.com/containerd/nydus-snapshotter/config"
)
Expand Down Expand Up @@ -195,3 +198,38 @@ func (r *Rafs) BootstrapFile() (string, error) {

return "", errors.Wrapf(errdefs.ErrNotFound, "bootstrap %s", bootstrap)
}

func buildDeduplicationCommand(bootstrapPath, configPath, nydusImagePath string) *exec.Cmd {
args := []string{
"dedup",
"--bootstrap", bootstrapPath,
"--config", configPath,
}

log.L.Infof("start bootstrap deduplication: %s %s", nydusImagePath, strings.Join(args, " "))

cmd := exec.Command(nydusImagePath, args...)

return cmd

}

func (r *Rafs) DeduplicateBootstrap(bootstrapPath, configPath string) (string, error) {
nydusImagePath, err := exec.LookPath("nydus-image")
if err != nil {
return "", err
}

cmd := buildDeduplicationCommand(bootstrapPath, configPath, nydusImagePath)
_, err = cmd.CombinedOutput()
if err != nil {
return "", err
}

_, err = os.Stat(bootstrapPath + ".dedup")
if err != nil {
return "", err
}
bootstrapPath += ".dedup"
return bootstrapPath, err
}
26 changes: 26 additions & 0 deletions pkg/daemon/rafs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package daemon

import (
"fmt"
"strings"
"testing"
)

func TestBuildDeduplicationCommand(t *testing.T) {
bootstrapPath := "/path/to/bootstrap"
configPath := "/path/to/config.json"
nydusImagePath := "/path/to/nydus-image"

cmd := buildDeduplicationCommand(bootstrapPath, configPath, nydusImagePath)

expectedArgs := []string{
"dedup",
"--bootstrap", bootstrapPath,
"--config", configPath,
}
expectedCmd := fmt.Sprintf("%s %s", nydusImagePath, strings.Join(expectedArgs, " "))

if expectedCmd != cmd.String() {
t.Errorf("unexpected command string '%s'", cmd.String())
}
}
20 changes: 19 additions & 1 deletion pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
return errors.Wrapf(err, "create command for daemon %s", d.ID())
}

// not deduplicate bootstrap for shared daemon mode
if config.GetEnableChunkDeduplication() && !d.IsSharedDaemon() {
rafs := d.Instances.Head()
if rafs == nil {
return errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}
configPath := d.ConfigFile("")
bootstrap, _ := rafs.BootstrapFile()
_, err = rafs.DeduplicateBootstrap(bootstrap, configPath)
if err != nil {
return errors.Errorf("fail to deduplicate bootstrap")
}
}

if err := cmd.Start(); err != nil {
return err
}
Expand Down Expand Up @@ -118,7 +132,7 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
var cmdOpts []command.Opt

nydusdThreadNum := d.NydusdThreadNum()

enableDeduplication := config.GetEnableChunkDeduplication()
if d.States.FsDriver == config.FsDriverFscache {
cmdOpts = append(cmdOpts,
command.WithMode("singleton"),
Expand All @@ -145,6 +159,10 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
}

if enableDeduplication {
bootstrap += ".dedup"
}

cmdOpts = append(cmdOpts,
command.WithConfig(d.ConfigFile("")),
command.WithBootstrap(bootstrap),
Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func (m *Manager) Recover(ctx context.Context,
d.States = *s

m.daemonStates.RecoverDaemonState(d)

enableDeduplication := config.GetEnableChunkDeduplication()
if m.SupervisorSet != nil {
su := m.SupervisorSet.NewSupervisor(d.ID())
if su == nil {
Expand All @@ -486,7 +486,7 @@ func (m *Manager) Recover(ctx context.Context,
}

if d.States.FsDriver == config.FsDriverFusedev {
cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(""))
cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(""), enableDeduplication)
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(""), err)
return err
Expand Down
12 changes: 10 additions & 2 deletions snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho
return nil, errors.Wrap(err, "initialize image verifier")
}

daemonConfig, err := daemonconfig.NewDaemonConfig(config.GetFsDriver(), cfg.DaemonConfig.NydusdConfigPath)
daemonConfig, err := daemonconfig.NewDaemonConfig(config.GetFsDriver(), cfg.DaemonConfig.NydusdConfigPath, cfg.DaemonConfig.EnableChunkDeduplication)
if err != nil {
return nil, errors.Wrap(err, "load daemon configuration")
}
Expand Down Expand Up @@ -781,8 +781,9 @@ func (o *snapshotter) remoteMountWithExtraOptions(ctx context.Context, s storage
}

var c daemonconfig.DaemonConfig
enableDeduplication := config.GetEnableChunkDeduplication()
if daemon.IsSharedDaemon() {
c, err = daemonconfig.NewDaemonConfig(daemon.States.FsDriver, daemon.ConfigFile(instance.SnapshotID))
c, err = daemonconfig.NewDaemonConfig(daemon.States.FsDriver, daemon.ConfigFile(instance.SnapshotID), enableDeduplication)
if err != nil {
return nil, errors.Wrapf(err, "Failed to load instance configuration %s",
daemon.ConfigFile(instance.SnapshotID))
Expand Down Expand Up @@ -811,6 +812,13 @@ func (o *snapshotter) remoteMountWithExtraOptions(ctx context.Context, s storage
return nil, errors.Wrapf(err, "remoteMounts: failed to detect filesystem version")
}

if enableDeduplication {
configPath := daemon.ConfigFile(instance.SnapshotID)
source, err = instance.DeduplicateBootstrap(source, configPath)
if err != nil {
return nil, errors.Wrapf(err, "remoteMounts: failed to dedup rafs bootstrap")
}
}
// when enable nydus-overlayfs, return unified mount slice for runc and kata
extraOption := &ExtraOption{
Source: source,
Expand Down

0 comments on commit eaf7963

Please sign in to comment.