Skip to content

Commit

Permalink
add S3_REQUEST_PAYER config parameter, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html
Browse files Browse the repository at this point in the history
  • Loading branch information
Slach committed Dec 5, 2023
1 parent bfc1673 commit 91b8e35
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 92 deletions.
1 change: 1 addition & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# v2.4.10
IMPROVEMENTS
- update go modules to latest versions
- add `S3_REQUEST_PAYER` config parameter, look https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html for details, fix [795](https://github.com/Altinity/clickhouse-backup/issues/795)

# v2.4.9
BUG FIXES
Expand Down
2 changes: 2 additions & 0 deletions ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ s3:
object_labels: {}
# S3_CUSTOM_STORAGE_CLASS_MAP, allow setup storage class depending on the backup name regexp pattern, format nameRegexp > className
custom_storage_class_map: {}
# S3_REQUEST_PAYER, defines who pays for requests to the bucket, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html for details; the only possible value is "requester", if empty then the bucket owner pays
request_payer: ""
debug: false # S3_DEBUG
gcs:
credentials_file: "" # GCS_CREDENTIALS_FILE
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type S3Config struct {
MaxPartsCount int64 `yaml:"max_parts_count" envconfig:"S3_MAX_PARTS_COUNT"`
AllowMultipartDownload bool `yaml:"allow_multipart_download" envconfig:"S3_ALLOW_MULTIPART_DOWNLOAD"`
ObjectLabels map[string]string `yaml:"object_labels" envconfig:"S3_OBJECT_LABELS"`
RequestPayer string `yaml:"request_payer" envconfig:"S3_REQUEST_PAYER"`
Debug bool `yaml:"debug" envconfig:"S3_DEBUG"`
}

Expand Down
218 changes: 132 additions & 86 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,7 @@ func (s *S3) GetFileReader(ctx context.Context, key string) (io.ReadCloser, erro
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(path.Join(s.Config.Path, key)),
}
if s.Config.SSECustomerAlgorithm != "" {
params.SSECustomerAlgorithm = aws.String(s.Config.SSECustomerAlgorithm)
}
if s.Config.SSECustomerKey != "" {
params.SSECustomerKey = aws.String(s.Config.SSECustomerKey)
}
if s.Config.SSECustomerKeyMD5 != "" {
params.SSECustomerKeyMD5 = aws.String(s.Config.SSECustomerKeyMD5)
}
s.enrichGetObjectParams(params)
resp, err := s.client.GetObject(ctx, params)
if err != nil {
var opError *smithy.OperationError
Expand Down Expand Up @@ -241,6 +233,21 @@ func (s *S3) GetFileReader(ctx context.Context, key string) (io.ReadCloser, erro
return resp.Body, nil
}

// enrichGetObjectParams copies the optional SSE-C (customer-provided
// encryption key) settings and the requester-pays configuration from
// s.Config onto a GetObject request. Config fields left empty are not
// set on the request, so AWS SDK defaults apply.
func (s *S3) enrichGetObjectParams(params *s3.GetObjectInput) {
	cfg := s.Config
	if alg := cfg.SSECustomerAlgorithm; alg != "" {
		params.SSECustomerAlgorithm = aws.String(alg)
	}
	if key := cfg.SSECustomerKey; key != "" {
		params.SSECustomerKey = aws.String(key)
	}
	if keyMD5 := cfg.SSECustomerKeyMD5; keyMD5 != "" {
		params.SSECustomerKeyMD5 = aws.String(keyMD5)
	}
	if payer := cfg.RequestPayer; payer != "" {
		params.RequestPayer = s3types.RequestPayer(payer)
	}
}

func (s *S3) GetFileReaderWithLocalPath(ctx context.Context, key, localPath string) (io.ReadCloser, error) {
/* unfortunately, multipart download require allocate additional disk space
and don't allow us to decompress data directly from stream */
Expand Down Expand Up @@ -311,6 +318,9 @@ func (s *S3) deleteKey(ctx context.Context, key string) error {
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(key),
}
if s.Config.RequestPayer != "" {
params.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
}
if s.versioning {
objVersion, err := s.getObjectVersion(ctx, key)
if err != nil {
Expand Down Expand Up @@ -349,6 +359,9 @@ func (s *S3) getObjectVersion(ctx context.Context, key string) (*string, error)
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(key),
}
if s.Config.RequestPayer != "" {
params.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
}
object, err := s.client.HeadObject(ctx, params)
if err != nil {
return nil, err
Expand All @@ -361,7 +374,7 @@ func (s *S3) StatFile(ctx context.Context, key string) (RemoteFile, error) {
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(path.Join(s.Config.Path, key)),
}
s.enrichHeadParamsWithSSE(params)
s.enrichHeadParams(params)
head, err := s.client.HeadObject(ctx, params)
if err != nil {
var opError *smithy.OperationError
Expand Down Expand Up @@ -440,102 +453,47 @@ func (s *S3) remotePager(ctx context.Context, s3Path string, recursive bool, pro
func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
dstKey = path.Join(s.Config.ObjectDiskPath, dstKey)
if strings.Contains(s.Config.Endpoint, "storage.googleapis.com") {
params := s3.CopyObjectInput{
params := &s3.CopyObjectInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(dstKey),
CopySource: aws.String(path.Join(srcBucket, srcKey)),
StorageClass: s3types.StorageClass(strings.ToUpper(s.Config.StorageClass)),
}
// https://github.com/Altinity/clickhouse-backup/issues/588
if len(s.Config.ObjectLabels) > 0 {
tags := ""
for k, v := range s.Config.ObjectLabels {
if tags != "" {
tags += "&"
}
tags += k + "=" + v
}
params.Tagging = aws.String(tags)
}
if s.Config.SSE != "" {
params.ServerSideEncryption = s3types.ServerSideEncryption(s.Config.SSE)
}
if s.Config.SSEKMSKeyId != "" {
params.SSEKMSKeyId = aws.String(s.Config.SSEKMSKeyId)
}
if s.Config.SSECustomerAlgorithm != "" {
params.SSECustomerAlgorithm = aws.String(s.Config.SSECustomerAlgorithm)
}
if s.Config.SSECustomerKey != "" {
params.SSECustomerKey = aws.String(s.Config.SSECustomerKey)
}
if s.Config.SSECustomerKeyMD5 != "" {
params.SSECustomerKeyMD5 = aws.String(s.Config.SSECustomerKeyMD5)
}
if s.Config.SSEKMSEncryptionContext != "" {
params.SSEKMSEncryptionContext = aws.String(s.Config.SSEKMSEncryptionContext)
}
_, err := s.client.CopyObject(ctx, &params)
s.enrichCopyObjectParams(params)
_, err := s.client.CopyObject(ctx, params)
if err != nil {
return 0, err
}
dstHeadParams := &s3.HeadObjectInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(dstKey),
}
s.enrichHeadParamsWithSSE(dstHeadParams)
s.enrichHeadParams(dstHeadParams)
dstObjResp, err := s.client.HeadObject(ctx, dstHeadParams)
if err != nil {
return 0, err
}
return *dstObjResp.ContentLength, nil
}
// Get the size of the source object
sourceObjResp, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
headParams := &s3.HeadObjectInput{
Bucket: aws.String(srcBucket),
Key: aws.String(srcKey),
})
}
s.enrichHeadParams(headParams)
sourceObjResp, err := s.client.HeadObject(ctx, headParams)
if err != nil {
return 0, err
}
srcSize := *sourceObjResp.ContentLength
// Initiate a multipart upload
params := s3.CreateMultipartUploadInput{
createMultipartUploadParams := &s3.CreateMultipartUploadInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(dstKey),
StorageClass: s3types.StorageClass(strings.ToUpper(s.Config.StorageClass)),
}
// https://github.com/Altinity/clickhouse-backup/issues/588
if len(s.Config.ObjectLabels) > 0 {
tags := ""
for k, v := range s.Config.ObjectLabels {
if tags != "" {
tags += "&"
}
tags += k + "=" + v
}
params.Tagging = aws.String(tags)
}
if s.Config.SSE != "" {
params.ServerSideEncryption = s3types.ServerSideEncryption(s.Config.SSE)
}
if s.Config.SSEKMSKeyId != "" {
params.SSEKMSKeyId = aws.String(s.Config.SSEKMSKeyId)
}
if s.Config.SSECustomerAlgorithm != "" {
params.SSECustomerAlgorithm = aws.String(s.Config.SSECustomerAlgorithm)
}
if s.Config.SSECustomerKey != "" {
params.SSECustomerKey = aws.String(s.Config.SSECustomerKey)
}
if s.Config.SSECustomerKeyMD5 != "" {
params.SSECustomerKeyMD5 = aws.String(s.Config.SSECustomerKeyMD5)
}
if s.Config.SSEKMSEncryptionContext != "" {
params.SSEKMSEncryptionContext = aws.String(s.Config.SSEKMSEncryptionContext)
}

initResp, err := s.client.CreateMultipartUpload(ctx, &params)
s.enrichCreateMultipartUploadParams(createMultipartUploadParams)
initResp, err := s.client.CreateMultipartUpload(ctx, createMultipartUploadParams)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -578,14 +536,18 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
copyPartErrGroup.Go(func() error {
defer copyPartSemaphore.Release(1)
// Copy the part
partResp, err := s.client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{
uploadPartParams := &s3.UploadPartCopyInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(dstKey),
CopySource: aws.String(srcBucket + "/" + srcKey),
CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", start, end-1)),
UploadId: uploadID,
PartNumber: aws.Int32(currentPartNumber),
})
}
if s.Config.RequestPayer != "" {
uploadPartParams.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
}
partResp, err := s.client.UploadPartCopy(ctx, uploadPartParams)
if err != nil {
return err
}
Expand All @@ -599,33 +561,111 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
})
}
if err := copyPartErrGroup.Wait(); err != nil {
_, abortErr := s.client.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
abortParams := &s3.AbortMultipartUploadInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(dstKey),
UploadId: uploadID,
})
}
if s.Config.RequestPayer != "" {
abortParams.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
}
_, abortErr := s.client.AbortMultipartUpload(context.Background(), abortParams)
if abortErr != nil {
return 0, fmt.Errorf("aborting CopyObject multipart upload: %v, original error was: %v", abortErr, err)
}
return 0, fmt.Errorf("one of CopyObject go-routine return error: %v", err)
}

