Add ability to cache package payloads synchronously #1679

Merged
Changes from all commits (20 commits)
15 changes: 14 additions & 1 deletion src/rez/cli/env.py
@@ -116,6 +116,11 @@ def setup_parser(parser, completions=False):
parser.add_argument(
"--no-pkg-cache", action="store_true",
help="Disable package caching")
parser.add_argument(
"--pkg-cache-mode", choices=["sync", "async"],
help="If provided, override the rezconfig's package_cache_async key. "
"If 'sync', the process will block until packages are cached. "
"If 'async', the process will not block while packages are cached.")
parser.add_argument(
"--pre-command", type=str, help=SUPPRESS)
PKG_action = parser.add_argument(
@@ -198,6 +203,13 @@ def command(opts, parser, extra_arg_groups=None):
rule = Rule.parse_rule(rule_str)
package_filter.add_inclusion(rule)

if opts.pkg_cache_mode == "async":
package_cache_mode = True
elif opts.pkg_cache_mode == "sync":
package_cache_mode = False
else:
package_cache_mode = None

# perform the resolve
context = ResolvedContext(
package_requests=request,
@@ -212,7 +224,8 @@ def command(opts, parser, extra_arg_groups=None):
caching=(not opts.no_cache),
suppress_passive=opts.no_passive,
print_stats=opts.stats,
package_caching=(not opts.no_pkg_cache)
package_caching=(not opts.no_pkg_cache),
package_cache_async=package_cache_mode,
)

success = (context.status == ResolverStatus.solved)
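Taken together, the new --pkg-cache-mode flag gives a tri-state override: "async" maps to True, "sync" maps to False, and omitting the flag leaves the value as None so ResolvedContext falls back to the rezconfig setting. A minimal sketch of that mapping (the helper name is hypothetical; the inline code above does the same thing without a function):

    def pkg_cache_mode_to_async(pkg_cache_mode):
        # Map the --pkg-cache-mode choice onto the package_cache_async keyword.
        if pkg_cache_mode == "async":
            return True       # cache in a background daemon process
        if pkg_cache_mode == "sync":
            return False      # block the resolve until payloads are cached
        return None           # flag omitted: defer to config.package_cache_async

    assert pkg_cache_mode_to_async("sync") is False
    assert pkg_cache_mode_to_async(None) is None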
1 change: 1 addition & 0 deletions src/rez/config.py
@@ -453,6 +453,7 @@ def _parse_env_var(self, value):
"package_cache_during_build": Bool,
"package_cache_local": Bool,
"package_cache_same_device": Bool,
"package_cache_async": Bool,
"color_enabled": ForceOrBool,
"resolve_caching": Bool,
"cache_package_files": Bool,
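With the key registered as a Bool, package_cache_async can be overridden like any other rez setting: in a rezconfig file, programmatically, or (by rez's usual convention for settings) through a REZ_PACKAGE_CACHE_ASYNC environment variable. A short sketch of the programmatic form, assuming the standard Config.override API:

    from rez.config import config

    # Force synchronous caching for this process; equivalent to setting
    # package_cache_async = False in a rezconfig file.
    config.override("package_cache_async", False)
    print(config.package_cache_async)  # -> False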
118 changes: 103 additions & 15 deletions src/rez/package_cache.py
@@ -22,6 +22,7 @@
from rez.config import config
from rez.exceptions import PackageCacheError
from rez.vendor.lockfile import LockFile, NotLocked
from rez.vendor.progress.spinner import PixelSpinner
from rez.utils.filesystem import safe_listdir, safe_makedirs, safe_remove, \
forceful_rmtree
from rez.utils.colorize import ColorizedStreamHandler
@@ -69,6 +70,18 @@ class PackageCache(object):
VARIANT_PENDING = 5 #: Variant is pending caching
VARIANT_REMOVED = 6 #: Variant was deleted

STATUS_DESCRIPTIONS = {
VARIANT_NOT_FOUND: "was not found",
VARIANT_FOUND: "was found",
VARIANT_CREATED: "was created",
VARIANT_COPYING: "payload is still being copied to this cache",
VARIANT_COPY_STALLED: "payload copy has stalled.\nSee "
"https://rez.readthedocs.io/en/stable/caching.html#cleaning-the-cache "
"for more information.",
VARIANT_PENDING: "is pending caching",
VARIANT_REMOVED: "was deleted",
}

_FILELOCK_TIMEOUT = 10
_COPYING_TIME_INC = 0.2
_COPYING_TIME_MAX = 5.0
@@ -116,7 +129,7 @@ def get_cached_root(self, variant):

return rootpath

def add_variant(self, variant, force=False):
def add_variant(self, variant, force=False, wait_for_copying=False, logger=None):

Review comment: I think wait_for_copying should be True by default.

Reply (Contributor, PR author): That would change the default behavior from async to sync, which I don't think we want to do, do we?

"""Copy a variant's payload into the cache.

The following steps are taken to ensure multi-thread/proc safety, and to
@@ -147,6 +160,9 @@ def add_variant(self, variant, force=False):
variant (Variant): The variant to copy into this cache
force (bool): Copy the variant regardless. Use at your own risk (there
is no guarantee the resulting variant payload will be functional).
wait_for_copying (bool): Whether the caching step should block when one of the
pending variants is marked as already copying.
logger (None | Logger): If a logger is provided, log information to it.

Returns:
tuple: 2-tuple:
@@ -214,15 +230,40 @@ def add_variant(self, variant, force=False):
% package.repository
)

no_op_statuses = (
no_op_statuses = {
self.VARIANT_FOUND,
self.VARIANT_COPYING,
self.VARIANT_COPY_STALLED
)
self.VARIANT_COPY_STALLED,
}
if not wait_for_copying:
# Copying variants are only no-ops if we want to ignore them.
no_op_statuses.add(self.VARIANT_COPYING)

# variant already exists, or is being copied to cache by another thread/proc
status, rootpath = self._get_cached_root(variant)
if status in no_op_statuses:
if logger:
logger.warning(f"Not caching {variant.qualified_name}. "
f"Variant {self.STATUS_DESCRIPTIONS[status]}")
return (rootpath, status)

if wait_for_copying and status == self.VARIANT_COPYING:
spinner = PixelSpinner(f"Waiting for {variant.qualified_name} to finish copying. ")
while status == self.VARIANT_COPYING:
spinner.next()
time.sleep(self._COPYING_TIME_INC)
status, rootpath = self._get_cached_root(variant)

# Status has changed, so report the change and return
if logger:
if status == self.VARIANT_FOUND:
# We have resolved into a satisfactory state
logger.info(
f"{variant.qualified_name} {self.STATUS_DESCRIPTIONS[status]}"
)
else:
logger.warning(
f"{variant.qualified_name} {self.STATUS_DESCRIPTIONS[status]}"
)
return (rootpath, status)

# 1.
Expand Down Expand Up @@ -371,28 +412,47 @@ def add_variants_async(self, variants):

This method is called when a context is created or sourced. Variants
are then added to the cache in a separate process.

.. deprecated:: 3.2.0
Use :meth:`add_variants` instead.
"""
return self.add_variants(variants, package_cache_async=True)

# A prod install is necessary because add_variants_async works by
def add_variants(self, variants, package_cache_async=True):
"""Add the given variants to the package payload cache.
"""

# A prod install is necessary because add_variants works by
# starting a rez-pkg-cache proc, and this can only be done reliably in
# a prod install. On non-windows we could fork instead, but there would
# remain no good solution on windows.
#
if not system.is_production_rez_install:
raise PackageCacheError(
"PackageCache.add_variants_async is only supported in a "
"PackageCache.add_variants is only supported in a "
"production rez installation."
)

variants_ = []
cachable_statuses = {
self.VARIANT_NOT_FOUND,
}
if not package_cache_async:
# We want to monitor copying variants if we're synchronous.
# We also want to report that a status has been stalled, so we'll
# hand that off to the caching function as well
cachable_statuses.update({
self.VARIANT_COPYING,
self.VARIANT_COPY_STALLED,
})

# trim down to those variants that are cachable, and not already cached
for variant in variants:
if not variant.parent.is_cachable:
continue

status, _ = self._get_cached_root(variant)
if status == self.VARIANT_NOT_FOUND:
if status in cachable_statuses:
variants_.append(variant)

# if there are no variants to add, and no potential cleanup to do, then exit
@@ -433,6 +493,20 @@ def add_variants_async(self, variants):
with open(filepath, 'w') as f:
f.write(json.dumps(handle_dict))

if package_cache_async:
self._subprocess_package_caching_daemon(self.path)
else:
# synchronous caching
self._run_caching_operation(wait_for_copying=True)

@staticmethod
def _subprocess_package_caching_daemon(path):
"""
Run the package cache in a daemon process

Returns:
subprocess.Popen : The package caching daemon process
"""
# configure executable
if platform.system() == "Windows":
kwargs = {
@@ -449,7 +523,7 @@ def add_variants_async(self, variants):
raise RuntimeError("Did not find rez-pkg-cache executable")

# start caching subproc
args = [exe, "--daemon", self.path]
args = [exe, "--daemon", path]

try:
with open(os.devnull, 'w') as devnull:
Expand All @@ -460,8 +534,8 @@ def add_variants_async(self, variants):
else:
out_target = devnull

subprocess.Popen(
[exe, "--daemon", self.path],
return subprocess.Popen(
args,
stdout=out_target,
stderr=out_target,
**kwargs
@@ -564,6 +638,15 @@ def run_daemon(self):
if pid > 0:
sys.exit(0)

self._run_caching_operation(wait_for_copying=False)

def _run_caching_operation(self, wait_for_copying=True):
"""Copy pending variants.

Args:
wait_for_copying (bool): Whether the caching step should block when one of the
pending variants is marked as already copying.
"""
logger = self._init_logging()

# somewhere for the daemon to store stateful info
@@ -574,7 +657,7 @@ def run_daemon(self):
# copy variants into cache
try:
while True:
keep_running = self._run_daemon_step(state)
keep_running = self._run_caching_step(state, wait_for_copying=wait_for_copying)
if not keep_running:
break
except Exception:
@@ -688,12 +771,13 @@ def _lock(self):
except NotLocked:
pass

def _run_daemon_step(self, state):
def _run_caching_step(self, state, wait_for_copying=False):
logger = state["logger"]

# pick a random pending variant to copy
pending_filenames = set(os.listdir(self._pending_dir))
pending_filenames -= set(state.get("copying", set()))
if not wait_for_copying:
pending_filenames -= set(state.get("copying", set()))
if not pending_filenames:
return False

@@ -716,7 +800,11 @@ def _run_daemon_step(self, state):
t = time.time()

try:
rootpath, status = self.add_variant(variant)
rootpath, status = self.add_variant(
variant,
wait_for_copying=wait_for_copying,
logger=logger,
)

except PackageCacheError as e:
# variant cannot be cached, so remove as a pending variant
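In short, add_variants() now either spawns the rez-pkg-cache daemon (async, the old behaviour) or runs the caching loop in-process and blocks, waiting out any variants another process is already copying (sync). A hedged usage sketch, where the cache path and package name are placeholders and a production rez install is still required, as the code above enforces:

    from rez.package_cache import PackageCache
    from rez.packages import iter_packages

    cache = PackageCache("/path/to/package_cache")   # placeholder path

    variants = []
    for package in iter_packages("my_package"):      # placeholder package name
        variants.extend(package.iter_variants())

    # New synchronous path: blocks until every cachable variant is copied.
    cache.add_variants(variants, package_cache_async=False)

    # Old behaviour, still the default: hand the work to a daemon process.
    cache.add_variants(variants, package_cache_async=True)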
16 changes: 12 additions & 4 deletions src/rez/resolved_context.py
@@ -167,7 +167,7 @@ def __init__(self, package_requests, verbosity=0, timestamp=None,
package_filter=None, package_orderers=None, max_fails=-1,
add_implicit_packages=True, time_limit=-1, callback=None,
package_load_callback=None, buf=None, suppress_passive=False,
print_stats=False, package_caching=None):
print_stats=False, package_caching=None, package_cache_async=None):
"""Perform a package resolve, and store the result.

Args:
@@ -205,6 +208,8 @@ def __init__(self, package_requests, verbosity=0, timestamp=None,
package_caching (bool|None): If True, apply package caching settings
as per the config. If None, enable as determined by config
setting :data:`package_cache_during_build`.
package_cache_async (bool|None): If True, cache packages asynchronously.
If None, use the config setting :data:`package_cache_async`.
"""
self.load_path = None

@@ -246,9 +248,12 @@ def __init__(self, package_requests, verbosity=0, timestamp=None,
package_caching = config.package_cache_during_build
else:
package_caching = True

self.package_caching = package_caching

if package_cache_async is None:
package_cache_async = config.package_cache_async
self.package_cache_async = package_cache_async

# patch settings
self.default_patch_lock = PatchLock.no_lock
self.patch_locks = {}
@@ -1838,13 +1843,16 @@ def _update_package_cache(self):
not self.success:
return

# see PackageCache.add_variants_async
# see PackageCache.add_variants
if not system.is_production_rez_install:
return

pkgcache = self._get_package_cache()
if pkgcache:
pkgcache.add_variants_async(self.resolved_packages)
pkgcache.add_variants(
self.resolved_packages,
self.package_cache_async,
)

@classmethod
def _init_context_tracking_payload_base(cls):
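From the API side, the new keyword threads straight through ResolvedContext. A minimal sketch, assuming a placeholder request "foo-1" and a production rez install with package caching configured:

    from rez.resolved_context import ResolvedContext

    context = ResolvedContext(
        ["foo-1"],                    # placeholder package request
        package_caching=True,
        package_cache_async=False,    # block until payloads are cached
    )

    if context.success:
        context.print_info()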
10 changes: 8 additions & 2 deletions src/rez/rezconfig.py
@@ -278,6 +278,12 @@
# Enable package caching during a package build.
package_cache_during_build = False

# Asynchronously cache packages. If this is false, resolves will block until
# all packages are cached.
#
# .. versionadded:: 3.2.0
package_cache_async = True

# Allow caching of local packages. You would only want to set this True for
# testing purposes.
package_cache_local = False
@@ -313,7 +319,7 @@
# This is useful as Platform.os might show different
# values depending on the availability of ``lsb-release`` on the system.
# The map supports regular expression, e.g. to keep versions.
#
#
# .. note::
# The following examples are not necessarily recommendations.
#
@@ -1119,7 +1125,7 @@

# Enables/disables colorization globally.
#
# .. warning::
# .. warning::
# Turned off for Windows currently as there seems to be a problem with the colorama module.
#
# May also set to the string ``force``, which will make rez output color styling
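And the equivalent file-based override: a sketch of a user rezconfig, where the location is whatever your config search path or REZ_CONFIG_FILE points at:

    # ~/.rezconfig.py -- assumed location; any rezconfig on the search path works.
    # Disable asynchronous caching so resolves block until payloads are cached.
    package_cache_async = False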