Skip to content

Commit

Permalink
cleanup logging for 2.4.16
Browse files Browse the repository at this point in the history
  • Loading branch information
Slach committed Jan 8, 2024
1 parent 1ef1953 commit 5b5855b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/backup/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (b *Backuper) AddTableToBackup(ctx context.Context, backupName, shadowBacku
return disksToPartsMap, realSize, err
}
realSize[disk.Name] += size
log.WithField("disk", disk.Name).WithField("duration", utils.HumanizeDuration(time.Since(start))).Info("object_disk data uploaded")
log.WithField("disk", disk.Name).WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("object_disk data uploaded")
}
// Clean all the files under the shadowPath, cause UNFREEZE unavailable
if version < 21004000 {
Expand Down
19 changes: 15 additions & 4 deletions pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/Altinity/clickhouse-backup/pkg/common"
Expand Down Expand Up @@ -811,8 +812,11 @@ func (b *Backuper) restoreDataRegularByParts(ctx context.Context, backupName str
}

func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName string, backupTable metadata.TableMetadata, diskMap, diskTypes map[string]string) error {
log := apexLog.WithFields(apexLog.Fields{"operation": "downloadObjectDiskParts"})
start := time.Now()
log := apexLog.WithFields(apexLog.Fields{
"operation": "downloadObjectDiskParts",
"table": fmt.Sprintf("%s.%s", backupTable.Database, backupTable.Table),
})
size := int64(0)
dbAndTableDir := path.Join(common.TablePathEncode(backupTable.Database), common.TablePathEncode(backupTable.Table))
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -843,8 +847,10 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, diskName); err != nil {
return err
}
start := time.Now()
downloadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx)
downloadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
sizeMutex := sync.Mutex{}
for _, part := range parts {
partPath := path.Join(diskMap[diskName], "backup", backupName, "shadow", dbAndTableDir, diskName, part.Name)
walkErr := filepath.Walk(partPath, func(fPath string, fInfo fs.FileInfo, err error) error {
Expand Down Expand Up @@ -879,8 +885,12 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
} else {
return fmt.Errorf("incompatible object_disk[%s].Type=%s amd remote_storage: %s", diskName, diskType, b.cfg.General.RemoteStorage)
}
if err = object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, srcBucket, srcKey, storageObject.ObjectRelativePath); err != nil {
if copiedSize, copyObjectErr := object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, srcBucket, srcKey, storageObject.ObjectRelativePath); copyObjectErr != nil {
return fmt.Errorf("object_disk.CopyObject error: %v", err)
} else {
sizeMutex.Lock()
size += copiedSize
sizeMutex.Unlock()
}
}
return nil
Expand All @@ -894,9 +904,10 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
if wgWaitErr := downloadObjectDiskPartsWorkingGroup.Wait(); wgWaitErr != nil {
return fmt.Errorf("one of downloadObjectDiskParts go-routine return error: %v", wgWaitErr)
}
log.WithField("disk", diskName).WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("object_disk data downloaded")
}
}
log.WithField("duration", utils.HumanizeDuration(time.Since(start))).Debugf("done")

return nil
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/object_disk/object_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ func DeleteFile(ctx context.Context, diskName, remotePath string) error {
return remoteStorage.DeleteFile(ctx, remotePath)
}

/*
func DeleteFileWithContent(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName, localPath string) error {
if err := InitCredentialsAndConnections(ctx, ch, cfg, diskName); err != nil {
return err
Expand Down Expand Up @@ -629,13 +630,13 @@ func GetFileSize(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Con
}
return fileInfo.Size(), nil
}
*/

func CopyObject(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName, srcBucket, srcKey, dstPath string) error {
func CopyObject(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName, srcBucket, srcKey, dstPath string) (int64, error) {
if err := InitCredentialsAndConnections(ctx, ch, cfg, diskName); err != nil {
return err
return 0, err
}
connection := DisksConnections[diskName]
remoteStorage := connection.GetRemoteStorage()
_, err := remoteStorage.CopyObject(ctx, srcBucket, srcKey, dstPath)
return err
return remoteStorage.CopyObject(ctx, srcBucket, srcKey, dstPath)
}

0 comments on commit 5b5855b

Please sign in to comment.