// Complete the multipart upload
_, err = s.client.CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{
completeMultipartUploadParams := &s3.CompleteMultipartUploadInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(dstKey),
UploadId: uploadID,
MultipartUpload: &s3types.CompletedMultipartUpload{Parts: parts},
})
}
if s.Config.RequestPayer != "" {
completeMultipartUploadParams.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
}
_, err = s.client.CompleteMultipartUpload(context.Background(), completeMultipartUploadParams)
if err != nil {
return 0, fmt.Errorf("complete CopyObject multipart upload: %v", err)
}
s.Log.Debugf("S3->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, s.Config.Bucket, dstKey)
return srcSize, nil
}

// enrichCreateMultipartUploadParams applies optional request settings from
// s.Config to a CreateMultipartUpload request: requester-pays, object tags
// and the server-side-encryption (SSE/SSE-KMS/SSE-C) parameters. Config
// fields left empty are not set on the request.
func (s *S3) enrichCreateMultipartUploadParams(params *s3.CreateMultipartUploadInput) {
	if s.Config.RequestPayer != "" {
		params.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
	}
	// Tagging is a URL-encoded "k=v&k=v" query string,
	// https://github.com/Altinity/clickhouse-backup/issues/588
	if len(s.Config.ObjectLabels) > 0 {
		tags := make([]string, 0, len(s.Config.ObjectLabels))
		for k, v := range s.Config.ObjectLabels {
			tags = append(tags, k+"="+v)
		}
		params.Tagging = aws.String(strings.Join(tags, "&"))
	}
	if s.Config.SSE != "" {
		params.ServerSideEncryption = s3types.ServerSideEncryption(s.Config.SSE)
	}
	if s.Config.SSEKMSKeyId != "" {
		params.SSEKMSKeyId = aws.String(s.Config.SSEKMSKeyId)
	}
	if s.Config.SSECustomerAlgorithm != "" {
		params.SSECustomerAlgorithm = aws.String(s.Config.SSECustomerAlgorithm)
	}
	if s.Config.SSECustomerKey != "" {
		params.SSECustomerKey = aws.String(s.Config.SSECustomerKey)
	}
	if s.Config.SSECustomerKeyMD5 != "" {
		params.SSECustomerKeyMD5 = aws.String(s.Config.SSECustomerKeyMD5)
	}
	if s.Config.SSEKMSEncryptionContext != "" {
		params.SSEKMSEncryptionContext = aws.String(s.Config.SSEKMSEncryptionContext)
	}
}

