Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chunk-dedup flag to support local cas chunk deduplication. #519

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,14 @@ type CgroupConfig struct {

// Configure how to start and recover nydusd daemons
type DaemonConfig struct {
NydusdPath string `toml:"nydusd_path"`
NydusdConfigPath string `toml:"nydusd_config"`
NydusImagePath string `toml:"nydusimage_path"`
RecoverPolicy string `toml:"recover_policy"`
FsDriver string `toml:"fs_driver"`
ThreadsNumber int `toml:"threads_number"`
LogRotationSize int `toml:"log_rotation_size"`
NydusdPath string `toml:"nydusd_path"`
NydusdConfigPath string `toml:"nydusd_config"`
NydusImagePath string `toml:"nydusimage_path"`
RecoverPolicy string `toml:"recover_policy"`
FsDriver string `toml:"fs_driver"`
ThreadsNumber int `toml:"threads_number"`
LogRotationSize int `toml:"log_rotation_size"`
EnableChunkDeduplication bool `toml:"enable_chunk_deduplication"`
}

type LoggingConfig struct {
Expand Down Expand Up @@ -326,6 +327,10 @@ func ParseParameters(args *flags.Args, cfg *SnapshotterConfig) error {
daemonConfig.FsDriver = args.FsDriver
}

if args.EnableChunkDeduplicationCount > 0 {
daemonConfig.EnableChunkDeduplication = args.EnableChunkDeduplication
}

// --- cache manager configuration
// empty

Expand Down
8 changes: 7 additions & 1 deletion config/daemonconfig/daemonconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,21 @@ type DaemonConfig interface {
}

// Daemon configurations factory
func NewDaemonConfig(fsDriver, path string) (DaemonConfig, error) {
func NewDaemonConfig(fsDriver, path string, enableDeduplication bool) (DaemonConfig, error) {
switch fsDriver {
case config.FsDriverFscache:
cfg, err := LoadFscacheConfig(path)
if err != nil {
return nil, err
}
cfg.Config.DeduplicationConfig.Enable = enableDeduplication
return cfg, nil
case config.FsDriverFusedev:
cfg, err := LoadFuseConfig(path)
if err != nil {
return nil, err
}
cfg.Device.Deduplication.Enable = enableDeduplication
return cfg, nil
default:
return nil, errors.Errorf("unsupported, fs driver %q", fsDriver)
Expand Down Expand Up @@ -118,6 +120,10 @@ type DeviceConfig struct {
DisableIndexedMap bool `json:"disable_indexed_map"`
} `json:"config"`
} `json:"cache"`
Deduplication struct {
Enable bool `json:"enable"`
WorkDir string `json:"work_dir"`
Copy link
Member

@sctb512 sctb512 Aug 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this WorkDir option redundant? Otherwise, it should be exported as well to the configuration file.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this WorkDir option redundant? Otherwise, it should be exported as well to the configuration file.

Nydusd needs WorkDir to specify the location of the CAS database. "Exported to the configuration file" means to dump WorkDir into the config file passed to nydusd?

} `json:"deduplication"`
}

// For nydusd as FUSE daemon. Serialize Daemon info and persist to a json file
Expand Down
4 changes: 4 additions & 0 deletions config/daemonconfig/fscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type FscacheDaemonConfig struct {
CacheConfig struct {
WorkDir string `json:"work_dir"`
} `json:"cache_config"`
DeduplicationConfig struct {
Enable bool `json:"enable"`
WorkDir string `json:"work_dir"`
} `json:"deduplication_config"`
BlobPrefetchConfig BlobPrefetchConfig `json:"prefetch_config"`
MetadataPath string `json:"metadata_path"`
} `json:"config"`
Expand Down
4 changes: 4 additions & 0 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func GetDaemonProfileCPUDuration() int64 {
return globalConfig.origin.SystemControllerConfig.DebugConfig.ProfileDuration
}

func GetEnableChunkDeduplication() bool {
return globalConfig.origin.DaemonConfig.EnableChunkDeduplication
}

func ProcessConfigurations(c *SnapshotterConfig) error {
if c.LoggingConfig.LogDir == "" {
c.LoggingConfig.LogDir = filepath.Join(c.Root, logging.DefaultLogDirName)
Expand Down
33 changes: 21 additions & 12 deletions internal/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@ import (
)

type Args struct {
Address string
NydusdConfigPath string
SnapshotterConfigPath string
RootDir string
NydusdPath string
NydusImagePath string
DaemonMode string
FsDriver string
LogLevel string
LogToStdout bool
LogToStdoutCount int
PrintVersion bool
Address string
NydusdConfigPath string
SnapshotterConfigPath string
RootDir string
NydusdPath string
NydusImagePath string
DaemonMode string
FsDriver string
LogLevel string
LogToStdout bool
LogToStdoutCount int
PrintVersion bool
EnableChunkDeduplication bool
EnableChunkDeduplicationCount int
}

type Flags struct {
Expand Down Expand Up @@ -97,6 +99,13 @@ func buildFlags(args *Args) []cli.Flag {
Usage: "print version and build information",
Destination: &args.PrintVersion,
},
&cli.BoolFlag{
Name: "chunk-deduplication",
Value: false,
Usage: "(experiment) whether to enable local cas chunk deduplication",
Destination: &args.EnableChunkDeduplication,
Count: &args.EnableChunkDeduplicationCount,
},
}
}

Expand Down
25 changes: 23 additions & 2 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,14 @@ func (d *Daemon) sharedFusedevMount(rafs *Rafs) error {
return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID)
}

configPath := d.ConfigFile(rafs.SnapshotID)
enableDeduplication := config.GetEnableChunkDeduplication()
bootstrap, err := rafs.BootstrapFile()
if err != nil {
return err
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID), enableDeduplication)
if err != nil {
return errors.Wrapf(err, "Failed to reload instance configuration %s",
d.ConfigFile(rafs.SnapshotID))
Expand All @@ -260,6 +262,15 @@ func (d *Daemon) sharedFusedevMount(rafs *Rafs) error {
return errors.Wrap(err, "dump instance configuration")
}

// exec static deduplication for bootstrap
if enableDeduplication {
log.L.Debugln("Enable chunk decuplication, sharedFusedevMount, cfg = ", cfg)
bootstrap, err = rafs.DeduplicateBootstrap(bootstrap, configPath)
if err != nil {
return errors.Wrapf(err, "failed to dedup rafs bootstrap")
}
}

err = client.Mount(rafs.RelaMountpoint(), bootstrap, cfg)
if err != nil {
return errors.Wrapf(err, "mount rafs instance")
Expand All @@ -279,7 +290,9 @@ func (d *Daemon) sharedErofsMount(rafs *Rafs) error {
return errors.Wrapf(err, "failed to create fscache work dir %s", rafs.FscacheWorkDir())
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
configPath := d.ConfigFile(rafs.SnapshotID)
enableDeduplication := config.GetEnableChunkDeduplication()
c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, configPath, enableDeduplication)
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(rafs.SnapshotID), err)
return err
Expand Down Expand Up @@ -309,6 +322,14 @@ func (d *Daemon) sharedErofsMount(rafs *Rafs) error {
rafs.AddAnnotation(AnnoFsCacheDomainID, cfg.DomainID)
rafs.AddAnnotation(AnnoFsCacheID, fscacheID)

if enableDeduplication {
log.L.Debugln("Enable chunk deduplication, sharedErofsMount, cfg = ", cfg)
bootstrapPath, err = rafs.DeduplicateBootstrap(bootstrapPath, configPath)
if err != nil {
return errors.Wrapf(err, "dedup rafs bootstrap")
}
}

if err := erofs.Mount(bootstrapPath, cfg.DomainID, fscacheID, mountPoint); err != nil {
if !errdefs.IsErofsMounted(err) {
return errors.Wrapf(err, "mount erofs to %s", mountPoint)
Expand Down
38 changes: 38 additions & 0 deletions pkg/daemon/rafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ package daemon

import (
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"sync"

"github.com/mohae/deepcopy"
"github.com/pkg/errors"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"

"github.com/containerd/nydus-snapshotter/config"
)
Expand Down Expand Up @@ -195,3 +198,38 @@ func (r *Rafs) BootstrapFile() (string, error) {

return "", errors.Wrapf(errdefs.ErrNotFound, "bootstrap %s", bootstrap)
}

func buildDeduplicationCommand(bootstrapPath, configPath, nydusImagePath string) *exec.Cmd {
args := []string{
"dedup",
"--bootstrap", bootstrapPath,
"--config", configPath,
}

log.L.Infof("start bootstrap deduplication: %s %s", nydusImagePath, strings.Join(args, " "))

cmd := exec.Command(nydusImagePath, args...)

return cmd

}

func (r *Rafs) DeduplicateBootstrap(bootstrapPath, configPath string) (string, error) {
nydusImagePath, err := exec.LookPath("nydus-image")
if err != nil {
return "", err
}

cmd := buildDeduplicationCommand(bootstrapPath, configPath, nydusImagePath)
_, err = cmd.CombinedOutput()
if err != nil {
return "", err
}

_, err = os.Stat(bootstrapPath + ".dedup")
if err != nil {
return "", err
}
bootstrapPath += ".dedup"
xwb1136021767 marked this conversation as resolved.
Show resolved Hide resolved
return bootstrapPath, err
}
26 changes: 26 additions & 0 deletions pkg/daemon/rafs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package daemon

import (
"fmt"
"strings"
"testing"
)

func TestBuildDeduplicationCommand(t *testing.T) {
bootstrapPath := "/path/to/bootstrap"
configPath := "/path/to/config.json"
nydusImagePath := "/path/to/nydus-image"

cmd := buildDeduplicationCommand(bootstrapPath, configPath, nydusImagePath)

expectedArgs := []string{
"dedup",
"--bootstrap", bootstrapPath,
"--config", configPath,
}
expectedCmd := fmt.Sprintf("%s %s", nydusImagePath, strings.Join(expectedArgs, " "))

if expectedCmd != cmd.String() {
t.Errorf("unexpected command string '%s'", cmd.String())
}
}
20 changes: 19 additions & 1 deletion pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
return errors.Wrapf(err, "create command for daemon %s", d.ID())
}

// not deduplicate bootstrap for shared daemon mode
if config.GetEnableChunkDeduplication() && !d.IsSharedDaemon() {
rafs := d.Instances.Head()
if rafs == nil {
return errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}
configPath := d.ConfigFile("")
bootstrap, _ := rafs.BootstrapFile()
_, err = rafs.DeduplicateBootstrap(bootstrap, configPath)
if err != nil {
return errors.Errorf("fail to deduplicate bootstrap")
}
}

if err := cmd.Start(); err != nil {
return err
}
Expand Down Expand Up @@ -118,7 +132,7 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
var cmdOpts []command.Opt

nydusdThreadNum := d.NydusdThreadNum()

enableDeduplication := config.GetEnableChunkDeduplication()
if d.States.FsDriver == config.FsDriverFscache {
cmdOpts = append(cmdOpts,
command.WithMode("singleton"),
Expand All @@ -145,6 +159,10 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
}

if enableDeduplication {
bootstrap += ".dedup"
}

cmdOpts = append(cmdOpts,
command.WithConfig(d.ConfigFile("")),
command.WithBootstrap(bootstrap),
Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func (m *Manager) Recover(ctx context.Context,
d.States = *s

m.daemonStates.RecoverDaemonState(d)

enableDeduplication := config.GetEnableChunkDeduplication()
if m.SupervisorSet != nil {
su := m.SupervisorSet.NewSupervisor(d.ID())
if su == nil {
Expand All @@ -486,7 +486,7 @@ func (m *Manager) Recover(ctx context.Context,
}

if d.States.FsDriver == config.FsDriverFusedev {
cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(""))
cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(""), enableDeduplication)
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(""), err)
return err
Expand Down
12 changes: 10 additions & 2 deletions snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho
return nil, errors.Wrap(err, "initialize image verifier")
}

daemonConfig, err := daemonconfig.NewDaemonConfig(config.GetFsDriver(), cfg.DaemonConfig.NydusdConfigPath)
daemonConfig, err := daemonconfig.NewDaemonConfig(config.GetFsDriver(), cfg.DaemonConfig.NydusdConfigPath, cfg.DaemonConfig.EnableChunkDeduplication)
if err != nil {
return nil, errors.Wrap(err, "load daemon configuration")
}
Expand Down Expand Up @@ -781,8 +781,9 @@ func (o *snapshotter) remoteMountWithExtraOptions(ctx context.Context, s storage
}

var c daemonconfig.DaemonConfig
enableDeduplication := config.GetEnableChunkDeduplication()
if daemon.IsSharedDaemon() {
c, err = daemonconfig.NewDaemonConfig(daemon.States.FsDriver, daemon.ConfigFile(instance.SnapshotID))
c, err = daemonconfig.NewDaemonConfig(daemon.States.FsDriver, daemon.ConfigFile(instance.SnapshotID), enableDeduplication)
if err != nil {
return nil, errors.Wrapf(err, "Failed to load instance configuration %s",
daemon.ConfigFile(instance.SnapshotID))
Expand Down Expand Up @@ -811,6 +812,13 @@ func (o *snapshotter) remoteMountWithExtraOptions(ctx context.Context, s storage
return nil, errors.Wrapf(err, "remoteMounts: failed to detect filesystem version")
}

if enableDeduplication {
configPath := daemon.ConfigFile(instance.SnapshotID)
source, err = instance.DeduplicateBootstrap(source, configPath)
if err != nil {
return nil, errors.Wrapf(err, "remoteMounts: failed to dedup rafs bootstrap")
}
}
// when enable nydus-overlayfs, return unified mount slice for runc and kata
extraOption := &ExtraOption{
Source: source,
Expand Down
Loading