Skip to content

Commit

Permalink
Merge pull request #719 from Altinity/add_s3_restore_object
Browse files Browse the repository at this point in the history
execute RestoreObject for GLACIER storageClass
  • Loading branch information
Slach authored Aug 8, 2023
2 parents 562fed9 + 7f316b6 commit 0a2c71a
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 29 deletions.
1 change: 1 addition & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ IMPROVEMENTS
- Backup/Restore RBAC related objects from Zookeeper via direct connection to zookeeper/keeper, fix [604](https://github.com/Altinity/clickhouse-backup/issues/604)
- Add `SHARDED_OPERATION_MODE` option, to easy create backup for sharded cluster, available values `none` (no sharding), `table` (table granularity), `database` (database granularity), `first-replica` (on the lexicographically sorted first active replica), thanks @mskwon, fix [639](https://github.com/Altinity/clickhouse-backup/issues/639), fix [648](https://github.com/Altinity/clickhouse-backup/pull/648)
- Add support for `compression_format: none` for upload and download backups created with `--rbac` / `--rbac-only` or `--configs` / `--configs-only` options, fix [713](https://github.com/Altinity/clickhouse-backup/issues/713)
- Add support for s3 `GLACIER` storage class; when GET returns an error, restoring takes around 5 minutes per key, so restore could be slow. Use `GLACIER_IR`, which looks more robust, fix [614](https://github.com/Altinity/clickhouse-backup/issues/614)

BUG FIXES
- fix possible create backup failures during UNFREEZE not exists tables, affected 2.2.7+ version, fix [704](https://github.com/Altinity/clickhouse-backup/issues/704)
Expand Down
24 changes: 15 additions & 9 deletions pkg/backup/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,15 +450,21 @@ func (b *Backuper) createBackupRBAC(ctx context.Context, backupPath string, disk
if err != nil {
return 0, err
}
log.Debugf("copy %s -> %s", accessPath, rbacBackup)
copyErr := recursiveCopy.Copy(accessPath, rbacBackup, recursiveCopy.Options{
Skip: func(srcinfo os.FileInfo, src, dest string) (bool, error) {
rbacDataSize += uint64(srcinfo.Size())
return false, nil
},
})
if copyErr != nil {
return 0, copyErr
var fInfo os.FileInfo
if fInfo, err = os.Stat(accessPath); err != nil && !os.IsNotExist(err) {
return 0, err
}
if fInfo.IsDir() {
log.Debugf("copy %s -> %s", accessPath, rbacBackup)
copyErr := recursiveCopy.Copy(accessPath, rbacBackup, recursiveCopy.Options{
Skip: func(srcinfo os.FileInfo, src, dest string) (bool, error) {
rbacDataSize += uint64(srcinfo.Size())
return false, nil
},
})
if copyErr != nil {
return 0, copyErr
}
}
replicatedRBACDataSize, err := b.createBackupRBACReplicated(ctx, rbacBackup)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/general.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (bd *BackupDestination) UploadCompressedStream(ctx context.Context, baseLoc
defer bar.Finish()
pipeBuffer := buffer.New(BufferSize)
body, w := nio.Pipe(pipeBuffer)
g, ctx := errgroup.WithContext(context.Background())
g, ctx := errgroup.WithContext(ctx)

var writerErr, readerErr error
g.Go(func() error {
Expand Down
92 changes: 78 additions & 14 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,6 @@ func (s *S3) Connect(ctx context.Context) error {
if s.Config.Region != "" {
awsConfig.Region = s.Config.Region
}
if s.Config.AccessKey != "" && s.Config.SecretKey != "" {
awsConfig.Credentials = credentials.StaticCredentialsProvider{
Value: aws.Credentials{
AccessKeyID: s.Config.AccessKey,
SecretAccessKey: s.Config.SecretKey,
},
}
}

awsRoleARN := os.Getenv("AWS_ROLE_ARN")
if s.Config.AssumeRoleARN != "" || awsRoleARN != "" {
stsClient := sts.NewFromConfig(awsConfig)
Expand All @@ -142,9 +133,18 @@ func (s *S3) Connect(ctx context.Context) error {
)
}

if s.Config.AccessKey != "" && s.Config.SecretKey != "" {
awsConfig.Credentials = credentials.StaticCredentialsProvider{
Value: aws.Credentials{
AccessKeyID: s.Config.AccessKey,
SecretAccessKey: s.Config.SecretKey,
},
}
}

if s.Config.Debug {
awsConfig.Logger = newS3Logger(s.Log)
awsConfig.ClientLogMode = aws.LogRetries | aws.LogRequest | aws.LogResponse
awsConfig.ClientLogMode = aws.LogRetries | aws.LogRequest | aws.LogResponseWithBody
}

httpTransport := http.DefaultTransport
Expand Down Expand Up @@ -197,14 +197,36 @@ func (s *S3) Close(ctx context.Context) error {
}

func (s *S3) GetFileReader(ctx context.Context, key string) (io.ReadCloser, error) {
resp, err := s.client.GetObject(ctx, &s3.GetObjectInput{
getRequest := &s3.GetObjectInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(path.Join(s.Config.Path, key)),
})
}
resp, err := s.client.GetObject(ctx, getRequest)
if err != nil {
var opError *smithy.OperationError
if errors.As(err, &opError) {
var httpErr *awsV2http.ResponseError
if errors.As(opError.Err, &httpErr) {
var stateErr *s3types.InvalidObjectState
if errors.As(httpErr, &stateErr) {
if strings.Contains(string(stateErr.StorageClass), "GLACIER") {
s.Log.Warnf("GetFileReader %s, storageClass %s receive error: %s", key, stateErr.StorageClass, stateErr.Error())
if restoreErr := s.restoreObject(ctx, key); restoreErr != nil {
s.Log.Warnf("restoreObject %s, return error: %v", key, restoreErr)
return nil, err
}
if resp, err = s.client.GetObject(ctx, getRequest); err != nil {
s.Log.Warnf("second GetObject %s, return error: %v", key, err)
return nil, err
}
return resp.Body, nil
}
}
}
return nil, err
}
return nil, err
}

return resp.Body, nil
}

Expand Down Expand Up @@ -337,7 +359,7 @@ func (s *S3) StatFile(ctx context.Context, key string) (RemoteFile, error) {
}
return nil, err
}
return &s3File{head.ContentLength, *head.LastModified, key}, nil
return &s3File{head.ContentLength, *head.LastModified, string(head.StorageClass), key}, nil
}

func (s *S3) Walk(ctx context.Context, s3Path string, recursive bool, process func(ctx context.Context, r RemoteFile) error) error {
Expand All @@ -355,6 +377,7 @@ func (s *S3) Walk(ctx context.Context, s3Path string, recursive bool, process fu
s3Files <- &s3File{
c.Size,
*c.LastModified,
string(c.StorageClass),
strings.TrimPrefix(*c.Key, path.Join(s.Config.Path, s3Path)),
}
}
Expand Down Expand Up @@ -580,9 +603,46 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
return srcSize, nil
}

// restoreObject initiates an S3 RestoreObject request for key (required before
// GetObject can succeed on GLACIER-class objects) and then polls HeadObject
// until the restore completes.
//
// The restored copy is requested for 1 day at the "Expedited" tier.
// NOTE(review): Expedited is presumably not supported for every archive tier
// (e.g. DEEP_ARCHIVE) — confirm against the storage classes actually in use.
// The wait between polls grows linearly: 5s, 10s, 15s, ...
func (s *S3) restoreObject(ctx context.Context, key string) error {
	restoreRequest := s3.RestoreObjectInput{
		Bucket: aws.String(s.Config.Bucket),
		Key:    aws.String(path.Join(s.Config.Path, key)),
		RestoreRequest: &s3types.RestoreRequest{
			Days: 1,
			GlacierJobParameters: &s3types.GlacierJobParameters{
				Tier: s3types.Tier("Expedited"),
			},
		},
	}
	if _, err := s.client.RestoreObject(ctx, &restoreRequest); err != nil {
		return err
	}
	i := 0
	for {
		headObjectRequest := &s3.HeadObjectInput{
			Bucket: aws.String(s.Config.Bucket),
			Key:    aws.String(path.Join(s.Config.Path, key)),
		}
		res, err := s.client.HeadObject(ctx, headObjectRequest)
		if err != nil {
			return fmt.Errorf("restoreObject: failed to head %s object metadata, %v", path.Join(s.Config.Path, key), err)
		}

		// The x-amz-restore header reports ongoing-request="true" while the
		// restore is still in progress.
		if res.Restore != nil && *res.Restore == "ongoing-request=\"true\"" {
			i += 1
			s.Log.Warnf("%s still not restored, will wait %d seconds", key, i*5)
			// Wait with linear back-off, but abort promptly on context
			// cancellation: a plain time.Sleep here could block for minutes
			// after the caller has already given up.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Duration(i*5) * time.Second):
			}
		} else {
			return nil
		}
	}
}

// s3File carries a single remote object's metadata; it is the S3-side
// implementation returned by StatFile and Walk (see RemoteFile usage there).
// Field order matters: Walk and StatFile construct it with positional literals.
type s3File struct {
	size         int64     // object size in bytes (ContentLength / ListObjects size)
	lastModified time.Time // last-modified timestamp reported by S3
	storageClass string    // S3 storage class string, e.g. "GLACIER" or "GLACIER_IR"
	name         string    // object key relative to the configured remote path
}

Expand All @@ -597,3 +657,7 @@ func (f *s3File) Name() string {
// LastModified returns the object's last-modified timestamp as reported by S3.
func (f *s3File) LastModified() time.Time {
	return f.lastModified
}

// StorageClass returns the object's S3 storage class (e.g. "GLACIER_IR").
func (f *s3File) StorageClass() string {
	return f.storageClass
}
44 changes: 44 additions & 0 deletions test/integration/config-s3-glacier.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
general:
disable_progress_bar: true
remote_storage: s3
upload_concurrency: 4
download_concurrency: 4
skip_tables:
- " system.*"
- "INFORMATION_SCHEMA.*"
- "information_schema.*"
- "_temporary_and_external_tables.*"
restore_schema_on_cluster: "{cluster}"
clickhouse:
host: clickhouse
port: 9440
username: backup
password: meow=& 123?*%# МЯУ
secure: true
skip_verify: true
sync_replicated_tables: true
timeout: 1h
restart_command: bash -c 'echo "FAKE RESTART"'
backup_mutations: true
# secrets for `FIPS` will be provided from `.env` or from GitHub actions secrets
s3:
access_key: ${QA_AWS_ACCESS_KEY}
secret_key: ${QA_AWS_SECRET_KEY}
bucket: ${QA_AWS_BUCKET}
# endpoint: https://${QA_AWS_BUCKET}.s3-fips.${QA_AWS_REGION}.amazonaws.com/
region: ${QA_AWS_REGION}
acl: private
force_path_style: false
path: backup/{cluster}/{shard}
object_disk_path: object_disks/{cluster}/{shard}
disable_ssl: false
compression_format: tar
allow_multipart_download: false
concurrency: 4
# storage_class: GLACIER, 6000 seconds test execution
storage_class: GLACIER_IR
api:
listen: :7171
create_integration_tables: true
integration_tables_host: "localhost"
allow_parallel: false
24 changes: 20 additions & 4 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,6 @@ func TestFIPS(t *testing.T) {
ch.connectWithWait(r, 1*time.Second, 10*time.Second)
defer ch.chbackend.Close()
fipsBackupName := fmt.Sprintf("fips_backup_%d", rand.Int())
r.NoError(dockerExec("clickhouse", "rm", "-fv", "/etc/apt/sources.list.d/clickhouse.list"))
installDebIfNotExists(r, "clickhouse", "curl", "gettext-base", "bsdmainutils", "dnsutils", "git", "ca-certificates")
r.NoError(dockerCP("config-s3-fips.yml", "clickhouse:/etc/clickhouse-backup/config.yml.fips-template"))
r.NoError(dockerExec("clickhouse", "update-ca-certificates"))
Expand All @@ -953,11 +952,12 @@ func TestFIPS(t *testing.T) {
r.NoError(dockerExec("clickhouse", "bash", "-xce", "openssl req -subj \"/CN=localhost\" -addext \"subjectAltName = DNS:localhost,DNS:*.cluster.local\" -new -key /etc/clickhouse-backup/server-key.pem -out /etc/clickhouse-backup/server-req.csr"))
r.NoError(dockerExec("clickhouse", "bash", "-xce", "openssl x509 -req -days 365000 -extensions SAN -extfile <(printf \"\\n[SAN]\\nsubjectAltName=DNS:localhost,DNS:*.cluster.local\") -in /etc/clickhouse-backup/server-req.csr -out /etc/clickhouse-backup/server-cert.pem -CA /etc/clickhouse-backup/ca-cert.pem -CAkey /etc/clickhouse-backup/ca-key.pem -CAcreateserial"))
}
r.NoError(dockerExec("clickhouse", "bash", "-c", "cat /etc/clickhouse-backup/config.yml.fips-template | envsubst > /etc/clickhouse-backup/config.yml"))
r.NoError(dockerExec("clickhouse", "bash", "-xec", "cat /etc/clickhouse-backup/config.yml.fips-template | envsubst > /etc/clickhouse-backup/config.yml"))

generateCerts("rsa", "4096", "")
createSQL := "CREATE TABLE default.fips_table (v UInt64) ENGINE=MergeTree() ORDER BY tuple()"
ch.queryWithNoError(r, createSQL)
ch.queryWithNoError(r, "INSERT INTO default.fips_table SELECT number FROM numbers(1000)")
r.NoError(dockerExec("clickhouse", "bash", "-ce", "clickhouse-backup-fips create_remote --tables=default.fips_table "+fipsBackupName))
r.NoError(dockerExec("clickhouse", "bash", "-ce", "clickhouse-backup-fips delete local "+fipsBackupName))
r.NoError(dockerExec("clickhouse", "bash", "-ce", "clickhouse-backup-fips restore_remote --tables=default.fips_table "+fipsBackupName))
Expand Down Expand Up @@ -1158,6 +1158,20 @@ func TestDoRestoreConfigs(t *testing.T) {
ch.chbackend.Close()
}

// TestIntegrationS3Glacier runs the main integration scenario against an S3
// bucket configured with a GLACIER-family storage class (config-s3-glacier.yml).
// It is skipped unless the GLACIER_TESTS environment flag is set (see run.sh).
func TestIntegrationS3Glacier(t *testing.T) {
	if isTestShouldSkip("GLACIER_TESTS") {
		t.Skip("Skipping GLACIER integration tests...")
		return
	}
	r := require.New(t)
	// Render the config template inside the container: copy it in, then
	// substitute the ${QA_AWS_*} placeholders with envsubst.
	r.NoError(dockerCP("config-s3-glacier.yml", "clickhouse-backup:/etc/clickhouse-backup/config.yml.s3glacier-template"))
	installDebIfNotExists(r, "clickhouse-backup", "curl", "gettext-base", "bsdmainutils", "dnsutils", "git", "ca-certificates")
	r.NoError(dockerExec("clickhouse-backup", "bash", "-xec", "cat /etc/clickhouse-backup/config.yml.s3glacier-template | envsubst > /etc/clickhouse-backup/config.yml"))
	// GLACIER restores can take minutes per object, so raise the shared
	// docker-exec timeout for this scenario and put it back afterwards.
	dockerExecTimeout = 60 * time.Minute
	runMainIntegrationScenario(t, "GLACIER")
	dockerExecTimeout = 3 * time.Minute
}

func TestIntegrationS3(t *testing.T) {
r := require.New(t)
r.NoError(dockerCP("config-s3.yml", "clickhouse-backup:/etc/clickhouse-backup/config.yml"))
Expand Down Expand Up @@ -2412,6 +2426,8 @@ func (ch *TestClickHouse) queryWithNoError(r *require.Assertions, query string,
r.NoError(err)
}

var dockerExecTimeout = 180 * time.Second

func dockerExec(container string, cmd ...string) error {
out, err := dockerExecOut(container, cmd...)
log.Info(out)
Expand All @@ -2421,7 +2437,7 @@ func dockerExec(container string, cmd ...string) error {
func dockerExecOut(container string, cmd ...string) (string, error) {
dcmd := []string{"exec", container}
dcmd = append(dcmd, cmd...)
return utils.ExecCmdOut(context.Background(), 180*time.Second, "docker", dcmd...)
return utils.ExecCmdOut(context.Background(), dockerExecTimeout, "docker", dcmd...)
}

func dockerCP(src, dst string) error {
Expand Down Expand Up @@ -2490,7 +2506,7 @@ func installDebIfNotExists(r *require.Assertions, container string, pkgs ...stri
container,
"bash", "-xec",
fmt.Sprintf(
"export DEBIAN_FRONTEND=noniteractive; if [[ '%d' != $(dpkg -l | grep -c -E \"%s\" ) ]]; then rm -fv /etc/apt/sources.list.d/clickhouse.list; find /etc/apt/ -type f -exec sed -i 's/ru.archive.ubuntu.com/archive.ubuntu.com/g' {} +; apt-get -y update; apt-get install --no-install-recommends -y %s; fi",
"export DEBIAN_FRONTEND=noniteractive; if [[ '%d' != $(dpkg -l | grep -c -E \"%s\" ) ]]; then rm -fv /etc/apt/sources.list.d/clickhouse.list; find /etc/apt/ -type f -name *.list -exec sed -i 's/ru.archive.ubuntu.com/archive.ubuntu.com/g' {} +; apt-get -y update; apt-get install --no-install-recommends -y %s; fi",
len(pkgs), "^ii\\s+"+strings.Join(pkgs, "|^ii\\s+"), strings.Join(pkgs, " "),
),
))
Expand Down
4 changes: 3 additions & 1 deletion test/integration/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ else
export GCS_TESTS=${GCS_TESTS:-}
fi

export GLACIER_TESTS=${GLACIER_TESTS:-}

export AZURE_TESTS=${AZURE_TESTS:-1}
export RUN_ADVANCED_TESTS=${RUN_ADVANCED_TESTS:-1}
export S3_DEBUG=${S3_DEBUG:-false}
Expand All @@ -44,5 +46,5 @@ make clean build-race-docker build-race-fips-docker
docker-compose -f ${CUR_DIR}/${COMPOSE_FILE} up -d
docker-compose -f ${CUR_DIR}/${COMPOSE_FILE} exec minio mc alias list

go test -timeout 30m -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v ${CUR_DIR}/integration_test.go
go test -timeout ${TESTS_TIMEOUT:-30m} -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v ${CUR_DIR}/integration_test.go
go tool covdata textfmt -i "${CUR_DIR}/_coverage_/" -o "${CUR_DIR}/_coverage_/coverage.out"

0 comments on commit 0a2c71a

Please sign in to comment.