Commit
remove HeadObject request to calculate source key size in CopyObject, to allow cross region S3 disks backup, fix #813
Slach committed Jan 17, 2024
1 parent b40e561 commit e6fa684
Showing 13 changed files with 31 additions and 34 deletions.
4 changes: 4 additions & 0 deletions ChangeLog.md
@@ -1,3 +1,7 @@
+# v2.4.18
+BUG FIXES
+- remove HeadObject request to calculate source key size in CopyObject, to allow cross region S3 disks backup, fix https://github.com/Altinity/clickhouse-backup/issues/813
+
# v2.4.17
BUG FIXES
- skip CopyObject execution for keys which have zero size, to allow properly backup S3, GCS over S3 and Azure disks
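Note on the shape of the change: the caller (see pkg/backup/create.go and restore.go below) already knows each object's size from the ClickHouse object-disk metadata it walks, so that size is now threaded into CopyObject as an argument. Previously the S3 backend issued a HeadObject against the source bucket to discover the size, a request that can fail (typically with a redirect or signature error) when the source disk's bucket lives in a different region than the backup client's endpoint. Below is a minimal self-contained sketch of the new call shape; the interface mirrors pkg/storage/structs.go, but fakeStorage and the sizes are illustrative only:

```go
package main

import (
	"context"
	"fmt"
)

// remoteStorage mirrors the updated RemoteStorage interface in pkg/storage/structs.go:
// srcSize is an explicit argument instead of being looked up via HeadObject.
type remoteStorage interface {
	CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error)
}

// fakeStorage is a hypothetical stand-in that only demonstrates the call shape.
type fakeStorage struct{}

func (f fakeStorage) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
	// A real backend would branch on srcSize: a plain CopyObject for small
	// objects, multipart UploadPartCopy for large ones (see pkg/storage/s3.go).
	return srcSize, nil
}

func main() {
	var dst remoteStorage = fakeStorage{}
	// Stands in for storageObject.ObjectSize, which the backup code reads from
	// object-disk metadata, so no extra request to the source bucket is needed.
	var srcSize int64 = 1 << 20
	copied, err := dst.CopyObject(context.Background(), srcSize, "src-bucket", "store/abc", "backup/disk/abc")
	if err != nil {
		panic(err)
	}
	fmt.Println("copied bytes:", copied)
}
```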
5 changes: 3 additions & 2 deletions pkg/backup/create.go
@@ -638,7 +638,7 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backup
if !exists {
return 0, fmt.Errorf("uploadObjectDiskParts: %s not present in object_disk.DisksConnections", disk.Name)
}
-
+srcBucket := srcDiskConnection.GetRemoteBucket()
walkErr := filepath.Walk(backupShadowPath, func(fPath string, fInfo os.FileInfo, err error) error {
if err != nil {
return err
@@ -662,7 +662,8 @@
}
if objSize, err = b.dst.CopyObject(
ctx,
-srcDiskConnection.GetRemoteBucket(),
+storageObject.ObjectSize,
+srcBucket,
path.Join(srcDiskConnection.GetRemotePath(), storageObject.ObjectRelativePath),
path.Join(backupName, disk.Name, storageObject.ObjectRelativePath),
); err != nil {
4 changes: 2 additions & 2 deletions pkg/backup/restore.go
@@ -799,7 +799,7 @@ func (b *Backuper) restoreDataRegularByAttach(ctx context.Context, backupName st

func (b *Backuper) restoreDataRegularByParts(ctx context.Context, backupName string, table metadata.TableMetadata, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, dstTable clickhouse.Table, log *apexLog.Entry, tablesForRestore ListOfTables, i int) error {
if err := filesystemhelper.HardlinkBackupPartsToStorage(backupName, table, disks, dstTable.DataPaths, b.ch, true); err != nil {
-return fmt.Errorf("can't copy data to datached '%s.%s': %v", table.Database, table.Table, err)
+return fmt.Errorf("can't copy data to detached '%s.%s': %v", table.Database, table.Table, err)
}
log.Debug("data to 'detached' copied")
if err := b.downloadObjectDiskParts(ctx, backupName, table, diskMap, diskTypes); err != nil {
@@ -888,7 +888,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
} else {
return fmt.Errorf("incompatible object_disk[%s].Type=%s amd remote_storage: %s", diskName, diskType, b.cfg.General.RemoteStorage)
}
-if copiedSize, copyObjectErr := object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, srcBucket, srcKey, storageObject.ObjectRelativePath); copyObjectErr != nil {
+if copiedSize, copyObjectErr := object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, storageObject.ObjectSize, srcBucket, srcKey, storageObject.ObjectRelativePath); copyObjectErr != nil {
return fmt.Errorf("object_disk.CopyObject error: %v", err)
} else {
sizeMutex.Lock()
4 changes: 2 additions & 2 deletions pkg/config/config.go
@@ -541,7 +541,7 @@ func DefaultConfig() *Config {
CompressionFormat: "tar",
BufferSize: 0,
MaxBuffers: 3,
-MaxPartsCount: 5000,
+MaxPartsCount: 256,
Timeout: "4h",
},
S3: S3Config{
@@ -556,7 +556,7 @@
StorageClass: string(s3types.StorageClassStandard),
Concurrency: int(downloadConcurrency + 1),
PartSize: 0,
-MaxPartsCount: 1000,
+MaxPartsCount: 256,
},
GCS: GCSConfig{
CompressionLevel: 1,
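Both MaxPartsCount defaults drop to 256 here, and the matching hunks in general.go and s3.go below lower the minimum part size from 25 MiB to S3's actual floor of 5 MiB. The part size is derived as ceil(size / MaxPartsCount), clamped into S3's per-part limits of 5 MiB to 5 GiB. A self-contained sketch of that arithmetic (illustrative, mirroring the changed code rather than copying it):

```go
package main

import "fmt"

const (
	minPartSize = 5 * 1024 * 1024        // 5 MiB: S3 minimum for every part but the last
	maxPartSize = 5 * 1024 * 1024 * 1024 // 5 GiB: S3 maximum for a single part
)

// partSizeFor derives an upload part size: ceil(size / maxPartsCount),
// clamped into [minPartSize, maxPartSize].
func partSizeFor(size, maxPartsCount int64) int64 {
	partSize := size / maxPartsCount
	if size%maxPartsCount > 0 {
		partSize++
	}
	if partSize < minPartSize {
		partSize = minPartSize
	}
	if partSize > maxPartSize {
		partSize = maxPartSize
	}
	return partSize
}

func main() {
	// A 10 GiB object split across at most 256 parts needs 40 MiB parts,
	// comfortably inside the clamp.
	fmt.Println(partSizeFor(10*1024*1024*1024, 256)) // 41943040 (40 MiB)
}
```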
6 changes: 4 additions & 2 deletions pkg/storage/azblob.go
@@ -271,7 +271,7 @@ func (a *AzureBlob) Walk(ctx context.Context, azPath string, recursive bool, pro
return nil
}

-func (a *AzureBlob) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (a *AzureBlob) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
dstKey = path.Join(a.Config.ObjectDiskPath, dstKey)
srcURLString := fmt.Sprintf("%s://%s.%s/%s/%s", a.Config.EndpointSchema, a.Config.AccountName, a.Config.EndpointSuffix, srcBucket, srcKey)
log.Debugf("AZBLOB->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, a.Config.Container, dstKey)
@@ -302,7 +302,9 @@ func (a *AzureBlob) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey st
copyStatus = dstMeta.CopyStatus()
copyStatusDesc = dstMeta.CopyStatusDescription()
size = dstMeta.ContentLength()
-pollCount++
+if pollCount < 8 {
+pollCount++
+}
}
if copyStatus == azblob.CopyStatusFailed {
return 0, fmt.Errorf("azblob->CopyObject got CopyStatusFailed %s", copyStatusDesc)
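This hunk only bounds the counter; the sleep that consumes pollCount sits outside the visible diff. The effect is a capped backoff: however the delay is derived from pollCount, it stops growing once the counter reaches 8, so polling a long-running azblob server-side copy never backs off indefinitely. A self-contained sketch, assuming an exponential formula (the real one is not shown in this diff); it prints the delays instead of sleeping:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	pollCount := 0
	for i := 0; i < 12; i++ {
		// Assumed formula: 100ms doubled per poll. With the cap, the delay
		// plateaus at 100ms << 8 = 25.6s instead of growing without bound.
		delay := (100 * time.Millisecond) << pollCount
		fmt.Printf("poll %2d: wait %v\n", i, delay)
		if pollCount < 8 {
			pollCount++
		}
	}
}
```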
2 changes: 1 addition & 1 deletion pkg/storage/cos.go
@@ -146,7 +146,7 @@ func (c *COS) PutFile(ctx context.Context, key string, r io.ReadCloser) error {
return err
}

-func (c *COS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (c *COS) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
return 0, fmt.Errorf("CopyObject not imlemented for %s", c.Kind())
}

2 changes: 1 addition & 1 deletion pkg/storage/ftp.go
@@ -208,7 +208,7 @@ func (f *FTP) PutFile(ctx context.Context, key string, r io.ReadCloser) error {
return client.Stor(k, r)
}

-func (f *FTP) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (f *FTP) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
return 0, fmt.Errorf("CopyObject not imlemented for %s", f.Kind())
}

2 changes: 1 addition & 1 deletion pkg/storage/gcs.go
@@ -325,7 +325,7 @@ func (gcs *GCS) DeleteFileFromObjectDiskBackup(ctx context.Context, key string)
return gcs.deleteKey(ctx, key)
}

-func (gcs *GCS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (gcs *GCS) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
dstKey = path.Join(gcs.Config.ObjectDiskPath, dstKey)
log.Debugf("GCS->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, gcs.Config.Bucket, dstKey)
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
9 changes: 5 additions & 4 deletions pkg/storage/general.go
@@ -4,6 +4,7 @@ import (
"archive/tar"
"context"
"encoding/json"
+"errors"
"fmt"
"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
"github.com/Altinity/clickhouse-backup/pkg/config"
@@ -409,7 +410,7 @@ func (bd *BackupDestination) DownloadCompressedStream(ctx context.Context, remot

func (bd *BackupDestination) UploadCompressedStream(ctx context.Context, baseLocalPath string, files []string, remotePath string) error {
if _, err := bd.StatFile(ctx, remotePath); err != nil {
-if err != ErrNotFound && !os.IsNotExist(err) {
+if !errors.Is(err, ErrNotFound) && !os.IsNotExist(err) {
return err
}
}
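The switch from err != ErrNotFound to errors.Is matters as soon as a backend wraps the sentinel with context: == matches only the identical error value, while errors.Is walks the wrap chain. A minimal self-contained illustration:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrNotFound = errors.New("key not found")

func main() {
	// A storage backend that wraps the sentinel with extra context.
	err := fmt.Errorf("remote storage: %w", ErrNotFound)

	fmt.Println(err == ErrNotFound)          // false: comparison sees only the wrapper
	fmt.Println(errors.Is(err, ErrNotFound)) // true: errors.Is unwraps to the sentinel
}
```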
@@ -658,8 +659,8 @@ func NewBackupDestination(ctx context.Context, cfg *config.Config, ch *clickhous
if cfg.General.MaxFileSize%cfg.S3.MaxPartsCount > 0 {
partSize++
}
-if partSize < 25*1024*1024 {
-partSize = 25 * 1024 * 1024
+if partSize < 5*1024*1024 {
+partSize = 5 * 1024 * 1024
}
if partSize > 5*1024*1024*1024 {
partSize = 5 * 1024 * 1024 * 1024
@@ -763,7 +764,7 @@
}
}

-// https://github.com/Altinity/clickhouse-backup/issues/588
+// ApplyMacrosToObjectLabels https://github.com/Altinity/clickhouse-backup/issues/588
func ApplyMacrosToObjectLabels(ctx context.Context, objectLabels map[string]string, ch *clickhouse.ClickHouse, backupName string) (map[string]string, error) {
var err error
for k, v := range objectLabels {
6 changes: 3 additions & 3 deletions pkg/storage/object_disk/object_disk.go
@@ -309,7 +309,7 @@ func getObjectDisksCredentials(ctx context.Context, ch *clickhouse.ClickHouse) (
for _, d := range disks {
diskName := d.Data
if diskTypeNode := d.SelectElement("type"); diskTypeNode != nil {
-diskType := diskTypeNode.InnerText()
+diskType := strings.Trim(diskTypeNode.InnerText(), "\r\n \t")
switch diskType {
case "s3", "s3_plain":
creds := ObjectStorageCredentials{
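The trim guards against pretty-printed XML: the inner text of a <type> element can include the surrounding newlines and indentation, and an untrimmed value falls straight through the switch, silently skipping the disk. A small self-contained demonstration:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// InnerText() of an XML node can carry the element's surrounding
	// newlines and indentation when the config file is pretty-printed.
	raw := "\n\t\ts3\n\t"

	switch raw {
	case "s3", "s3_plain":
		fmt.Println("matched without trim") // never reached
	}

	diskType := strings.Trim(raw, "\r\n \t")
	switch diskType {
	case "s3", "s3_plain":
		fmt.Println("matched after trim:", diskType) // matched after trim: s3
	}
}
```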
@@ -632,11 +632,11 @@ func GetFileSize(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Con
}
*/

-func CopyObject(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName, srcBucket, srcKey, dstPath string) (int64, error) {
+func CopyObject(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName string, srcSize int64, srcBucket, srcKey, dstPath string) (int64, error) {
if err := InitCredentialsAndConnections(ctx, ch, cfg, diskName); err != nil {
return 0, err
}
connection := DisksConnections[diskName]
remoteStorage := connection.GetRemoteStorage()
-return remoteStorage.CopyObject(ctx, srcBucket, srcKey, dstPath)
+return remoteStorage.CopyObject(ctx, srcSize, srcBucket, srcKey, dstPath)
}
17 changes: 3 additions & 14 deletions pkg/storage/s3.go
@@ -446,7 +446,7 @@ func (s *S3) remotePager(ctx context.Context, s3Path string, recursive bool, pro
return nil
}

-func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (s *S3) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
dstKey = path.Join(s.Config.ObjectDiskPath, dstKey)
s.Log.Debugf("S3->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, s.Config.Bucket, dstKey)
if strings.Contains(s.Config.Endpoint, "storage.googleapis.com") {
@@ -472,17 +472,6 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
}
return *dstObjResp.ContentLength, nil
}
-// Get the size of the source object
-headParams := &s3.HeadObjectInput{
-Bucket: aws.String(srcBucket),
-Key: aws.String(srcKey),
-}
-s.enrichHeadParams(headParams)
-sourceObjResp, err := s.client.HeadObject(ctx, headParams)
-if err != nil {
-return 0, errors.Wrapf(err, "s3://%s/%s", srcBucket, srcKey)
-}
-srcSize := *sourceObjResp.ContentLength
// just copy object without multipart
if srcSize == 0 {
params := &s3.CopyObjectInput{
@@ -518,8 +507,8 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
if srcSize%s.Config.MaxPartsCount > 0 {
partSize++
}
-if partSize < 25*1024*1024 {
-partSize = 25 * 1024 * 1024
+if partSize < 5*1024*1024 {
+partSize = 5 * 1024 * 1024
}

// Calculate the number of parts
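With srcSize supplied by the caller, the multipart branch can size its UploadPartCopy ranges without a single request to the source bucket. A sketch of the range arithmetic that follows the part-size clamp above (illustrative values, not the project's exact code):

```go
package main

import "fmt"

func main() {
	var srcSize int64 = 100 * 1024 * 1024 // a hypothetical 100 MiB source object
	var partSize int64 = 40 * 1024 * 1024 // e.g. from the clamp arithmetic above

	// ceil(srcSize / partSize) parts; each UploadPartCopy takes an inclusive
	// byte range via the CopySourceRange header, "bytes=start-end".
	numParts := (srcSize + partSize - 1) / partSize
	for part := int64(0); part < numParts; part++ {
		start := part * partSize
		end := start + partSize - 1
		if end > srcSize-1 {
			end = srcSize - 1
		}
		fmt.Printf("part %d: bytes=%d-%d\n", part+1, start, end)
	}
}
```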
2 changes: 1 addition & 1 deletion pkg/storage/sftp.go
@@ -238,7 +238,7 @@ func (sftp *SFTP) PutFile(ctx context.Context, key string, localFile io.ReadClos
return nil
}

-func (sftp *SFTP) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+func (sftp *SFTP) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error) {
return 0, fmt.Errorf("CopyObject not imlemented for %s", sftp.Kind())
}

2 changes: 1 addition & 1 deletion pkg/storage/structs.go
@@ -31,5 +31,5 @@ type RemoteStorage interface {
GetFileReader(ctx context.Context, key string) (io.ReadCloser, error)
GetFileReaderWithLocalPath(ctx context.Context, key, localPath string) (io.ReadCloser, error)
PutFile(ctx context.Context, key string, r io.ReadCloser) error
-CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error)
+CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, dstKey string) (int64, error)
}
