From 1acc53c23ba91f8f3d64677885ddf8f4bca32004 Mon Sep 17 00:00:00 2001 From: Slach Date: Mon, 7 Aug 2023 18:14:47 +0400 Subject: [PATCH 1/3] WIP s3:RestoreObject implementation Signed-off-by: Slach --- pkg/backup/create.go | 24 ++++++---- pkg/storage/general.go | 2 +- pkg/storage/s3.go | 71 ++++++++++++++++++++++++++-- test/integration/config-s3-fips.yml | 1 + test/integration/integration_test.go | 2 +- 5 files changed, 85 insertions(+), 15 deletions(-) diff --git a/pkg/backup/create.go b/pkg/backup/create.go index 8870a7b9..47cf2c4b 100644 --- a/pkg/backup/create.go +++ b/pkg/backup/create.go @@ -450,15 +450,21 @@ func (b *Backuper) createBackupRBAC(ctx context.Context, backupPath string, disk if err != nil { return 0, err } - log.Debugf("copy %s -> %s", accessPath, rbacBackup) - copyErr := recursiveCopy.Copy(accessPath, rbacBackup, recursiveCopy.Options{ - Skip: func(srcinfo os.FileInfo, src, dest string) (bool, error) { - rbacDataSize += uint64(srcinfo.Size()) - return false, nil - }, - }) - if copyErr != nil { - return 0, copyErr + var fInfo os.FileInfo + if fInfo, err = os.Stat(accessPath); err != nil && !os.IsNotExist(err) { + return 0, err + } + if fInfo.IsDir() { + log.Debugf("copy %s -> %s", accessPath, rbacBackup) + copyErr := recursiveCopy.Copy(accessPath, rbacBackup, recursiveCopy.Options{ + Skip: func(srcinfo os.FileInfo, src, dest string) (bool, error) { + rbacDataSize += uint64(srcinfo.Size()) + return false, nil + }, + }) + if copyErr != nil { + return 0, copyErr + } } replicatedRBACDataSize, err := b.createBackupRBACReplicated(ctx, rbacBackup) if err != nil { diff --git a/pkg/storage/general.go b/pkg/storage/general.go index 191cdadf..d58e0d01 100644 --- a/pkg/storage/general.go +++ b/pkg/storage/general.go @@ -427,7 +427,7 @@ func (bd *BackupDestination) UploadCompressedStream(ctx context.Context, baseLoc defer bar.Finish() pipeBuffer := buffer.New(BufferSize) body, w := nio.Pipe(pipeBuffer) - g, ctx := errgroup.WithContext(context.Background()) + g, ctx := errgroup.WithContext(ctx) var writerErr, readerErr error g.Go(func() error { diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 6945cbf3..94ab374a 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -197,14 +197,36 @@ func (s *S3) Close(ctx context.Context) error { } func (s *S3) GetFileReader(ctx context.Context, key string) (io.ReadCloser, error) { - resp, err := s.client.GetObject(ctx, &s3.GetObjectInput{ + getRequest := &s3.GetObjectInput{ Bucket: aws.String(s.Config.Bucket), Key: aws.String(path.Join(s.Config.Path, key)), - }) + } + resp, err := s.client.GetObject(ctx, getRequest) if err != nil { + var opError *smithy.OperationError + if errors.As(err, &opError) { + var httpErr *awsV2http.ResponseError + if errors.As(opError.Err, &httpErr) { + var stateErr *s3types.InvalidObjectState + if errors.As(httpErr, &stateErr) { + if strings.Contains(string(stateErr.StorageClass), "GLACIER") { + s.Log.Warnf("GetFileReader %s, storageClass %s receive error: %s", key, stateErr.StorageClass, stateErr.Error()) + if restoreErr := s.restoreObject(ctx, key); restoreErr != nil { + s.Log.Warnf("restoreObject %s, return error: %v", key, restoreErr) + return nil, err + } + if resp, err = s.client.GetObject(ctx, getRequest); err != nil { + s.Log.Warnf("second GetObject %s, return error: %v", key, err) + return nil, err + } + return resp.Body, nil + } + } + } + return nil, err + } return nil, err } - return resp.Body, nil } @@ -337,7 +359,7 @@ func (s *S3) StatFile(ctx context.Context, key string) (RemoteFile, error) { } return nil, err } - return &s3File{head.ContentLength, *head.LastModified, key}, nil + return &s3File{head.ContentLength, *head.LastModified, string(head.StorageClass), key}, nil } func (s *S3) Walk(ctx context.Context, s3Path string, recursive bool, process func(ctx context.Context, r RemoteFile) error) error { @@ -355,6 +377,7 @@ func (s *S3) Walk(ctx context.Context, s3Path string, recursive bool, process fu s3Files <- &s3File{ c.Size, *c.LastModified, + string(c.StorageClass), strings.TrimPrefix(*c.Key, path.Join(s.Config.Path, s3Path)), } } @@ -580,9 +603,45 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) ( return srcSize, nil } +func (s *S3) restoreObject(ctx context.Context, key string) error { + restoreRequest := s3.RestoreObjectInput{ + Bucket: aws.String(s.Config.Bucket), + Key: aws.String(path.Join(s.Config.Path, key)), + RestoreRequest: &s3types.RestoreRequest{ + Days: 1, + GlacierJobParameters: &s3types.GlacierJobParameters{ + Tier: s3types.Tier("Expedited"), + }, + }, + } + _, err := s.client.RestoreObject(ctx, &restoreRequest) + if err != nil { + return err + } + i := 0 + for { + headObjectRequest := &s3.HeadObjectInput{ + Bucket: aws.String(s.Config.Bucket), + Key: aws.String(path.Join(s.Config.Path, key)), + } + res, err := s.client.HeadObject(ctx, headObjectRequest) + if err != nil { + return fmt.Errorf("restoreObject: failed to head %s object metadata, %v", path.Join(s.Config.Path, key), err) + } + + if res.Restore != nil && *res.Restore == "ongoing-request=\"true\"" { + s.Log.Debugf("%s still not restored, will wait %d seconds", key, i*5) + time.Sleep(time.Duration(i*5) * time.Second) + } else { + return nil + } + } +} + type s3File struct { size int64 lastModified time.Time + storageClass string name string } @@ -597,3 +656,7 @@ func (f *s3File) Name() string { func (f *s3File) LastModified() time.Time { return f.lastModified } + +func (f *s3File) StorageClass() string { + return f.storageClass +} diff --git a/test/integration/config-s3-fips.yml b/test/integration/config-s3-fips.yml index 724835f8..88adad7a 100644 --- a/test/integration/config-s3-fips.yml +++ b/test/integration/config-s3-fips.yml @@ -35,6 +35,7 @@ s3: compression_format: tar allow_multipart_download: true concurrency: 3 + storage_class: GLACIER api: listen: :7171 create_integration_tables: true diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 371e7f4a..2c0551fe 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -932,7 +932,6 @@ func TestFIPS(t *testing.T) { ch.connectWithWait(r, 1*time.Second, 10*time.Second) defer ch.chbackend.Close() fipsBackupName := fmt.Sprintf("fips_backup_%d", rand.Int()) - r.NoError(dockerExec("clickhouse", "rm", "-fv", "/etc/apt/sources.list.d/clickhouse.list")) installDebIfNotExists(r, "clickhouse", "curl", "gettext-base", "bsdmainutils", "dnsutils", "git", "ca-certificates") r.NoError(dockerCP("config-s3-fips.yml", "clickhouse:/etc/clickhouse-backup/config.yml.fips-template")) r.NoError(dockerExec("clickhouse", "update-ca-certificates")) @@ -958,6 +957,7 @@ func TestFIPS(t *testing.T) { generateCerts("rsa", "4096", "") createSQL := "CREATE TABLE default.fips_table (v UInt64) ENGINE=MergeTree() ORDER BY tuple()" ch.queryWithNoError(r, createSQL) + ch.queryWithNoError(r, "INSERT INTO default.fips_table SELECT number FROM numbers(1000)") r.NoError(dockerExec("clickhouse", "bash", "-ce", "clickhouse-backup-fips create_remote --tables=default.fips_table "+fipsBackupName)) r.NoError(dockerExec("clickhouse", "bash", "-ce", "clickhouse-backup-fips delete local "+fipsBackupName)) r.NoError(dockerExec("clickhouse", "bash", "-ce", "clickhouse-backup-fips restore_remote --tables=default.fips_table "+fipsBackupName)) From 65af682d7330dbda87df307f19365d24a8db388b Mon Sep 17 00:00:00 2001 From: Slach Date: Mon, 7 Aug 2023 20:07:59 +0400 Subject: [PATCH 2/3] S3 explicitly credentials shall have more priority than environment variables --- pkg/storage/s3.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 94ab374a..5f9027f3 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -115,15 +115,6 @@ func (s *S3) Connect(ctx context.Context) error { if s.Config.Region != "" { awsConfig.Region = s.Config.Region } - if s.Config.AccessKey != "" && s.Config.SecretKey != "" { - awsConfig.Credentials = credentials.StaticCredentialsProvider{ - Value: aws.Credentials{ - AccessKeyID: s.Config.AccessKey, - SecretAccessKey: s.Config.SecretKey, - }, - } - } - awsRoleARN := os.Getenv("AWS_ROLE_ARN") if s.Config.AssumeRoleARN != "" || awsRoleARN != "" { stsClient := sts.NewFromConfig(awsConfig) @@ -142,6 +133,15 @@ func (s *S3) Connect(ctx context.Context) error { ) } + if s.Config.AccessKey != "" && s.Config.SecretKey != "" { + awsConfig.Credentials = credentials.StaticCredentialsProvider{ + Value: aws.Credentials{ + AccessKeyID: s.Config.AccessKey, + SecretAccessKey: s.Config.SecretKey, + }, + } + } + if s.Config.Debug { awsConfig.Logger = newS3Logger(s.Log) awsConfig.ClientLogMode = aws.LogRetries | aws.LogRequest | aws.LogResponse From 7f316b6895906d6f834ee8593fa66000cd0724e2 Mon Sep 17 00:00:00 2001 From: Slach Date: Tue, 8 Aug 2023 20:04:06 +0400 Subject: [PATCH 3/3] Add support for s3 `GLACIER` storage class, when GET return error, then, it requires 5 minutes per key and restore could be slow. Use `GLACIER_IR`, it looks more robust, fix https://github.com/Altinity/clickhouse-backup/issues/614 --- ChangeLog.md | 1 + pkg/storage/s3.go | 5 +-- test/integration/config-s3-fips.yml | 1 - test/integration/config-s3-glacier.yml | 44 ++++++++++++++++++++++++++ test/integration/integration_test.go | 22 +++++++++++-- test/integration/run.sh | 4 ++- 6 files changed, 70 insertions(+), 7 deletions(-) create mode 100644 test/integration/config-s3-glacier.yml diff --git a/ChangeLog.md b/ChangeLog.md index 8dad154b..0321d495 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -6,6 +6,7 @@ IMPROVEMENTS - Backup/Restore RBAC related objects from Zookeeper via direct connection to zookeeper/keeper, fix [604](https://github.com/Altinity/clickhouse-backup/issues/604) - Add `SHARDED_OPERATION_MODE` option, to easy create backup for sharded cluster, available values `none` (no sharding), `table` (table granularity), `database` (database granularity), `first-replica` (on the lexicographically sorted first active replica), thanks @mskwon, fix [639](https://github.com/Altinity/clickhouse-backup/issues/639), fix [648](https://github.com/Altinity/clickhouse-backup/pull/648) - Add support for `compression_format: none` for upload and download backups created with `--rbac` / `--rbac-only` or `--configs` / `--configs-only` options, fix [713](https://github.com/Altinity/clickhouse-backup/issues/713) +- Add support for s3 `GLACIER` storage class, when GET return error, then, it requires 5 minutes per key and restore could be slow. Use `GLACIER_IR`, it looks more robust, fix [614](https://github.com/Altinity/clickhouse-backup/issues/614) BUG FIXES - fix possible create backup failures during UNFREEZE not exists tables, affected 2.2.7+ version, fix [704](https://github.com/Altinity/clickhouse-backup/issues/704) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 5f9027f3..0918dc10 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -144,7 +144,7 @@ func (s *S3) Connect(ctx context.Context) error { if s.Config.Debug { awsConfig.Logger = newS3Logger(s.Log) - awsConfig.ClientLogMode = aws.LogRetries | aws.LogRequest | aws.LogResponse + awsConfig.ClientLogMode = aws.LogRetries | aws.LogRequest | aws.LogResponseWithBody } httpTransport := http.DefaultTransport @@ -630,7 +630,8 @@ func (s *S3) restoreObject(ctx context.Context, key string) error { } if res.Restore != nil && *res.Restore == "ongoing-request=\"true\"" { - s.Log.Debugf("%s still not restored, will wait %d seconds", key, i*5) + i += 1 + s.Log.Warnf("%s still not restored, will wait %d seconds", key, i*5) time.Sleep(time.Duration(i*5) * time.Second) } else { return nil diff --git a/test/integration/config-s3-fips.yml b/test/integration/config-s3-fips.yml index 88adad7a..724835f8 100644 --- a/test/integration/config-s3-fips.yml +++ b/test/integration/config-s3-fips.yml @@ -35,7 +35,6 @@ s3: compression_format: tar allow_multipart_download: true concurrency: 3 - storage_class: GLACIER api: listen: :7171 create_integration_tables: true diff --git a/test/integration/config-s3-glacier.yml b/test/integration/config-s3-glacier.yml new file mode 100644 index 00000000..209391e0 --- /dev/null +++ b/test/integration/config-s3-glacier.yml @@ -0,0 +1,44 @@ +general: + disable_progress_bar: true + remote_storage: s3 + upload_concurrency: 4 + download_concurrency: 4 + skip_tables: + - " system.*" + - "INFORMATION_SCHEMA.*" + - "information_schema.*" + - "_temporary_and_external_tables.*" + restore_schema_on_cluster: "{cluster}" +clickhouse: + host: clickhouse + port: 9440 + username: backup + password: meow=& 123?*%# МЯУ + secure: true + skip_verify: true + sync_replicated_tables: true + timeout: 1h + restart_command: bash -c 'echo "FAKE RESTART"' + backup_mutations: true +# secrets for `FISP` will provide from `.env` or from GitHub actions secrets +s3: + access_key: ${QA_AWS_ACCESS_KEY} + secret_key: ${QA_AWS_SECRET_KEY} + bucket: ${QA_AWS_BUCKET} +# endpoint: https://${QA_AWS_BUCKET}.s3-fips.${QA_AWS_REGION}.amazonaws.com/ + region: ${QA_AWS_REGION} + acl: private + force_path_style: false + path: backup/{cluster}/{shard} + object_disk_path: object_disks/{cluster}/{shard} + disable_ssl: false + compression_format: tar + allow_multipart_download: false + concurrency: 4 + # storage_class: GLACIER, 6000 seconds test execution + storage_class: GLACIER_IR +api: + listen: :7171 + create_integration_tables: true + integration_tables_host: "localhost" + allow_parallel: false \ No newline at end of file diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 2c0551fe..74b1f16e 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -952,7 +952,7 @@ func TestFIPS(t *testing.T) { r.NoError(dockerExec("clickhouse", "bash", "-xce", "openssl req -subj \"/CN=localhost\" -addext \"subjectAltName = DNS:localhost,DNS:*.cluster.local\" -new -key /etc/clickhouse-backup/server-key.pem -out /etc/clickhouse-backup/server-req.csr")) r.NoError(dockerExec("clickhouse", "bash", "-xce", "openssl x509 -req -days 365000 -extensions SAN -extfile <(printf \"\\n[SAN]\\nsubjectAltName=DNS:localhost,DNS:*.cluster.local\") -in /etc/clickhouse-backup/server-req.csr -out /etc/clickhouse-backup/server-cert.pem -CA /etc/clickhouse-backup/ca-cert.pem -CAkey /etc/clickhouse-backup/ca-key.pem -CAcreateserial")) } - r.NoError(dockerExec("clickhouse", "bash", "-c", "cat /etc/clickhouse-backup/config.yml.fips-template | envsubst > /etc/clickhouse-backup/config.yml")) + r.NoError(dockerExec("clickhouse", "bash", "-xec", "cat /etc/clickhouse-backup/config.yml.fips-template | envsubst > /etc/clickhouse-backup/config.yml")) generateCerts("rsa", "4096", "") createSQL := "CREATE TABLE default.fips_table (v UInt64) ENGINE=MergeTree() ORDER BY tuple()" @@ -1158,6 +1158,20 @@ func TestDoRestoreConfigs(t *testing.T) { ch.chbackend.Close() } +func TestIntegrationS3Glacier(t *testing.T) { + if isTestShouldSkip("GLACIER_TESTS") { + t.Skip("Skipping GLACIER integration tests...") + return + } + r := require.New(t) + r.NoError(dockerCP("config-s3-glacier.yml", "clickhouse-backup:/etc/clickhouse-backup/config.yml.s3glacier-template")) + installDebIfNotExists(r, "clickhouse-backup", "curl", "gettext-base", "bsdmainutils", "dnsutils", "git", "ca-certificates") + r.NoError(dockerExec("clickhouse-backup", "bash", "-xec", "cat /etc/clickhouse-backup/config.yml.s3glacier-template | envsubst > /etc/clickhouse-backup/config.yml")) + dockerExecTimeout = 60 * time.Minute + runMainIntegrationScenario(t, "GLACIER") + dockerExecTimeout = 3 * time.Minute +} + func TestIntegrationS3(t *testing.T) { r := require.New(t) r.NoError(dockerCP("config-s3.yml", "clickhouse-backup:/etc/clickhouse-backup/config.yml")) @@ -2412,6 +2426,8 @@ func (ch *TestClickHouse) queryWithNoError(r *require.Assertions, query string, r.NoError(err) } +var dockerExecTimeout = 180 * time.Second + func dockerExec(container string, cmd ...string) error { out, err := dockerExecOut(container, cmd...) log.Info(out) @@ -2421,7 +2437,7 @@ func dockerExec(container string, cmd ...string) error { func dockerExecOut(container string, cmd ...string) (string, error) { dcmd := []string{"exec", container} dcmd = append(dcmd, cmd...) - return utils.ExecCmdOut(context.Background(), 180*time.Second, "docker", dcmd...) + return utils.ExecCmdOut(context.Background(), dockerExecTimeout, "docker", dcmd...) } func dockerCP(src, dst string) error { @@ -2490,7 +2506,7 @@ func installDebIfNotExists(r *require.Assertions, container string, pkgs ...stri container, "bash", "-xec", fmt.Sprintf( - "export DEBIAN_FRONTEND=noniteractive; if [[ '%d' != $(dpkg -l | grep -c -E \"%s\" ) ]]; then rm -fv /etc/apt/sources.list.d/clickhouse.list; find /etc/apt/ -type f -exec sed -i 's/ru.archive.ubuntu.com/archive.ubuntu.com/g' {} +; apt-get -y update; apt-get install --no-install-recommends -y %s; fi", + "export DEBIAN_FRONTEND=noniteractive; if [[ '%d' != $(dpkg -l | grep -c -E \"%s\" ) ]]; then rm -fv /etc/apt/sources.list.d/clickhouse.list; find /etc/apt/ -type f -name *.list -exec sed -i 's/ru.archive.ubuntu.com/archive.ubuntu.com/g' {} +; apt-get -y update; apt-get install --no-install-recommends -y %s; fi", len(pkgs), "^ii\\s+"+strings.Join(pkgs, "|^ii\\s+"), strings.Join(pkgs, " "), ), )) diff --git a/test/integration/run.sh b/test/integration/run.sh index 19a97479..82e6ac41 100755 --- a/test/integration/run.sh +++ b/test/integration/run.sh @@ -23,6 +23,8 @@ else export GCS_TESTS=${GCS_TESTS:-} fi +export GLACIER_TESTS=${GLACIER_TESTS:-} + export AZURE_TESTS=${AZURE_TESTS:-1} export RUN_ADVANCED_TESTS=${RUN_ADVANCED_TESTS:-1} export S3_DEBUG=${S3_DEBUG:-false} @@ -44,5 +46,5 @@ make clean build-race-docker build-race-fips-docker docker-compose -f ${CUR_DIR}/${COMPOSE_FILE} up -d docker-compose -f ${CUR_DIR}/${COMPOSE_FILE} exec minio mc alias list -go test -timeout 30m -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v ${CUR_DIR}/integration_test.go +go test -timeout ${TESTS_TIMEOUT:-30m} -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v ${CUR_DIR}/integration_test.go go tool covdata textfmt -i "${CUR_DIR}/_coverage_/" -o "${CUR_DIR}/_coverage_/coverage.out" \ No newline at end of file