refactoring of `create` command to allow parallel execution of `FREEZE` and `UNFREEZE` and table-level parallelization of `object_disk.CopyObject`
Slach committed Jan 23, 2024
1 parent ca9c20b commit 15b0774
Showing 3 changed files with 66 additions and 41 deletions.
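
The core of the change is a bounded worker pool over tables: the sequential per-table loop in `createBackupLocal` is replaced with an `errgroup.Group` whose limit comes from `b.cfg.General.UploadConcurrency`, so `FREEZE`/`UNFREEZE` and metadata creation run in parallel across tables. A minimal, simplified sketch of that pattern follows; the `backupTable` helper and the `tables` slice are illustrative, not part of the diff:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// backupTable stands in for the per-table work done inside the errgroup
// worker in createBackupLocal (FREEZE, copy parts, write metadata).
func backupTable(ctx context.Context, table string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		fmt.Println("backed up", table)
		return nil
	}
}

func main() {
	tables := []string{"db.t1", "db.t2", "db.t3"}
	uploadConcurrency := 2 // in the diff this comes from b.cfg.General.UploadConcurrency

	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(uploadConcurrency) // at most N tables processed at once

	for _, item := range tables {
		table := item // copy the loop variable so each goroutine sees its own value
		g.Go(func() error {
			return backupTable(ctx, table)
		})
	}
	// Wait returns the first non-nil error and cancels ctx for the remaining workers.
	if err := g.Wait(); err != nil {
		fmt.Println("backup failed:", err)
	}
}
```
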
4 changes: 4 additions & 0 deletions ChangeLog.md
@@ -1,3 +1,7 @@
# v2.4.20
IMPROVEMENTS
- refactoring of `create` command to allow parallel execution of `FREEZE` and `UNFREEZE` and table-level parallelization of `object_disk.CopyObject`

# v2.4.19
BUG FIXES
- use a single s3:CopyObject call instead of s3:CreateMultipartUpload+s3:UploadCopyPart+s3:CompleteMultipartUpload for files smaller than 5Gb
99 changes: 58 additions & 41 deletions pkg/backup/create.go
@@ -5,6 +5,8 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/Altinity/clickhouse-backup/pkg/config"
"github.com/Altinity/clickhouse-backup/pkg/storage"
"golang.org/x/sync/errgroup"
"os"
"path"
@@ -15,13 +17,11 @@ import (

"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
"github.com/Altinity/clickhouse-backup/pkg/common"
"github.com/Altinity/clickhouse-backup/pkg/config"
"github.com/Altinity/clickhouse-backup/pkg/filesystemhelper"
"github.com/Altinity/clickhouse-backup/pkg/keeper"
"github.com/Altinity/clickhouse-backup/pkg/metadata"
"github.com/Altinity/clickhouse-backup/pkg/partition"
"github.com/Altinity/clickhouse-backup/pkg/status"
"github.com/Altinity/clickhouse-backup/pkg/storage"
"github.com/Altinity/clickhouse-backup/pkg/storage/object_disk"
"github.com/Altinity/clickhouse-backup/pkg/utils"

@@ -155,34 +155,56 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
return err
}
}
isObjectDiskPresent := false
for _, disk := range disks {
if disk.Type == "s3" || disk.Type == "azure_blob_storage" {
isObjectDiskPresent = true
break
}
}
if isObjectDiskPresent {
if err = config.ValidateObjectDiskConfig(b.cfg); err != nil {
return err
}
b.dst, err = storage.NewBackupDestination(ctx, b.cfg, b.ch, false, backupName)
if err != nil {
return err
}
if err := b.dst.Connect(ctx); err != nil {
return fmt.Errorf("can't connect to %s: %v", b.dst.Kind(), err)
}
defer b.dst.Close(ctx)
}
var backupDataSize, backupMetadataSize uint64
createBackupWorkingGroup, createCtx := errgroup.WithContext(ctx)
createBackupWorkingGroup.SetLimit(int(b.cfg.General.UploadConcurrency))

var tableMetas []metadata.TableTitle
for _, table := range tables {
select {
case <-ctx.Done():
return ctx.Err()
default:
for _, tableItem := range tables {
// copy the loop variable to avoid a race condition between goroutines
table := tableItem
if table.Skip {
continue
}
createBackupWorkingGroup.Go(func() error {
log := log.WithField("table", fmt.Sprintf("%s.%s", table.Database, table.Name))
if table.Skip {
continue
}
var realSize map[string]int64
var disksToPartsMap map[string][]metadata.Part
if doBackupData && table.BackupType == clickhouse.ShardBackupFull {
log.Debug("create data")
shadowBackupUUID := strings.ReplaceAll(uuid.New().String(), "-", "")
disksToPartsMap, realSize, err = b.AddTableToBackup(ctx, backupName, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}])
if err != nil {
log.Error(err.Error())
if removeBackupErr := b.RemoveBackupLocal(ctx, backupName, disks); removeBackupErr != nil {
var addTableToBackupErr error
disksToPartsMap, realSize, addTableToBackupErr = b.AddTableToBackup(createCtx, backupName, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}])
if addTableToBackupErr != nil {
log.Error(addTableToBackupErr.Error())
if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil {
log.Error(removeBackupErr.Error())
}
// fix corner cases after https://github.com/Altinity/clickhouse-backup/issues/379
if cleanShadowErr := b.Clean(ctx); cleanShadowErr != nil {
log.Error(cleanShadowErr.Error())
}
return err
return addTableToBackupErr
}
// more precise data size calculation
for _, size := range realSize {
@@ -193,18 +215,19 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
log.Debug("get in progress mutations list")
inProgressMutations := make([]metadata.MutationMetadata, 0)
if b.cfg.ClickHouse.BackupMutations && !schemaOnly && !rbacOnly && !configsOnly {
inProgressMutations, err = b.ch.GetInProgressMutations(ctx, table.Database, table.Name)
if err != nil {
log.Error(err.Error())
if removeBackupErr := b.RemoveBackupLocal(ctx, backupName, disks); removeBackupErr != nil {
var inProgressMutationsErr error
inProgressMutations, inProgressMutationsErr = b.ch.GetInProgressMutations(createCtx, table.Database, table.Name)
if inProgressMutationsErr != nil {
log.Error(inProgressMutationsErr.Error())
if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil {
log.Error(removeBackupErr.Error())
}
return err
return inProgressMutationsErr
}
}
log.Debug("create metadata")
if schemaOnly || doBackupData {
metadataSize, err := b.createTableMetadata(path.Join(backupPath, "metadata"), metadata.TableMetadata{
metadataSize, createTableMetadataErr := b.createTableMetadata(path.Join(backupPath, "metadata"), metadata.TableMetadata{
Table: table.Name,
Database: table.Database,
Query: table.CreateTableQuery,
@@ -214,11 +237,11 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
Mutations: inProgressMutations,
MetadataOnly: schemaOnly || table.BackupType == clickhouse.ShardBackupSchema,
}, disks)
if err != nil {
if removeBackupErr := b.RemoveBackupLocal(ctx, backupName, disks); removeBackupErr != nil {
if createTableMetadataErr != nil {
if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil {
log.Error(removeBackupErr.Error())
}
return err
return createTableMetadataErr
}
backupMetadataSize += metadataSize
tableMetas = append(tableMetas, metadata.TableTitle{
@@ -227,28 +250,34 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
})
}
log.Infof("done")
}
return nil
})
}
if wgWaitErr := createBackupWorkingGroup.Wait(); wgWaitErr != nil {
return fmt.Errorf("one of createBackupLocal go-routine return error: %v", wgWaitErr)
}
backupRBACSize, backupConfigSize := uint64(0), uint64(0)

if createRBAC || rbacOnly {
if backupRBACSize, err = b.createBackupRBAC(ctx, backupPath, disks); err != nil {
var createRBACErr error
if backupRBACSize, createRBACErr = b.createBackupRBAC(ctx, backupPath, disks); createRBACErr != nil {
log.Fatalf("error during do RBAC backup: %v", err)
} else {
log.WithField("size", utils.FormatBytes(backupRBACSize)).Info("done createBackupRBAC")
}
}
if createConfigs || configsOnly {
if backupConfigSize, err = b.createBackupConfigs(ctx, backupPath); err != nil {
log.Fatalf("error during do CONFIG backup: %v", err)
var createConfigsErr error
if backupConfigSize, createConfigsErr = b.createBackupConfigs(ctx, backupPath); createConfigsErr != nil {
log.Fatalf("error during do CONFIG backup: %v", createConfigsErr)
} else {
log.WithField("size", utils.FormatBytes(backupConfigSize)).Info("done createBackupConfigs")
}
}

backupMetaFile := path.Join(defaultPath, "backup", backupName, "metadata.json")
if err := b.createBackupMetadata(ctx, backupMetaFile, backupName, version, "regular", diskMap, diskTypes, disks, backupDataSize, backupMetadataSize, backupRBACSize, backupConfigSize, tableMetas, allDatabases, allFunctions, log); err != nil {
return err
return fmt.Errorf("createBackupMetadata return error: %v", err)
}
log.WithField("duration", utils.HumanizeDuration(time.Since(startBackup))).Info("done")
return nil
@@ -576,19 +605,7 @@ func (b *Backuper) AddTableToBackup(ctx context.Context, backupName, shadowBacku
disksToPartsMap[disk.Name] = parts
log.WithField("disk", disk.Name).Debug("shadow moved")
if (disk.Type == "s3" || disk.Type == "azure_blob_storage") && len(parts) > 0 {
if err = config.ValidateObjectDiskConfig(b.cfg); err != nil {
return nil, nil, err
}
start := time.Now()
if b.dst == nil {
b.dst, err = storage.NewBackupDestination(ctx, b.cfg, b.ch, false, backupName)
if err != nil {
return nil, nil, err
}
}
if err := b.dst.Connect(ctx); err != nil {
return nil, nil, fmt.Errorf("can't connect to %s: %v", b.dst.Kind(), err)
}
if size, err = b.uploadObjectDiskParts(ctx, backupName, backupShadowPath, disk); err != nil {
return disksToPartsMap, realSize, err
}
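
With tables now backed up concurrently, the diff moves object-disk setup out of `AddTableToBackup`: `createBackupLocal` checks once whether any `s3` or `azure_blob_storage` disk is present, validates the object-disk config, and connects `b.dst` a single time before the worker pool starts, so the parallel workers reuse one destination instead of each reconnecting. A rough sketch of that guard, using simplified stand-in types (`Disk` and `connectDestination` are illustrative, not the real `clickhouse`/`storage` APIs):

```go
package main

import (
	"context"
	"fmt"
)

// Disk is a simplified stand-in for clickhouse.Disk.
type Disk struct {
	Name string
	Type string
}

// connectDestination stands in for storage.NewBackupDestination + Connect;
// it returns a close function to mirror the deferred b.dst.Close(ctx).
func connectDestination(ctx context.Context, backupName string) (func(), error) {
	fmt.Println("connected backup destination for", backupName)
	return func() { fmt.Println("destination closed") }, nil
}

func main() {
	disks := []Disk{{Name: "default", Type: "local"}, {Name: "s3_disk", Type: "s3"}}

	// Detect object disks once, before any per-table goroutine starts.
	isObjectDiskPresent := false
	for _, d := range disks {
		if d.Type == "s3" || d.Type == "azure_blob_storage" {
			isObjectDiskPresent = true
			break
		}
	}

	if isObjectDiskPresent {
		// Connect a single shared destination; the table workers copy
		// object-disk parts through it instead of opening their own connections.
		closeDst, err := connectDestination(context.Background(), "backup_2024_01_23")
		if err != nil {
			panic(err)
		}
		defer closeDst()
	}
	// ... start the errgroup worker pool here ...
}
```
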
4 changes: 4 additions & 0 deletions pkg/storage/object_disk/object_disk.go
@@ -11,6 +11,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
@@ -236,6 +237,9 @@ var SystemDisks map[string]clickhouse.Disk

func InitCredentialsAndConnections(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName string) error {
var err error
var mu sync.Mutex
mu.Lock()
defer mu.Unlock()
if _, exists := DisksCredentials[diskName]; !exists {
DisksCredentials, err = getObjectDisksCredentials(ctx, ch)
if err != nil {
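
The `object_disk` change adds locking around the lazy initialization of the shared credential and connection maps that concurrent table workers now reach through `CopyObject`. For the lock to serialize those callers it has to be shared between them, e.g. a package-level mutex. A minimal sketch of that pattern, assuming a package-level mutex; the `loadCredentials` helper and map contents are illustrative, not taken from the diff:

```go
package main

import (
	"fmt"
	"sync"
)

// Package-level state shared by all callers, mirroring DisksCredentials
// in object_disk.go (contents are illustrative).
var (
	initMu           sync.Mutex
	disksCredentials map[string]string
)

// loadCredentials stands in for getObjectDisksCredentials.
func loadCredentials() map[string]string {
	return map[string]string{"s3_disk": "access-key"}
}

// initCredentials lazily fills the shared map; the package-level mutex
// ensures only one goroutine at a time performs the initialization.
func initCredentials(diskName string) {
	initMu.Lock()
	defer initMu.Unlock()
	if _, exists := disksCredentials[diskName]; !exists {
		disksCredentials = loadCredentials()
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			initCredentials("s3_disk")
		}()
	}
	wg.Wait()
	fmt.Println(disksCredentials)
}
```
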
