From e6fa684cc06179b8995497bc042684aa91ac8e72 Mon Sep 17 00:00:00 2001
From: Slach
Date: Wed, 17 Jan 2024 12:30:54 +0400
Subject: [PATCH] remove HeadObject request to calculate source key size in
 CopyObject, to allow cross-region S3 disk backups, fix
 https://github.com/Altinity/clickhouse-backup/issues/813

---
 ChangeLog.md                           |  4 ++++
 pkg/backup/create.go                   |  5 +++--
 pkg/backup/restore.go                  |  4 ++--
 pkg/config/config.go                   |  4 ++--
 pkg/storage/azblob.go                  |  6 ++++--
 pkg/storage/cos.go                     |  2 +-
 pkg/storage/ftp.go                     |  2 +-
 pkg/storage/gcs.go                     |  2 +-
 pkg/storage/general.go                 |  9 +++++----
 pkg/storage/object_disk/object_disk.go |  6 +++---
 pkg/storage/s3.go                      | 17 +++--------------
 pkg/storage/sftp.go                    |  2 +-
 pkg/storage/structs.go                 |  2 +-
 13 files changed, 31 insertions(+), 34 deletions(-)

diff --git a/ChangeLog.md b/ChangeLog.md
index a36f4307..5aa118d6 100644
--- a/ChangeLog.md
+++ b/ChangeLog.md
@@ -1,3 +1,7 @@
+# v2.4.18
+BUG FIXES
+- remove HeadObject request to calculate source key size in CopyObject, to allow cross-region S3 disk backups, fix https://github.com/Altinity/clickhouse-backup/issues/813
+
 # v2.4.17
 BUG FIXES
 - skip CopyObject execution for keys which have zero size, to allow properly backup S3, GCS over S3 and Azure disks
diff --git a/pkg/backup/create.go b/pkg/backup/create.go
index 76c5cbde..3d4930bd 100644
--- a/pkg/backup/create.go
+++ b/pkg/backup/create.go
@@ -638,7 +638,7 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backup
 		if !exists {
 			return 0, fmt.Errorf("uploadObjectDiskParts: %s not present in object_disk.DisksConnections", disk.Name)
 		}
-
+		srcBucket := srcDiskConnection.GetRemoteBucket()
 		walkErr := filepath.Walk(backupShadowPath, func(fPath string, fInfo os.FileInfo, err error) error {
 			if err != nil {
 				return err
@@ -662,7 +662,8 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backup
 			}
 			if objSize, err = b.dst.CopyObject(
 				ctx,
-				srcDiskConnection.GetRemoteBucket(),
+				storageObject.ObjectSize,
+				srcBucket,
 				path.Join(srcDiskConnection.GetRemotePath(), storageObject.ObjectRelativePath),
 				path.Join(backupName, disk.Name, storageObject.ObjectRelativePath),
 			); err != nil {
diff --git a/pkg/backup/restore.go b/pkg/backup/restore.go
index cdcc7d55..b2156c69 100644
--- a/pkg/backup/restore.go
+++ b/pkg/backup/restore.go
@@ -799,7 +799,7 @@ func (b *Backuper) restoreDataRegularByAttach(ctx context.Context, backupName st
 
 func (b *Backuper) restoreDataRegularByParts(ctx context.Context, backupName string, table metadata.TableMetadata, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, dstTable clickhouse.Table, log *apexLog.Entry, tablesForRestore ListOfTables, i int) error {
 	if err := filesystemhelper.HardlinkBackupPartsToStorage(backupName, table, disks, dstTable.DataPaths, b.ch, true); err != nil {
-		return fmt.Errorf("can't copy data to datached '%s.%s': %v", table.Database, table.Table, err)
+		return fmt.Errorf("can't copy data to detached '%s.%s': %v", table.Database, table.Table, err)
 	}
 	log.Debug("data to 'detached' copied")
 	if err := b.downloadObjectDiskParts(ctx, backupName, table, diskMap, diskTypes); err != nil {
@@ -888,7 +888,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
 			} else {
 				return fmt.Errorf("incompatible object_disk[%s].Type=%s and remote_storage: %s", diskName, diskType, b.cfg.General.RemoteStorage)
 			}
-			if copiedSize, copyObjectErr := object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, srcBucket, srcKey, storageObject.ObjectRelativePath); copyObjectErr != nil {
+			if copiedSize, copyObjectErr := object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, storageObject.ObjectSize, srcBucket, srcKey, storageObject.ObjectRelativePath); copyObjectErr != nil {
				return fmt.Errorf("object_disk.CopyObject error: %v", copyObjectErr)
 			} else {
 				sizeMutex.Lock()
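The create.go and restore.go hunks above are the caller side of the fix: ClickHouse's object-disk metadata already records each remote object's size, so backup and restore now pass storageObject.ObjectSize into CopyObject instead of letting the destination storage issue its own HeadObject against the source bucket. That HEAD request is what broke cross-region setups, since the destination's S3 client is pinned to the destination region and endpoint, and a HeadObject on a bucket in another region fails. A minimal sketch of the new call pattern, with simplified stand-in types; copyViaMetadata is a hypothetical helper, not code from this patch:

package sketch

import "context"

// RemoteStorage mirrors the updated interface in pkg/storage/structs.go:
// the source size is now an argument, not something each backend discovers itself.
type RemoteStorage interface {
	CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error)
}

// storageObject is a simplified stand-in for the object-disk metadata entry
// that ClickHouse writes for every remote blob.
type storageObject struct {
	ObjectRelativePath string
	ObjectSize         int64
}

// copyViaMetadata (hypothetical) shows the pattern now used by
// uploadObjectDiskParts and downloadObjectDiskParts: the size travels with
// the call, so no HEAD round-trip to a possibly foreign-region bucket.
func copyViaMetadata(ctx context.Context, dst RemoteStorage, obj storageObject, srcBucket, srcKey, dstKey string) (int64, error) {
	return dst.CopyObject(ctx, obj.ObjectSize, srcBucket, srcKey, dstKey)
}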
diff --git a/pkg/config/config.go b/pkg/config/config.go
index ff555bec..bb45712a 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -541,7 +541,7 @@ func DefaultConfig() *Config {
 			CompressionFormat: "tar",
 			BufferSize:        0,
 			MaxBuffers:        3,
-			MaxPartsCount:     5000,
+			MaxPartsCount:     256,
 			Timeout:           "4h",
 		},
 		S3: S3Config{
@@ -556,7 +556,7 @@ func DefaultConfig() *Config {
 			StorageClass:      string(s3types.StorageClassStandard),
 			Concurrency:       int(downloadConcurrency + 1),
 			PartSize:          0,
-			MaxPartsCount:     1000,
+			MaxPartsCount:     256,
 		},
 		GCS: GCSConfig{
 			CompressionLevel: 1,
diff --git a/pkg/storage/azblob.go b/pkg/storage/azblob.go
index c3329d98..b0ed946b 100644
--- a/pkg/storage/azblob.go
+++ b/pkg/storage/azblob.go
@@ -271,7 +271,7 @@ func (a *AzureBlob) Walk(ctx context.Context, azPath string, recursive bool, pro
 	return nil
 }
 
-func (a *AzureBlob) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (a *AzureBlob) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
 	dstKey = path.Join(a.Config.ObjectDiskPath, dstKey)
 	srcURLString := fmt.Sprintf("%s://%s.%s/%s/%s", a.Config.EndpointSchema, a.Config.AccountName, a.Config.EndpointSuffix, srcBucket, srcKey)
 	log.Debugf("AZBLOB->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, a.Config.Container, dstKey)
@@ -302,7 +302,9 @@ func (a *AzureBlob) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey st
 		copyStatus = dstMeta.CopyStatus()
 		copyStatusDesc = dstMeta.CopyStatusDescription()
 		size = dstMeta.ContentLength()
-		pollCount++
+		if pollCount < 8 {
+			pollCount++
+		}
 	}
 	if copyStatus == azblob.CopyStatusFailed {
 		return 0, fmt.Errorf("azblob->CopyObject got CopyStatusFailed %s", copyStatusDesc)
diff --git a/pkg/storage/cos.go b/pkg/storage/cos.go
index 31aa9944..801e62f8 100644
--- a/pkg/storage/cos.go
+++ b/pkg/storage/cos.go
@@ -146,7 +146,7 @@ func (c *COS) PutFile(ctx context.Context, key string, r io.ReadCloser) error {
 	return err
 }
 
-func (c *COS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (c *COS) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
 	return 0, fmt.Errorf("CopyObject not implemented for %s", c.Kind())
 }
 
diff --git a/pkg/storage/ftp.go b/pkg/storage/ftp.go
index b6c93024..984f71d6 100644
--- a/pkg/storage/ftp.go
+++ b/pkg/storage/ftp.go
@@ -208,7 +208,7 @@ func (f *FTP) PutFile(ctx context.Context, key string, r io.ReadCloser) error {
 	return client.Stor(k, r)
 }
 
-func (f *FTP) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (f *FTP) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
 	return 0, fmt.Errorf("CopyObject not implemented for %s", f.Kind())
 }
 
diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go
index 034e8b1b..50e1f507 100644
--- a/pkg/storage/gcs.go
+++ b/pkg/storage/gcs.go
@@ -325,7 +325,7 @@ func (gcs *GCS) DeleteFileFromObjectDiskBackup(ctx context.Context, key string)
 	return gcs.deleteKey(ctx, key)
 }
 
-func (gcs *GCS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (gcs *GCS) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
 	dstKey = path.Join(gcs.Config.ObjectDiskPath, dstKey)
 	log.Debugf("GCS->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, gcs.Config.Bucket, dstKey)
 	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
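The azblob.go hunk above is a small robustness tweak to the copy-status poll loop: pollCount stops growing at 8. The wait that consumes pollCount sits outside the quoted hunk, so the following is a sketch under the assumption that the delay scales exponentially with pollCount; the cap then bounds the time between polls (at 2^8 * 100ms = 25.6s here) instead of letting it grow for the whole duration of a long server-side copy. waitForCopy and its poll callback are illustrative names, not code from this patch:

package sketch

import (
	"context"
	"time"
)

// waitForCopy polls a server-side copy until it finishes. The backoff grows
// with pollCount (an assumption: the real delay logic is outside the hunk),
// and the cap at 8 keeps the inter-poll delay bounded.
func waitForCopy(ctx context.Context, poll func() (done bool, err error)) error {
	pollCount := 0
	for {
		done, err := poll()
		if err != nil || done {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Duration(1<<pollCount) * 100 * time.Millisecond):
		}
		if pollCount < 8 { // same cap as the patch: at most ~25.6s between polls
			pollCount++
		}
	}
}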
diff --git a/pkg/storage/general.go b/pkg/storage/general.go
index e585ce5a..55fe746d 100644
--- a/pkg/storage/general.go
+++ b/pkg/storage/general.go
@@ -4,6 +4,7 @@ import (
 	"archive/tar"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
 	"github.com/Altinity/clickhouse-backup/pkg/config"
@@ -409,7 +410,7 @@ func (bd *BackupDestination) DownloadCompressedStream(ctx context.Context, remot
 
 func (bd *BackupDestination) UploadCompressedStream(ctx context.Context, baseLocalPath string, files []string, remotePath string) error {
 	if _, err := bd.StatFile(ctx, remotePath); err != nil {
-		if err != ErrNotFound && !os.IsNotExist(err) {
+		if !errors.Is(err, ErrNotFound) && !os.IsNotExist(err) {
 			return err
 		}
 	}
@@ -658,8 +659,8 @@ func NewBackupDestination(ctx context.Context, cfg *config.Config, ch *clickhous
 		if cfg.General.MaxFileSize%cfg.S3.MaxPartsCount > 0 {
 			partSize++
 		}
-		if partSize < 25*1024*1024 {
-			partSize = 25 * 1024 * 1024
+		if partSize < 5*1024*1024 {
+			partSize = 5 * 1024 * 1024
 		}
 		if partSize > 5*1024*1024*1024 {
 			partSize = 5 * 1024 * 1024 * 1024
@@ -763,7 +764,7 @@ func NewBackupDestination(ctx context.Context, cfg *config.Config, ch *clickhous
 	}
 }
 
-// https://github.com/Altinity/clickhouse-backup/issues/588
+// ApplyMacrosToObjectLabels https://github.com/Altinity/clickhouse-backup/issues/588
 func ApplyMacrosToObjectLabels(ctx context.Context, objectLabels map[string]string, ch *clickhouse.ClickHouse, backupName string) (map[string]string, error) {
 	var err error
 	for k, v := range objectLabels {
diff --git a/pkg/storage/object_disk/object_disk.go b/pkg/storage/object_disk/object_disk.go
index 8b93cbb4..86f05034 100644
--- a/pkg/storage/object_disk/object_disk.go
+++ b/pkg/storage/object_disk/object_disk.go
@@ -309,7 +309,7 @@ func getObjectDisksCredentials(ctx context.Context, ch *clickhouse.ClickHouse) (
 	for _, d := range disks {
 		diskName := d.Data
 		if diskTypeNode := d.SelectElement("type"); diskTypeNode != nil {
-			diskType := diskTypeNode.InnerText()
+			diskType := strings.Trim(diskTypeNode.InnerText(), "\r\n \t")
 			switch diskType {
 			case "s3", "s3_plain":
 				creds := ObjectStorageCredentials{
@@ -632,11 +632,11 @@ func GetFileSize(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Con
 }
 */
 
-func CopyObject(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName, srcBucket, srcKey, dstPath string) (int64, error) {
+func CopyObject(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName string, srcSize int64, srcBucket, srcKey, dstPath string) (int64, error) {
 	if err := InitCredentialsAndConnections(ctx, ch, cfg, diskName); err != nil {
 		return 0, err
 	}
 	connection := DisksConnections[diskName]
 	remoteStorage := connection.GetRemoteStorage()
-	return remoteStorage.CopyObject(ctx, srcBucket, srcKey, dstPath)
+	return remoteStorage.CopyObject(ctx, srcSize, srcBucket, srcKey, dstPath)
 }
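The UploadCompressedStream change above deserves a note: err != ErrNotFound only matches the sentinel value exactly, so any backend that wraps the sentinel before returning it slips past the check and aborts the upload; errors.Is walks the wrap chain. The object_disk.go hunk fixes a related class of mismatch, since InnerText() can carry the XML node's surrounding whitespace, so the disk type is trimmed before the switch. A self-contained illustration of the errors.Is behavior (the wrapping here is invented for the demo; which backends actually wrap ErrNotFound varies):

package main

import (
	"errors"
	"fmt"
)

var ErrNotFound = errors.New("file not found")

func main() {
	// A StatFile implementation may annotate the sentinel before returning it.
	err := fmt.Errorf("stat remote object: %w", ErrNotFound)

	fmt.Println(err == ErrNotFound)          // false: the wrapper is a distinct value
	fmt.Println(errors.Is(err, ErrNotFound)) // true: errors.Is unwraps the %w chain
}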
diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go
index fc4a94ce..7c7b678b 100644
--- a/pkg/storage/s3.go
+++ b/pkg/storage/s3.go
@@ -446,7 +446,7 @@ func (s *S3) remotePager(ctx context.Context, s3Path string, recursive bool, pro
 	return nil
 }
 
-func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (s *S3) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
 	dstKey = path.Join(s.Config.ObjectDiskPath, dstKey)
 	s.Log.Debugf("S3->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, s.Config.Bucket, dstKey)
 	if strings.Contains(s.Config.Endpoint, "storage.googleapis.com") {
@@ -472,17 +472,6 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
 		}
 		return *dstObjResp.ContentLength, nil
 	}
-	// Get the size of the source object
-	headParams := &s3.HeadObjectInput{
-		Bucket: aws.String(srcBucket),
-		Key:    aws.String(srcKey),
-	}
-	s.enrichHeadParams(headParams)
-	sourceObjResp, err := s.client.HeadObject(ctx, headParams)
-	if err != nil {
-		return 0, errors.Wrapf(err, "s3://%s/%s", srcBucket, srcKey)
-	}
-	srcSize := *sourceObjResp.ContentLength
 	// just copy object without multipart
 	if srcSize == 0 {
 		params := &s3.CopyObjectInput{
@@ -518,8 +507,8 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
 	if srcSize%s.Config.MaxPartsCount > 0 {
 		partSize++
 	}
-	if partSize < 25*1024*1024 {
-		partSize = 25 * 1024 * 1024
+	if partSize < 5*1024*1024 {
+		partSize = 5 * 1024 * 1024
 	}
 
 	// Calculate the number of parts
diff --git a/pkg/storage/sftp.go b/pkg/storage/sftp.go
index 9fe39cf4..c1fe5743 100644
--- a/pkg/storage/sftp.go
+++ b/pkg/storage/sftp.go
@@ -238,7 +238,7 @@ func (sftp *SFTP) PutFile(ctx context.Context, key string, localFile io.ReadClos
 	return nil
 }
 
-func (sftp *SFTP) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (sftp *SFTP) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
 	return 0, fmt.Errorf("CopyObject not implemented for %s", sftp.Kind())
 }
 
diff --git a/pkg/storage/structs.go b/pkg/storage/structs.go
index d26b4c35..9c318247 100644
--- a/pkg/storage/structs.go
+++ b/pkg/storage/structs.go
@@ -31,5 +31,5 @@ type RemoteStorage interface {
 	GetFileReader(ctx context.Context, key string) (io.ReadCloser, error)
 	GetFileReaderWithLocalPath(ctx context.Context, key, localPath string) (io.ReadCloser, error)
 	PutFile(ctx context.Context, key string, r io.ReadCloser) error
-	CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error)
+	CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error)
 }
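With HeadObject gone, s3.go sizes the multipart copy from the srcSize argument, using the same arithmetic as the upload path in general.go: target at most MaxPartsCount parts (default lowered to 256 by this patch), round up, then clamp each part to S3's actual per-part limits of 5 MiB and 5 GiB; the old 25 MiB floor was stricter than S3 requires. A standalone sketch of that calculation with a worked example (partSizeFor is an illustrative name, not a function in the patch):

package main

import "fmt"

// partSizeFor mirrors the part-size selection in S3.CopyObject and
// NewBackupDestination: aim for maxPartsCount parts, then clamp to S3's
// 5 MiB minimum and 5 GiB maximum part size.
func partSizeFor(srcSize, maxPartsCount int64) int64 {
	partSize := srcSize / maxPartsCount
	if srcSize%maxPartsCount > 0 {
		partSize++ // round up so maxPartsCount parts always cover srcSize
	}
	if partSize < 5*1024*1024 {
		partSize = 5 * 1024 * 1024
	}
	if partSize > 5*1024*1024*1024 {
		partSize = 5 * 1024 * 1024 * 1024
	}
	return partSize
}

func main() {
	// 100 MiB object with MaxPartsCount=256: 400 KiB per part hits the
	// 5 MiB floor, so the copy actually uses ceil(100/5) = 20 parts.
	fmt.Println(partSizeFor(100*1024*1024, 256)) // 5242880
	// 10 GiB object: exactly 256 parts of 40 MiB each.
	fmt.Println(partSizeFor(10*1024*1024*1024, 256)) // 41943040
}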