// enrichCopyObjectParams applies optional request settings from s.Config to
// a CopyObject request: object tags, the server-side-encryption
// (SSE/SSE-KMS/SSE-C) parameters and requester-pays. Config fields left
// empty are not set on the request.
func (s *S3) enrichCopyObjectParams(params *s3.CopyObjectInput) {
	// Tagging is a URL-encoded "k=v&k=v" query string,
	// https://github.com/Altinity/clickhouse-backup/issues/588
	if len(s.Config.ObjectLabels) > 0 {
		tags := make([]string, 0, len(s.Config.ObjectLabels))
		for k, v := range s.Config.ObjectLabels {
			tags = append(tags, k+"="+v)
		}
		params.Tagging = aws.String(strings.Join(tags, "&"))
	}
	if s.Config.SSE != "" {
		params.ServerSideEncryption = s3types.ServerSideEncryption(s.Config.SSE)
	}
	if s.Config.SSEKMSKeyId != "" {
		params.SSEKMSKeyId = aws.String(s.Config.SSEKMSKeyId)
	}
	if s.Config.SSECustomerAlgorithm != "" {
		params.SSECustomerAlgorithm = aws.String(s.Config.SSECustomerAlgorithm)
	}
	if s.Config.SSECustomerKey != "" {
		params.SSECustomerKey = aws.String(s.Config.SSECustomerKey)
	}
	if s.Config.SSECustomerKeyMD5 != "" {
		params.SSECustomerKeyMD5 = aws.String(s.Config.SSECustomerKeyMD5)
	}
	if s.Config.SSEKMSEncryptionContext != "" {
		params.SSEKMSEncryptionContext = aws.String(s.Config.SSEKMSEncryptionContext)
	}
	if s.Config.RequestPayer != "" {
		params.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
	}
}

