Object disk backup #702

Merged · 12 commits · Jul 24, 2023
5 changes: 4 additions & 1 deletion .github/workflows/build.yaml
@@ -269,7 +269,10 @@ jobs:
QA_AWS_SECRET_KEY: ${{ secrets.QA_AWS_SECRET_KEY }}
QA_AWS_BUCKET: ${{ secrets.QA_AWS_BUCKET }}
QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }}

# needed for GCS over S3
QA_GCS_OVER_S3_ACCESS_KEY: ${{ secrets.QA_GCS_OVER_S3_ACCESS_KEY }}
QA_GCS_OVER_S3_SECRET_KEY: ${{ secrets.QA_GCS_OVER_S3_SECRET_KEY }}
QA_GCS_OVER_S3_BUCKET: ${{ secrets.QA_GCS_OVER_S3_BUCKET }}
run: |
set -x
echo "CLICKHOUSE_VERSION=${CLICKHOUSE_VERSION}"
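For context on these new secrets: GCS exposes an S3-compatible interoperability API, so an S3 client pointed at storage.googleapis.com and authenticated with a GCS HMAC key pair can read and write a GCS bucket. Below is a minimal sketch of that wiring with aws-sdk-go; the endpoint, placeholder region, and client code are illustrative assumptions, not code from this PR.

package main

import (
	"fmt"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// GCS interoperability endpoint; the HMAC pair comes from the QA_GCS_OVER_S3_* secrets.
	sess, err := session.NewSession(&aws.Config{
		Endpoint: aws.String("https://storage.googleapis.com"),
		Region:   aws.String("us-east-1"), // required by the SDK; GCS largely ignores it
		Credentials: credentials.NewStaticCredentials(
			os.Getenv("QA_GCS_OVER_S3_ACCESS_KEY"),
			os.Getenv("QA_GCS_OVER_S3_SECRET_KEY"),
			"",
		),
	})
	if err != nil {
		panic(err)
	}
	// List the QA bucket to verify that the credentials and endpoint work together.
	out, err := s3.New(sess).ListObjectsV2(&s3.ListObjectsV2Input{
		Bucket: aws.String(os.Getenv("QA_GCS_OVER_S3_BUCKET")),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("objects:", len(out.Contents))
}
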
96 changes: 88 additions & 8 deletions pkg/backup/create.go
@@ -5,10 +5,14 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/Altinity/clickhouse-backup/pkg/config"
"github.com/Altinity/clickhouse-backup/pkg/partition"
"github.com/Altinity/clickhouse-backup/pkg/status"
"github.com/Altinity/clickhouse-backup/pkg/storage"
"github.com/Altinity/clickhouse-backup/pkg/storage/object_disk"
"os"
"path"
"path/filepath"
"strings"
"time"

@@ -102,16 +106,18 @@ func (b *Backuper) CreateBackup(backupName, tablePattern string, partitions []st
return err
}

diskMap := map[string]string{}
diskMap := make(map[string]string, len(disks))
diskTypes := make(map[string]string, len(disks))
for _, disk := range disks {
diskMap[disk.Name] = disk.Path
diskTypes[disk.Name] = disk.Type
}
partitionsIdMap, partitionsNameList := partition.ConvertPartitionsToIdsMapAndNamesList(ctx, b.ch, tables, nil, partitions)
// create
if b.cfg.ClickHouse.UseEmbeddedBackupRestore {
err = b.createBackupEmbedded(ctx, backupName, tablePattern, partitionsNameList, partitionsIdMap, schemaOnly, rbacOnly, configsOnly, tables, allDatabases, allFunctions, disks, diskMap, log, startBackup, version)
err = b.createBackupEmbedded(ctx, backupName, tablePattern, partitionsNameList, partitionsIdMap, schemaOnly, rbacOnly, configsOnly, tables, allDatabases, allFunctions, disks, diskMap, diskTypes, log, startBackup, version)
} else {
err = b.createBackupLocal(ctx, backupName, partitionsIdMap, tables, doBackupData, schemaOnly, rbacOnly, configsOnly, version, disks, diskMap, allDatabases, allFunctions, log, startBackup)
err = b.createBackupLocal(ctx, backupName, partitionsIdMap, tables, doBackupData, schemaOnly, rbacOnly, configsOnly, version, disks, diskMap, diskTypes, allDatabases, allFunctions, log, startBackup)
}
if err != nil {
return err
@@ -124,7 +130,7 @@
return nil
}

func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, doBackupData bool, schemaOnly bool, rbacOnly bool, configsOnly bool, version string, disks []clickhouse.Disk, diskMap map[string]string, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, log *apexLog.Entry, startBackup time.Time) error {
func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, doBackupData bool, schemaOnly bool, rbacOnly bool, configsOnly bool, version string, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, log *apexLog.Entry, startBackup time.Time) error {
// Create backup dir on all clickhouse disks
for _, disk := range disks {
if err := filesystemhelper.Mkdir(path.Join(disk.Path, "backup"), b.ch, disks); err != nil {
@@ -236,14 +242,14 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
}

backupMetaFile := path.Join(defaultPath, "backup", backupName, "metadata.json")
if err := b.createBackupMetadata(ctx, backupMetaFile, backupName, version, "regular", diskMap, disks, backupDataSize, backupMetadataSize, backupRBACSize, backupConfigSize, tableMetas, allDatabases, allFunctions, log); err != nil {
if err := b.createBackupMetadata(ctx, backupMetaFile, backupName, version, "regular", diskMap, diskTypes, disks, backupDataSize, backupMetadataSize, backupRBACSize, backupConfigSize, tableMetas, allDatabases, allFunctions, log); err != nil {
return err
}
log.WithField("duration", utils.HumanizeDuration(time.Since(startBackup))).Info("done")
return nil
}

func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, tablePattern string, partitionsNameList map[metadata.TableTitle][]string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, schemaOnly, rbacOnly, configsOnly bool, tables []clickhouse.Table, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, disks []clickhouse.Disk, diskMap map[string]string, log *apexLog.Entry, startBackup time.Time, backupVersion string) error {
func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, tablePattern string, partitionsNameList map[metadata.TableTitle][]string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, schemaOnly, rbacOnly, configsOnly bool, tables []clickhouse.Table, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, log *apexLog.Entry, startBackup time.Time, backupVersion string) error {
if _, isBackupDiskExists := diskMap[b.cfg.ClickHouse.EmbeddedBackupDisk]; !isBackupDiskExists {
return fmt.Errorf("backup disk `%s` does not exist in system.disks", b.cfg.ClickHouse.EmbeddedBackupDisk)
}
@@ -358,7 +364,7 @@ func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, tablePa
}
}
backupMetaFile := path.Join(diskMap[b.cfg.ClickHouse.EmbeddedBackupDisk], backupName, "metadata.json")
if err := b.createBackupMetadata(ctx, backupMetaFile, backupName, backupVersion, "embedded", diskMap, disks, backupDataSize[0].Size, backupMetadataSize, 0, 0, tableMetas, allDatabases, allFunctions, log); err != nil {
if err := b.createBackupMetadata(ctx, backupMetaFile, backupName, backupVersion, "embedded", diskMap, diskTypes, disks, backupDataSize[0].Size, backupMetadataSize, 0, 0, tableMetas, allDatabases, allFunctions, log); err != nil {
return err
}

@@ -480,6 +486,7 @@ func (b *Backuper) AddTableToBackup(ctx context.Context, backupName, shadowBacku
}
realSize := map[string]int64{}
disksToPartsMap := map[string][]metadata.Part{}

for _, disk := range diskList {
select {
case <-ctx.Done():
@@ -503,6 +510,26 @@
realSize[disk.Name] = size
disksToPartsMap[disk.Name] = parts
log.WithField("disk", disk.Name).Debug("shadow moved")
if (disk.Type == "s3" || disk.Type == "azure_blob_storage") && len(parts) > 0 {
if err = config.ValidateObjectDiskConfig(b.cfg); err != nil {
return nil, nil, err
}
start := time.Now()
if b.dst == nil {
b.dst, err = storage.NewBackupDestination(ctx, b.cfg, b.ch, false, backupName)
if err != nil {
return nil, nil, err
}
if err := b.dst.Connect(ctx); err != nil {
return nil, nil, fmt.Errorf("can't connect to %s: %v", b.dst.Kind(), err)
}
}
if size, err = b.uploadObjectDiskParts(ctx, backupName, backupShadowPath, disk); err != nil {
return disksToPartsMap, realSize, err
}
realSize[disk.Name] += size
log.WithField("disk", disk.Name).WithField("duration", utils.HumanizeDuration(time.Since(start))).Info("object_disk data uploaded")
}
// Clean all files under shadowPath, because UNFREEZE is not available before 21.4
if version < 21004000 {
if err := os.RemoveAll(shadowPath); err != nil {
@@ -517,18 +544,71 @@ func (b *Backuper) AddTableToBackup(ctx context.Context, backupName, shadowBacku
return disksToPartsMap, realSize, err
}
}
if b.dst != nil {
if err := b.dst.Close(ctx); err != nil {
b.log.Warnf("uploadObjectDiskParts: can't close BackupDestination error: %v", err)
}
}
log.Debug("done")
return disksToPartsMap, realSize, nil
}
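
A note on the parenthesization of the disk-type check in AddTableToBackup above: Go gives && higher precedence than ||, so without the parentheses the condition would parse as s3 || (azure_blob_storage && has parts) and enter the upload path for every s3 disk even when nothing was frozen. A minimal, self-contained illustration with hypothetical values (not code from this PR):

package main

import "fmt"

func main() {
	diskType, partCount := "s3", 0

	// Without parentheses this parses as: s3 || (azure_blob_storage && partCount > 0)
	unparenthesized := diskType == "s3" || diskType == "azure_blob_storage" && partCount > 0
	// Intended meaning: it is an object disk AND it actually has parts to upload.
	parenthesized := (diskType == "s3" || diskType == "azure_blob_storage") && partCount > 0

	fmt.Println(unparenthesized, parenthesized) // prints: true false
}
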

func (b *Backuper) createBackupMetadata(ctx context.Context, backupMetaFile, backupName, version, tags string, diskMap map[string]string, disks []clickhouse.Disk, backupDataSize, backupMetadataSize, backupRBACSize, backupConfigSize uint64, tableMetas []metadata.TableTitle, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, log *apexLog.Entry) error {
func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backupShadowPath string, disk clickhouse.Disk) (int64, error) {
var size int64
var err error
if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, disk.Name); err != nil {
return 0, err
}

if err := filepath.Walk(backupShadowPath, func(fPath string, fInfo os.FileInfo, err error) error {
if err != nil {
return err
}
if fInfo.IsDir() {
return nil
}
objPartFileMeta, err := object_disk.ReadMetadataFromFile(fPath)
if err != nil {
return err
}
var realSize, objSize int64
// TODO: consider parallelizing here once tests pass; one possible approach is sketched after this function
for _, storageObject := range objPartFileMeta.StorageObjects {
srcDiskConnection, exists := object_disk.DisksConnections[disk.Name]
if !exists {
return fmt.Errorf("uploadObjectDiskParts: %s not present in object_disk.DisksConnections", disk.Name)
}
if objSize, err = b.dst.CopyObject(
ctx,
srcDiskConnection.GetRemoteBucket(),
path.Join(srcDiskConnection.GetRemotePath(), storageObject.ObjectRelativePath),
path.Join(backupName, disk.Name, storageObject.ObjectRelativePath),
); err != nil {
return err
}
realSize += objSize
}
if realSize > objPartFileMeta.TotalSize {
size += realSize
} else {
size += objPartFileMeta.TotalSize
}
return nil
}); err != nil {
return 0, err
}
return size, nil
}
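
The TODO inside uploadObjectDiskParts above asks about parallelizing the per-object copies. One possible shape for that loop, sketched with golang.org/x/sync/errgroup: the copyFn indirection stands in for b.dst.CopyObject, and the concurrency limit of 8 and the atomic size accumulation are assumptions for illustration, not part of this PR.

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

// copyFn stands in for a server-side copy such as b.dst.CopyObject;
// it returns the size of the copied object.
type copyFn func(ctx context.Context, srcKey, dstKey string) (int64, error)

// copyObjectsConcurrently fans the copies out over a bounded worker pool,
// sums the copied bytes atomically, and fails fast on the first error.
func copyObjectsConcurrently(ctx context.Context, keys []string, dstPrefix string, cp copyFn) (int64, error) {
	var total int64
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(8) // assumed cap; tune against the object store's rate limits
	for _, key := range keys {
		key := key // capture the loop variable (pre-Go 1.22 semantics)
		g.Go(func() error {
			n, err := cp(gctx, key, dstPrefix+"/"+key)
			if err != nil {
				return err
			}
			atomic.AddInt64(&total, n)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return 0, err
	}
	return total, nil
}

func main() {
	// Toy copy function: pretend every object is one byte.
	cp := func(ctx context.Context, src, dst string) (int64, error) { return 1, nil }
	n, err := copyObjectsConcurrently(context.Background(), []string{"a", "b", "c"}, "backup/disk", cp)
	fmt.Println(n, err) // prints: 3 <nil>
}
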

func (b *Backuper) createBackupMetadata(ctx context.Context, backupMetaFile, backupName, version, tags string, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, backupDataSize, backupMetadataSize, backupRBACSize, backupConfigSize uint64, tableMetas []metadata.TableTitle, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, log *apexLog.Entry) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
backupMetadata := metadata.BackupMetadata{
BackupName: backupName,
Disks: diskMap,
DiskTypes: diskTypes,
ClickhouseBackupVersion: version,
CreationDate: time.Now().UTC(),
Tags: tags,