fix --partitions and --restore-database-mapping, --restore-table-mapping to work together, fix #1018
Slach committed Oct 4, 2024
1 parent 7ee1178 commit 16f385f
Showing 5 changed files with 74 additions and 13 deletions.
1 change: 1 addition & 0 deletions ChangeLog.md
@@ -1,6 +1,7 @@
# v2.6.2
BUG FIXES
- fix rare corner case for `system.disks` query behavior, fix [1007](https://github.com/Altinity/clickhouse-backup/issues/1007)
- fix --partitions and --restore-database-mapping, --restore-table-mapping to work together, fix [1018](https://github.com/Altinity/clickhouse-backup/issues/1018)

# v2.6.1
BUG FIXES
11 changes: 11 additions & 0 deletions pkg/backup/restore.go
@@ -264,6 +264,10 @@ func (b *Backuper) getTablesForRestoreLocal(ctx context.Context, backupName stri
        if err != nil {
            return nil, nil, err
        }
        partitionsNames, err = changePartitionsToAdjustDatabaseMapping(partitionsNames, b.cfg.General.RestoreDatabaseMapping)
        if err != nil {
            return nil, nil, err
        }
    }

    // if restore-table-mapping is specified, create tables using the mapping rules instead of the names in the backup files
@@ -273,6 +277,10 @@ func (b *Backuper) getTablesForRestoreLocal(ctx context.Context, backupName stri
        if err != nil {
            return nil, nil, err
        }
        partitionsNames, err = changePartitionsToAdjustTableMapping(partitionsNames, b.cfg.General.RestoreTableMapping)
        if err != nil {
            return nil, nil, err
        }
    }

    if len(tablesForRestore) == 0 {
@@ -839,6 +847,9 @@ func (b *Backuper) restoreBackupRelatedDir(backupName, backupPrefixDir, destinat
// execute ALTER TABLE db.table DROP PARTITION for the corner case when we try to restore a backup with the same structure, https://github.com/Altinity/clickhouse-backup/issues/756
func (b *Backuper) dropExistPartitions(ctx context.Context, tablesForRestore ListOfTables, partitionsIdMap map[metadata.TableTitle][]string, partitions []string, version int) error {
    for _, table := range tablesForRestore {
        if !strings.Contains(table.Query, "MergeTree") {
            continue
        }
        partitionsIds, isExists := partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Table}]
        if !isExists {
            return fmt.Errorf("`%s`.`%s` doesn't contain %#v partitions", table.Database, table.Table, partitions)
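Only MergeTree-family engines support ALTER TABLE ... DROP PARTITION, which is why the new guard skips Distributed tables and views before dropping existing partitions. A minimal sketch of the guard's effect; tableMeta is a hypothetical simplified stand-in for the real table metadata, and only its Query field matters for the check:

package main

import (
    "fmt"
    "strings"
)

// tableMeta is a simplified stand-in for the backup's table metadata.
type tableMeta struct {
    Database, Table, Query string
}

func main() {
    tables := []tableMeta{
        {"database1", "t1", "CREATE TABLE database1.t1 (...) ENGINE=ReplicatedMergeTree(...)"},
        {"database1", "d1", "CREATE TABLE database1.d1 (...) ENGINE=Distributed(...)"},
    }
    for _, t := range tables {
        // mirrors the added guard: non-MergeTree engines cannot DROP PARTITION
        if !strings.Contains(t.Query, "MergeTree") {
            fmt.Printf("skip %s.%s\n", t.Database, t.Table)
            continue
        }
        fmt.Printf("would drop existing partitions on %s.%s\n", t.Database, t.Table)
    }
}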
22 changes: 22 additions & 0 deletions pkg/backup/table_pattern.go
@@ -438,6 +438,28 @@ func changeTableQueryToAdjustTableMapping(originTables *ListOfTables, tableMapRu
    return nil
}

func changePartitionsToAdjustDatabaseMapping(partitionsNames map[metadata.TableTitle][]string, databaseMapping map[string]string) (map[metadata.TableTitle][]string, error) {
    adjustedPartitionsNames := map[metadata.TableTitle][]string{}
    for tableTitle, partitions := range partitionsNames {
        if targetDb, isMapped := databaseMapping[tableTitle.Database]; isMapped {
            tableTitle.Database = targetDb
        }
        adjustedPartitionsNames[tableTitle] = partitions
    }
    return adjustedPartitionsNames, nil
}

func changePartitionsToAdjustTableMapping(partitionsNames map[metadata.TableTitle][]string, tableMapping map[string]string) (map[metadata.TableTitle][]string, error) {
    adjustedPartitionsNames := map[metadata.TableTitle][]string{}
    for tableTitle, partitions := range partitionsNames {
        if targetTable, isMapped := tableMapping[tableTitle.Table]; isMapped {
            tableTitle.Table = targetTable
        }
        adjustedPartitionsNames[tableTitle] = partitions
    }
    return adjustedPartitionsNames, nil
}

func filterPartsAndFilesByPartitionsFilter(tableMetadata metadata.TableMetadata, partitionsFilter common.EmptyMap) {
    if len(partitionsFilter) > 0 {
        for disk, parts := range tableMetadata.Parts {
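The essence of the fix: the --partitions filter map is keyed by metadata.TableTitle using the names stored in the backup, while after --restore-database-mapping or --restore-table-mapping the restore list carries the mapped names, so lookups silently missed. A runnable sketch of the pre-fix symptom and the re-keying the two helpers above perform; TableTitle here is a simplified local stand-in for metadata.TableTitle:

package main

import "fmt"

// TableTitle is a simplified stand-in for metadata.TableTitle.
type TableTitle struct {
    Database string
    Table    string
}

func main() {
    // --partitions filters are keyed by the table names stored in the backup
    partitions := map[TableTitle][]string{
        {Database: "database1", Table: "t1"}: {"3"},
    }
    // with --restore-database-mapping database1:database-2 the restore list
    // uses the mapped name, so the lookup misses without re-keying
    mapped := TableTitle{Database: "database-2", Table: "t1"}
    if _, ok := partitions[mapped]; !ok {
        fmt.Println("partition filter lost for", mapped) // the pre-fix symptom
    }
    // re-key the map the way changePartitionsToAdjustDatabaseMapping does
    databaseMapping := map[string]string{"database1": "database-2"}
    adjusted := map[TableTitle][]string{}
    for title, parts := range partitions {
        if target, isMapped := databaseMapping[title.Database]; isMapped {
            title.Database = target
        }
        adjusted[title] = parts
    }
    fmt.Println("after re-keying:", adjusted[mapped]) // [3]
}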
20 changes: 18 additions & 2 deletions test/integration/integration_test.go
@@ -673,7 +673,6 @@ func TestIntegrationS3Glacier(t *testing.T) {
    env.Cleanup(t, r)
}


func TestIntegrationCustomKopia(t *testing.T) {
    env, r := NewTestEnvironment(t)
    env.InstallDebIfNotExists(r, "clickhouse-backup", "ca-certificates", "curl")
@@ -2260,7 +2259,7 @@ func TestRestoreMapping(t *testing.T) {
    fullCleanup(t, r, env, []string{testBackupName}, []string{"local"}, databaseList, false, false, "config-database-mapping.yml")

    env.queryWithNoError(r, "CREATE DATABASE database1")
    env.queryWithNoError(r, "CREATE TABLE database1.t1 (dt DateTime, v UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/database1/t1','{replica}') PARTITION BY toYYYYMM(dt) ORDER BY dt")
    env.queryWithNoError(r, "CREATE TABLE database1.t1 (dt DateTime, v UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/database1/t1','{replica}') PARTITION BY v % 10 ORDER BY dt")
    env.queryWithNoError(r, "CREATE TABLE database1.d1 AS database1.t1 ENGINE=Distributed('{cluster}', 'database1', 't1')")
    if compareVersion(os.Getenv("CLICKHOUSE_VERSION"), "22.3") < 0 {
        env.queryWithNoError(r, "CREATE TABLE database1.t2 AS database1.t1 ENGINE=ReplicatedMergeTree('/clickhouse/tables/database1/t2','{replica}') PARTITION BY toYYYYMM(dt) ORDER BY dt")
@@ -2280,6 +2279,7 @@ func TestRestoreMapping(t *testing.T) {
    log.Debug().Msg("Check result database1")
    env.queryWithNoError(r, "INSERT INTO database1.t1 SELECT '2023-01-01 00:00:00', number FROM numbers(10)")
    checkRecordset(1, 20, "SELECT count() FROM database1.t1")
    checkRecordset(1, 20, "SELECT count() FROM database1.t2")
    checkRecordset(1, 20, "SELECT count() FROM database1.d1")
    checkRecordset(1, 20, "SELECT count() FROM database1.mv1")
    checkRecordset(1, 20, "SELECT count() FROM database1.v1")
@@ -2292,13 +2292,29 @@ func TestRestoreMapping(t *testing.T) {

    log.Debug().Msg("Check result database-2")
    checkRecordset(1, 10, "SELECT count() FROM `database-2`.t3")
    checkRecordset(1, 10, "SELECT count() FROM `database-2`.t4")
    checkRecordset(1, 10, "SELECT count() FROM `database-2`.d2")
    checkRecordset(1, 10, "SELECT count() FROM `database-2`.mv2")
    checkRecordset(1, 10, "SELECT count() FROM `database-2`.v2")

    log.Debug().Msg("Check database1 not exists")
    checkRecordset(1, 0, "SELECT count() FROM system.databases WHERE name='database1' SETTINGS empty_result_for_aggregation_by_empty_set=0")

    log.Debug().Msg("Drop database2")
    r.NoError(env.dropDatabase("database2"))

    log.Debug().Msg("Restore data with partitions")
    env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/config-database-mapping.yml", "restore", "--restore-database-mapping", "database1:database-2", "--restore-table-mapping", "t1:t3,t2:t4,d1:d2,mv1:mv2,v1:v2", "--partitions", "3", "--partitions", "database1.t2:202201", "--tables", "database1.*", testBackupName)

    log.Debug().Msg("Check result database-2 after restore with partitions")
    // t1->t3 restored only 1 partition with name 3 partition with 1 rows
    // t1->t3 restored only 1 partition with name 3 partition with 10 rows
    checkRecordset(1, 1, "SELECT count() FROM `database-2`.t3")
    checkRecordset(1, 10, "SELECT count() FROM `database-2`.t4")
    checkRecordset(1, 1, "SELECT count() FROM `database-2`.d2")
    checkRecordset(1, 10, "SELECT count() FROM `database-2`.mv2")
    checkRecordset(1, 1, "SELECT count() FROM `database-2`.v2")

    fullCleanup(t, r, env, []string{testBackupName}, []string{"local"}, databaseList, true, true, "config-database-mapping.yml")
    env.Cleanup(t, r)
}
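Why the assertions after the partition-filtered restore expect 1 row for t3/d2/v2 but 10 for t4: t1 was filled from numbers(10) and partitioned by v % 10, so each partition holds exactly one row and "--partitions 3" restores just one of them, while t2's filter "database1.t2:202201" matches its single monthly partition, assuming the collapsed setup inserts t2's rows with 2022-01 timestamps, consistent with that filter. A small sketch of the arithmetic, inferred from the schema above rather than taken from the test itself:

package main

import "fmt"

func main() {
    // t1 is PARTITION BY v % 10 and filled from numbers(10):
    // ten partitions, one row each
    rowsPerPartition := map[string]int{}
    for v := 0; v < 10; v++ {
        rowsPerPartition[fmt.Sprint(v%10)]++
    }
    // --partitions 3 keeps a single partition of t1 -> t3 ends up with 1 row
    fmt.Println("t3 rows:", rowsPerPartition["3"]) // 1
    // the filter database1.t2:202201 matches t2's only partition,
    // so t4 keeps all rows
    total := 0
    for _, n := range rowsPerPartition {
        total += n
    }
    fmt.Println("t4 rows:", total) // 10
}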
33 changes: 22 additions & 11 deletions test/integration/run.sh
@@ -2,7 +2,8 @@
set -x
set -e

export CUR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
CUR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
export CUR_DIR
mkdir -p "${CUR_DIR}/_coverage_/"
rm -rf "${CUR_DIR}/_coverage_/*"

@@ -14,7 +15,8 @@ if [[ "${CLICKHOUSE_VERSION}" =~ ^2[1-9]+ || "${CLICKHOUSE_VERSION}" == "head" ]
else
  export CLICKHOUSE_IMAGE=${CLICKHOUSE_IMAGE:-yandex/clickhouse-server}
fi
export CLICKHOUSE_BACKUP_BIN="$(pwd)/clickhouse-backup/clickhouse-backup-race"
CLICKHOUSE_BACKUP_BIN="$(pwd)/clickhouse-backup/clickhouse-backup-race"
export CLICKHOUSE_BACKUP_BIN
export LOG_LEVEL=${LOG_LEVEL:-info}
export TEST_LOG_LEVEL=${TEST_LOG_LEVEL:-info}

@@ -42,18 +44,27 @@ else
  export COMPOSE_FILE=docker-compose.yml
fi

for id in $(docker ps -q); do
  docker stop "${id}" --time 1
  docker rm -f "${id}"
done

pids=()
for project in $(docker compose -f ${CUR_DIR}/${COMPOSE_FILE} ls --all -q); do
  docker compose -f ${CUR_DIR}/${COMPOSE_FILE} --project-name ${project} --progress plain down --remove-orphans --volumes --timeout=1 &
project_ids=()
for project in $(docker compose -f "${CUR_DIR}/${COMPOSE_FILE}" ls --all -q); do
  docker compose -f "${CUR_DIR}/${COMPOSE_FILE}" --project-name "${project}" --progress plain down --remove-orphans --volumes --timeout=1 &
  pids+=($!)
  project_ids+=("${project}")
done

for pid in "${pids[@]}"; do
for index in "${!pids[@]}"; do
  pid=${pids[index]}
  project_id=${project_ids[index]}
  if wait "$pid"; then
    echo "$pid docker compose down successful"
  else
    echo "$pid docker compose down failed. Exiting."
    docker network inspect "${project_id}_default"
    exit 1 # Exit with an error code if any command fails
  fi
done
@@ -63,12 +74,12 @@ make clean build-race-docker build-race-fips-docker

export RUN_PARALLEL=${RUN_PARALLEL:-1}

docker compose -f ${CUR_DIR}/${COMPOSE_FILE} --progress=quiet pull
docker compose -f "${CUR_DIR}/${COMPOSE_FILE}" --progress=quiet pull

pids=()
project_ids=()
for ((i = 0; i < RUN_PARALLEL; i++)); do
  docker compose -f ${CUR_DIR}/${COMPOSE_FILE} --project-name project${i} --progress plain up -d &
  docker compose -f "${CUR_DIR}/${COMPOSE_FILE}" --project-name project${i} --progress plain up -d &
  pids+=($!)
  project_ids+=("project${i}")
done
@@ -79,14 +90,14 @@ for index in "${!pids[@]}"; do
  if wait "$pid"; then
    echo "$pid docker compose up successful"
  else
    docker compose -f ${CUR_DIR}/${COMPOSE_FILE} --project-name project${i} --progress plain logs
    docker compose -f "${CUR_DIR}/${COMPOSE_FILE}" --project-name "${project_id}" --progress plain logs
    echo "$pid the docker compose up failed."
    exit 1 # Exit with an error code if any command fails
  fi
done

set +e
go test -parallel ${RUN_PARALLEL} -race -timeout ${TEST_TIMEOUT:-60m} -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v ${CUR_DIR}/integration_test.go
go test -parallel "${RUN_PARALLEL}" -race -timeout "${TEST_TIMEOUT:-60m}" -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v "${CUR_DIR}/integration_test.go"
TEST_FAILED=$?
set -e

Expand All @@ -96,8 +107,8 @@ fi

if [[ "1" == "${CLEAN_AFTER:-0}" || "0" == "${TEST_FAILED}" ]]; then
  pids=()
  for project in $(docker compose -f ${CUR_DIR}/${COMPOSE_FILE} ls --all -q); do
    docker compose -f ${CUR_DIR}/${COMPOSE_FILE} --project-name ${project} --progress plain down --remove-orphans --volumes --timeout=1 &
  for project in $(docker compose -f "${CUR_DIR}/${COMPOSE_FILE}" ls --all -q); do
    docker compose -f "${CUR_DIR}/${COMPOSE_FILE}" --project-name "${project}" --progress plain down --remove-orphans --volumes --timeout=1 &
    pids+=($!)
  done