func (s *S3) restoreObject(ctx context.Context, key string) error {
restoreRequest := s3.RestoreObjectInput{
restoreRequest := &s3.RestoreObjectInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(path.Join(s.Config.Path, key)),
RestoreRequest: &s3types.RestoreRequest{
Expand All @@ -635,7 +675,10 @@ func (s *S3) restoreObject(ctx context.Context, key string) error {
},
},
}
_, err := s.client.RestoreObject(ctx, &restoreRequest)
if s.Config.RequestPayer != "" {
restoreRequest.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
}
_, err := s.client.RestoreObject(ctx, restoreRequest)
if err != nil {
return err
}
Expand All @@ -645,7 +688,7 @@ func (s *S3) restoreObject(ctx context.Context, key string) error {
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(path.Join(s.Config.Path, key)),
}
s.enrichHeadParamsWithSSE(restoreHeadParams)
s.enrichHeadParams(restoreHeadParams)
res, err := s.client.HeadObject(ctx, restoreHeadParams)
if err != nil {
return fmt.Errorf("restoreObject: failed to head %s object metadata, %v", path.Join(s.Config.Path, key), err)
Expand All @@ -661,7 +704,10 @@ func (s *S3) restoreObject(ctx context.Context, key string) error {
}
}

func (s *S3) enrichHeadParamsWithSSE(headParams *s3.HeadObjectInput) {
func (s *S3) enrichHeadParams(headParams *s3.HeadObjectInput) {
if s.Config.RequestPayer != "" {
headParams.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
}
if s.Config.SSECustomerAlgorithm != "" {
headParams.SSECustomerAlgorithm = aws.String(s.Config.SSECustomerAlgorithm)
}
Expand Down
1 change: 1 addition & 0 deletions test/integration/config-s3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ s3:
compression_format: tar
allow_multipart_download: true
concurrency: 3
request_payer: requester
api:
listen: :7171
create_integration_tables: true
Expand Down
Loading

0 comments on commit 91b8e35

Please sign in to comment.