Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

audb.load_to() use cache for tables #221

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion audb/core/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from audb.core.load import (
filtered_dependencies,
load_header,
load_table,
)
from audb.core.lock import FolderLock

Expand Down
113 changes: 49 additions & 64 deletions audb/core/load_to.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import glob
import os
import shutil
import typing

import audbackend
Expand All @@ -12,13 +13,22 @@
dependencies,
latest_version,
)
from audb.core.cache import (
database_cache_root,
database_tmp_root,
)
from audb.core.dependencies import Dependencies
from audb.core.flavor import Flavor
from audb.core.load import (
database_tmp_root,
load_header,
load_table,
)


CSV = audformat.define.TableStorageFormat.CSV
PKL = audformat.define.TableStorageFormat.PICKLE


def _find_media(
db: audformat.Database,
db_root: str,
Expand Down Expand Up @@ -58,17 +68,17 @@ def _find_tables(
tables = []

def job(table: str):
file = f'db.{table}.csv'
file = f'{define.DB}.{table}.{CSV}'
full_file = os.path.join(db_root, file)
if not os.path.exists(full_file):
tables.append(file)
tables.append(table)
else:
checksum = audbackend.md5(full_file)
# if the table already exists
# we have to compare checksum
# in case it was altered by flavor
if checksum != deps.checksum(file): # pragma: no cover
tables.append(file)
tables.append(table)

audeer.run_tasks(
job,
Expand Down Expand Up @@ -129,35 +139,31 @@ def job(archive: str, version: str):
def _get_tables(
tables: typing.List[str],
db_root: str,
db_root_tmp: str,
db_name: str,
db: str,
deps: Dependencies,
backend: audbackend.Backend,
cache_root: str,
num_workers: typing.Optional[int],
verbose: bool,
):

def job(table: str):
# If a pickled version of the table exists,
# we have to remove it to make sure that
# later on the new CSV tables are loaded.
# This can happen if we upgrading an existing
# database to a different version.
path_pkl = os.path.join(
db_root, table
)[:-3] + audformat.define.TableStorageFormat.PICKLE
if os.path.exists(path_pkl):
os.remove(path_pkl)
archive = backend.join(
db_name,
define.DEPEND_TYPE_NAMES[define.DependType.META],
deps.archive(table),
)
backend.get_archive(archive, db_root_tmp, deps.version(table))
audeer.move_file(
os.path.join(db_root_tmp, table),
os.path.join(db_root, table),
name = db.name
version = deps.version(f'{define.DB}.{table}.{CSV}')
load_table(
name,
table,
version=version,
num_workers=num_workers,
cache_root=cache_root,
verbose=False,
)
db_cache_root = database_cache_root(name, version, cache_root)
for storage_format in [CSV, PKL]:
file = f'{define.DB}.{table}.{storage_format}'
shutil.copy(
os.path.join(db_cache_root, file),
os.path.join(db_root, file),
)

audeer.run_tasks(
job,
Expand All @@ -181,39 +187,6 @@ def _remove_empty_dirs(root):
os.rmdir(root)


def _save_database(
db: audformat.Database,
db_root: str,
db_root_tmp: str,
num_workers: typing.Optional[int],
verbose: bool,
):

for storage_format in [
audformat.define.TableStorageFormat.CSV,
audformat.define.TableStorageFormat.PICKLE,
]:
db.save(
db_root_tmp,
storage_format=storage_format,
update_other_formats=False,
num_workers=num_workers,
verbose=verbose,
)
audeer.move_file(
os.path.join(db_root_tmp, define.HEADER_FILE),
os.path.join(db_root, define.HEADER_FILE),
)
for path in glob.glob(
os.path.join(db_root_tmp, f'*.{storage_format}')
):
file = os.path.relpath(path, db_root_tmp)
audeer.move_file(
os.path.join(db_root_tmp, file),
os.path.join(db_root, file),
)


def load_to(
root: str,
name: str,
Expand Down Expand Up @@ -287,10 +260,22 @@ def load_to(

# get altered and new tables

db_header.save(db_root_tmp, header_only=True)
tables = _find_tables(db_header, db_root, deps, num_workers, verbose)
_get_tables(tables, db_root, db_root_tmp, name, deps, backend,
num_workers, verbose)
tables = _find_tables(
db_header,
db_root,
deps,
num_workers,
verbose,
)
_get_tables(
tables,
db_root,
db_header,
deps,
cache_root,
num_workers,
verbose,
)

# load database

Expand Down Expand Up @@ -332,7 +317,7 @@ def load_to(
# save database and remove the temporal directory
# to signal all files were correctly loaded

_save_database(db, db_root, db_root_tmp, num_workers, verbose)
db_header.save(db_root, header_only=True)
try:
_remove_empty_dirs(db_root_tmp)
except OSError: # pragma: no cover
Expand Down