From b034551d5196641eae482261194533712377f208 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 26 Jul 2022 13:35:10 +0200 Subject: [PATCH 01/10] audb.load_to() use cache for tables --- audb/core/load_to.py | 87 ++++++++++++++++---------------------------- 1 file changed, 32 insertions(+), 55 deletions(-) diff --git a/audb/core/load_to.py b/audb/core/load_to.py index a84ce072..520771d9 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -12,10 +12,12 @@ dependencies, latest_version, ) +from audb.core.cache import database_tmp_root from audb.core.dependencies import Dependencies +from audb.core.flavor import Flavor from audb.core.load import ( - database_tmp_root, load_header, + _load_tables, ) @@ -58,17 +60,17 @@ def _find_tables( tables = [] def job(table: str): - file = f'db.{table}.csv' + file = f'{define.DB}.{table}.csv' full_file = os.path.join(db_root, file) if not os.path.exists(full_file): - tables.append(file) + tables.append(table) else: checksum = audbackend.md5(full_file) # if the table already exists # we have to compare checksum # in case it was altered by flavor if checksum != deps.checksum(file): # pragma: no cover - tables.append(file) + tables.append(table) audeer.run_tasks( job, @@ -126,48 +128,6 @@ def job(archive: str, version: str): ) -def _get_tables( - tables: typing.List[str], - db_root: str, - db_root_tmp: str, - db_name: str, - deps: Dependencies, - backend: audbackend.Backend, - num_workers: typing.Optional[int], - verbose: bool, -): - - def job(table: str): - # If a pickled version of the table exists, - # we have to remove it to make sure that - # later on the new CSV tables are loaded. - # This can happen if we upgrading an existing - # database to a different version. - path_pkl = os.path.join( - db_root, table - )[:-3] + audformat.define.TableStorageFormat.PICKLE - if os.path.exists(path_pkl): - os.remove(path_pkl) - archive = backend.join( - db_name, - define.DEPEND_TYPE_NAMES[define.DependType.META], - deps.archive(table), - ) - backend.get_archive(archive, db_root_tmp, deps.version(table)) - audeer.move_file( - os.path.join(db_root_tmp, table), - os.path.join(db_root, table), - ) - - audeer.run_tasks( - job, - params=[([table], {}) for table in tables], - num_workers=num_workers, - progress_bar=verbose, - task_description='Get tables', - ) - - def _remove_empty_dirs(root): r"""Remove directories, fails if it contains non-empty sub-folders.""" @@ -255,7 +215,6 @@ def load_to( version = latest_version(name) db_root = audeer.path(root) - db_root_tmp = database_tmp_root(db_root) # remove files with a wrong checksum # to ensure we load correct version @@ -278,6 +237,7 @@ def load_to( # load database header without tables from backend + db_root_tmp = database_tmp_root(db_root) db_header, backend = load_header( db_root_tmp, name, @@ -288,17 +248,34 @@ def load_to( # get altered and new tables db_header.save(db_root_tmp, header_only=True) - tables = _find_tables(db_header, db_root, deps, num_workers, verbose) - _get_tables(tables, db_root, db_root_tmp, name, deps, backend, - num_workers, verbose) + tables = _find_tables( + db_header, + db_root, + deps, + num_workers, + verbose, + ) + _load_tables( + tables, + backend, + db_root, + db_header, + version, + None, + deps, + Flavor(), + cache_root, + num_workers, + verbose, + ) + + # recreate tmp folder as it is deleted by _get_tables + db_root_tmp = database_tmp_root(db_root) # load database - # move header to root and load database ... - audeer.move_file( - os.path.join(db_root_tmp, define.HEADER_FILE), - os.path.join(db_root, define.HEADER_FILE), - ) + # save header to root and load database ... + db_header.save(db_root, header_only=True) try: db = audformat.Database.load( db_root, From 4eb673270796ce306f5eaf8664b82650a7ca36b4 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 26 Jul 2022 13:51:21 +0200 Subject: [PATCH 02/10] Store tables in cache --- audb/core/info.py | 1 - audb/core/load_to.py | 63 +++++++++++++++++++++++++++++++++++++------- 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/audb/core/info.py b/audb/core/info.py index 0702a0af..9f792a5c 100644 --- a/audb/core/info.py +++ b/audb/core/info.py @@ -13,7 +13,6 @@ from audb.core.load import ( filtered_dependencies, load_header, - load_table, ) from audb.core.lock import FolderLock diff --git a/audb/core/load_to.py b/audb/core/load_to.py index 520771d9..b2ea7fda 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -12,7 +12,10 @@ dependencies, latest_version, ) -from audb.core.cache import database_tmp_root +from audb.core.cache import ( + database_cache_root, + database_tmp_root, +) from audb.core.dependencies import Dependencies from audb.core.flavor import Flavor from audb.core.load import ( @@ -128,6 +131,53 @@ def job(archive: str, version: str): ) +def _get_tables( + tables: typing.List[str], + db_root: str, + db: str, + deps: Dependencies, + version: str, + backend: audbackend.Backend, + cache_root: str, + num_workers: typing.Optional[int], + verbose: bool, +): + db_cache_root = database_cache_root(db.name, version, cache_root) + _load_tables( + tables, + backend, + db_cache_root, + db, + version, + None, + deps, + Flavor(), + cache_root, + num_workers, + verbose, + ) + + def job(table_id: str): + # Move from cache to db_root + for storage_format in [ + audformat.define.TableStorageFormat.PICKLE, + audformat.define.TableStorageFormat.CSV, + ]: + file = f'db.{table_id}.{storage_format}' + audeer.move_file( + os.path.join(db_cache_root, file), + os.path.join(db_root, file), + ) + + audeer.run_tasks( + job, + params=[([table], {}) for table in tables], + num_workers=num_workers, + progress_bar=verbose, + task_description='Copy tables from cache', + ) + + def _remove_empty_dirs(root): r"""Remove directories, fails if it contains non-empty sub-folders.""" @@ -255,23 +305,18 @@ def load_to( num_workers, verbose, ) - _load_tables( + _get_tables( tables, - backend, db_root, db_header, - version, - None, deps, - Flavor(), + version, + backend, cache_root, num_workers, verbose, ) - # recreate tmp folder as it is deleted by _get_tables - db_root_tmp = database_tmp_root(db_root) - # load database # save header to root and load database ... From ca04b6b97996ae171cc9f862a77a8c0b7105439b Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 26 Jul 2022 14:16:46 +0200 Subject: [PATCH 03/10] Use load_table() --- audb/core/load_to.py | 52 +++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/audb/core/load_to.py b/audb/core/load_to.py index b2ea7fda..d67931b5 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -20,7 +20,7 @@ from audb.core.flavor import Flavor from audb.core.load import ( load_header, - _load_tables, + load_table, ) @@ -137,33 +137,36 @@ def _get_tables( db: str, deps: Dependencies, version: str, - backend: audbackend.Backend, cache_root: str, num_workers: typing.Optional[int], verbose: bool, ): - db_cache_root = database_cache_root(db.name, version, cache_root) - _load_tables( - tables, - backend, - db_cache_root, - db, - version, - None, - deps, - Flavor(), - cache_root, - num_workers, - verbose, - ) - def job(table_id: str): - # Move from cache to db_root - for storage_format in [ - audformat.define.TableStorageFormat.PICKLE, - audformat.define.TableStorageFormat.CSV, - ]: - file = f'db.{table_id}.{storage_format}' + pickle = audformat.define.TableStorageFormat.PICKLE + csv = audformat.define.TableStorageFormat.CSV + + def job(table: str): + # If a pickled version of the table exists, + # we have to remove it to make sure that + # later on the new CSV tables are loaded. + # This can happen if we upgrading an existing + # database to a different version. + path_pkl = os.path.join(db_root, f'db.{table}.{pickle}') + if os.path.exists(path_pkl): + os.remove(path_pkl) + name = db.name + version = deps.version(f'db.{table}.{csv}') + load_table( + name, + table, + version=version, + num_workers=num_workers, + cache_root=cache_root, + verbose=False, + ) + db_cache_root = database_cache_root(name, version, cache_root) + for storage_format in [csv, pickle]: + file = f'db.{table}.{storage_format}' audeer.move_file( os.path.join(db_cache_root, file), os.path.join(db_root, file), @@ -174,7 +177,7 @@ def job(table_id: str): params=[([table], {}) for table in tables], num_workers=num_workers, progress_bar=verbose, - task_description='Copy tables from cache', + task_description='Load tables', ) @@ -311,7 +314,6 @@ def load_to( db_header, deps, version, - backend, cache_root, num_workers, verbose, From 5ac233eef3856ba91ec4a210402d86058311f8a0 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 26 Jul 2022 14:38:23 +0200 Subject: [PATCH 04/10] Fix: copy tables from cache instead of move --- audb/core/load_to.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/audb/core/load_to.py b/audb/core/load_to.py index d67931b5..f90e5a59 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -1,5 +1,6 @@ import glob import os +import shutil import typing import audbackend @@ -151,9 +152,9 @@ def job(table: str): # later on the new CSV tables are loaded. # This can happen if we upgrading an existing # database to a different version. - path_pkl = os.path.join(db_root, f'db.{table}.{pickle}') - if os.path.exists(path_pkl): - os.remove(path_pkl) + # path_pkl = os.path.join(db_root, f'db.{table}.{pickle}') + # if os.path.exists(path_pkl): + # os.remove(path_pkl) name = db.name version = deps.version(f'db.{table}.{csv}') load_table( @@ -167,7 +168,7 @@ def job(table: str): db_cache_root = database_cache_root(name, version, cache_root) for storage_format in [csv, pickle]: file = f'db.{table}.{storage_format}' - audeer.move_file( + shutil.copy( os.path.join(db_cache_root, file), os.path.join(db_root, file), ) From 6ba2f247f9f4904568fce3dd4c93eebd77e3b251 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 26 Jul 2022 15:31:53 +0200 Subject: [PATCH 05/10] Speedup database saving --- audb/core/load_to.py | 56 ++++++++++---------------------------------- 1 file changed, 13 insertions(+), 43 deletions(-) diff --git a/audb/core/load_to.py b/audb/core/load_to.py index f90e5a59..103b84be 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -25,6 +25,10 @@ ) +CSV = audformat.define.TableStorageFormat.CSV +PKL = audformat.define.TableStorageFormat.PICKLE + + def _find_media( db: audformat.Database, db_root: str, @@ -64,7 +68,7 @@ def _find_tables( tables = [] def job(table: str): - file = f'{define.DB}.{table}.csv' + file = f'{define.DB}.{table}.{CSV}' full_file = os.path.join(db_root, file) if not os.path.exists(full_file): tables.append(table) @@ -143,9 +147,6 @@ def _get_tables( verbose: bool, ): - pickle = audformat.define.TableStorageFormat.PICKLE - csv = audformat.define.TableStorageFormat.CSV - def job(table: str): # If a pickled version of the table exists, # we have to remove it to make sure that @@ -156,7 +157,7 @@ def job(table: str): # if os.path.exists(path_pkl): # os.remove(path_pkl) name = db.name - version = deps.version(f'db.{table}.{csv}') + version = deps.version(f'db.{table}.{CSV}') load_table( name, table, @@ -166,7 +167,7 @@ def job(table: str): verbose=False, ) db_cache_root = database_cache_root(name, version, cache_root) - for storage_format in [csv, pickle]: + for storage_format in [CSV, PKL]: file = f'db.{table}.{storage_format}' shutil.copy( os.path.join(db_cache_root, file), @@ -195,39 +196,6 @@ def _remove_empty_dirs(root): os.rmdir(root) -def _save_database( - db: audformat.Database, - db_root: str, - db_root_tmp: str, - num_workers: typing.Optional[int], - verbose: bool, -): - - for storage_format in [ - audformat.define.TableStorageFormat.CSV, - audformat.define.TableStorageFormat.PICKLE, - ]: - db.save( - db_root_tmp, - storage_format=storage_format, - update_other_formats=False, - num_workers=num_workers, - verbose=verbose, - ) - audeer.move_file( - os.path.join(db_root_tmp, define.HEADER_FILE), - os.path.join(db_root, define.HEADER_FILE), - ) - for path in glob.glob( - os.path.join(db_root_tmp, f'*.{storage_format}') - ): - file = os.path.relpath(path, db_root_tmp) - audeer.move_file( - os.path.join(db_root_tmp, file), - os.path.join(db_root, file), - ) - - def load_to( root: str, name: str, @@ -301,7 +269,6 @@ def load_to( # get altered and new tables - db_header.save(db_root_tmp, header_only=True) tables = _find_tables( db_header, db_root, @@ -322,8 +289,11 @@ def load_to( # load database - # save header to root and load database ... - db_header.save(db_root, header_only=True) + # move header to root and load database ... + audeer.move_file( + os.path.join(db_root_tmp, define.HEADER_FILE), + os.path.join(db_root, define.HEADER_FILE), + ) try: db = audformat.Database.load( db_root, @@ -357,7 +327,7 @@ def load_to( # save database and remove the temporal directory # to signal all files were correctly loaded - _save_database(db, db_root, db_root_tmp, num_workers, verbose) + db_header.save(db_root, header_only=True) try: _remove_empty_dirs(db_root_tmp) except OSError: # pragma: no cover From 0caab58d33ab0d5b040d4c786cbda19b79f413a1 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 26 Jul 2022 16:10:45 +0200 Subject: [PATCH 06/10] Use define.DB instead of db --- audb/core/load_to.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/audb/core/load_to.py b/audb/core/load_to.py index 103b84be..3f39a2f5 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -157,7 +157,7 @@ def job(table: str): # if os.path.exists(path_pkl): # os.remove(path_pkl) name = db.name - version = deps.version(f'db.{table}.{CSV}') + version = deps.version(f'{define.DB}.{table}.{CSV}') load_table( name, table, @@ -168,7 +168,7 @@ def job(table: str): ) db_cache_root = database_cache_root(name, version, cache_root) for storage_format in [CSV, PKL]: - file = f'db.{table}.{storage_format}' + file = f'{define.DB}.{table}.{storage_format}' shutil.copy( os.path.join(db_cache_root, file), os.path.join(db_root, file), From 8fc36cea6b01a76c04856a76d61651e47f316823 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 26 Jul 2022 16:12:00 +0200 Subject: [PATCH 07/10] Remove unused version argument --- audb/core/load_to.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/audb/core/load_to.py b/audb/core/load_to.py index 3f39a2f5..ffcf3781 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -141,7 +141,6 @@ def _get_tables( db_root: str, db: str, deps: Dependencies, - version: str, cache_root: str, num_workers: typing.Optional[int], verbose: bool, @@ -281,7 +280,6 @@ def load_to( db_root, db_header, deps, - version, cache_root, num_workers, verbose, From af965d4ca7455b1217a855eb7ef24eac5574727a Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 26 Jul 2022 16:13:00 +0200 Subject: [PATCH 08/10] Fix typo --- audb/core/load_to.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/audb/core/load_to.py b/audb/core/load_to.py index ffcf3781..ade2d79f 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -178,7 +178,7 @@ def job(table: str): params=[([table], {}) for table in tables], num_workers=num_workers, progress_bar=verbose, - task_description='Load tables', + task_description='Get tables', ) From ab2e51a3bb772223256c20d459614f5f3d5847f5 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 26 Jul 2022 16:13:36 +0200 Subject: [PATCH 09/10] Remove unneeded comment --- audb/core/load_to.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/audb/core/load_to.py b/audb/core/load_to.py index ade2d79f..adb2301d 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -147,14 +147,6 @@ def _get_tables( ): def job(table: str): - # If a pickled version of the table exists, - # we have to remove it to make sure that - # later on the new CSV tables are loaded. - # This can happen if we upgrading an existing - # database to a different version. - # path_pkl = os.path.join(db_root, f'db.{table}.{pickle}') - # if os.path.exists(path_pkl): - # os.remove(path_pkl) name = db.name version = deps.version(f'{define.DB}.{table}.{CSV}') load_table( From 3d5de1e88133cac23060acd67fac480e1df7e693 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Tue, 26 Jul 2022 16:14:37 +0200 Subject: [PATCH 10/10] Move variable to old position --- audb/core/load_to.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/audb/core/load_to.py b/audb/core/load_to.py index adb2301d..71a84676 100644 --- a/audb/core/load_to.py +++ b/audb/core/load_to.py @@ -228,6 +228,7 @@ def load_to( version = latest_version(name) db_root = audeer.path(root) + db_root_tmp = database_tmp_root(db_root) # remove files with a wrong checksum # to ensure we load correct version @@ -250,7 +251,6 @@ def load_to( # load database header without tables from backend - db_root_tmp = database_tmp_root(db_root) db_header, backend = load_header( db_root_tmp, name,