Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pixel_filter_util.py and pool_decorator function #464

Merged
merged 6 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions cooltools/lib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import bioframe

from multiprocessing import Pool


def assign_view_paired(
features,
Expand Down Expand Up @@ -506,3 +508,39 @@ def align_track_with_cooler(
clr_track.loc[~valid_bins, "value"] = np.nan

return clr_track[["chrom", "start", "end", "value"]]


def pool_decorator(func):
"""
A decorator function that enables multiprocessing for a given function.

Parameters
----------
func : callable
The function to be decorated.

Returns
-------
A wrapper function that enables multiprocessing for the given function.
"""

def wrapper(*args, **kwargs):
pool = None
if "nproc" in kwargs.keys():
if kwargs["nproc"] > 1:
pool = Pool(kwargs["nproc"])
mymap = pool.map
Copy link
Member

@nvictus nvictus Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mp.Pool.map is not always what you want to use in the decorated function. It is eager and will block the Python interpreter until it materializes all outputs in a list before returning.

This is problematic if the function you are mapping is not reductive (e.g. when making a pixel chunk iterator, all chunks will be materialized in memory). Instead you want to use a lazier map implementation like imap or in some cases imap_unordered.

Copy link
Member

@gfudenberg gfudenberg Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

option for a test for this function could be returning what map the decorator is specifying

Copy link
Member

@gfudenberg gfudenberg Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@gfudenberg gfudenberg Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pool.imap_unordered was used for cooler.balance b/c could hold nproc copies of the array in memory and this collection is then summed over.

else:
mymap = map

func(*args, **kwargs, map=mymap)

else:
raise TypeError(
"nproc must be specified as a keyword argument (e.g. nproc=1, nproc=5...)"
)

if pool != None:
pool.close()

return wrapper
214 changes: 214 additions & 0 deletions cooltools/sandbox/pixel_filter_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import numpy as np
from toolz import curry
import cooltools
import cooler
import functools
from multiprocessing import Pool
import logging

formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s : %(message)s")
logger = logging.getLogger("data_util")
logger.propagate = False # Disable propagation to the root logger

ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)


@curry
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curry looks nice here!

def cis_total_ratio_filter(clr, thres=0.5):
"""
Filter out bins with low cis-to-total coverage ratio from a Cooler object.

Parameters
----------
clr : cooler.Cooler
A Cooler object containing Hi-C contact matrices.
thres : float
The threshold cis-to-total coverage ratio below which bins are considered bad.

Returns
-------
numpy.ndarray
An array of bin mask.

Note
----
This curried function accepts partial evaluation with only providing threshold value.
"""
if isinstance(clr, float):
raise TypeError(
"If only threshold value is provided, please use 'thres' keyword to set threshold value (e.g. thres=0.2)"
)
coverage = cooltools.coverage(clr)
cis_total_cov = coverage[0] / coverage[1]
bin_mask = cis_total_cov > thres

return bin_mask


def generate_bin_mask(
clr, filters=[None], store=False, store_name="cis_total_ratio_>_0.5_thres"
):
"""
Generates a binary mask for a given `clr` object based on a list of filters and thresholds.

Parameters
----------
clr : cooler.Cooler
A cooler object containing Hi-C contact matrices.
filters : list
A list of filter functions to apply to the contact matrices.
store : bool, optional
If True, store the results in the input cooler file when finished. Default is False.
store_name : str, optional
Name of the columns of the bin table to save bin mask.

Returns
-------
bin_mask : numpy.ndarray
A binary mask indicating which genomic bins pass all filters.
"""
if not isinstance(filters, list):
logger.error("filter_lst parameter takes a list")

bin_mask = np.array([True] * clr.bins().shape[0])
for filter in filters:
bin_mask *= filter(clr)

if store:
with clr.open("r+") as grp:
if store_name in grp["bins"]:
del grp["bins"][store_name]
h5opts = dict(compression="gzip", compression_opts=6)
grp["bins"].create_dataset(store_name, data=bin_mask, **h5opts, dtype=bool)

return bin_mask


def _pixel_filter(chunk_pixels, good_bins_index):
"""
Filters a chunk of pixels based on a list of good bin indices.

Parameters
----------
chunk_pixels : pandas.DataFrame
A DataFrame containing the pixels to be filtered. It must have columns 'bin1_id' and 'bin2_id'.
good_bins_index : list of int
A list of indices representing the good bins.

Returns
-------
pandas.DataFrame
A DataFrame containing only the pixels whose bin1_id and bin2_id are in good_bins_index.
"""

pixels_mask = chunk_pixels["bin1_id"].isin(good_bins_index) * chunk_pixels[
"bin2_id"
].isin(good_bins_index)
return chunk_pixels[pixels_mask]


def pixel_iter_chunks(clr, chunksize):
"""
Iterate over the pixels of a cooler object in chunks of a given size.

Parameters
----------
clr : cooler.Cooler
A cooler object containing Hi-C data.
chunksize : int
The size of each chunk of pixels to iterate over.

Yields
------
chunk : numpy.ndarray
A chunk of pixels of size `chunksize`.
"""
selector = clr.pixels()
for lo, hi in cooler.util.partition(0, len(selector), chunksize):
chunk = selector[lo:hi]
yield chunk


def pool_decorator(func):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can probably delete this one & import from lib?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

"""
A decorator function that enables multiprocessing for a given function.

Parameters
----------
func : callable
The function to be decorated.

Returns
-------
A wrapper function that enables multiprocessing for the given function.
"""

def wrapper(*args, **kwargs):
pool = None
if "nproc" in kwargs.keys():
if kwargs["nproc"] > 1:
logger.debug("Start to use pool")
pool = Pool(kwargs["nproc"])
mymap = pool.map
else:
mymap = map

func(*args, **kwargs, map=mymap)

else:
raise TypeError(
"nproc must be specified as a keyword argument (e.g. nproc=1, nproc=5...)"
)

if pool != None:
pool.close()

return wrapper


@pool_decorator
def create_filtered_cooler(
output_uri, clr, bin_mask, chunksize=10_000_000, nproc=1, map=map
):
"""
Create a filtered cooler file from a given cooler object and a binary mask of good bins.

Parameters
----------
output_uri : str
The URI of the output cooler file to be created.
clr : cooler.Cooler
The cooler object to be filtered.
bin_mask : numpy.ndarray
A boolean array indicating which bins to keep (True) and which to discard (False).
Must have the same length as the number of bins in the cooler object.
nproc : int, optional
The number of processes to use for parallelization. Default is 16.
chunksize : int, optional
The number of pixels to process per chunk. Default is 10,000,000.

Returns
-------
None
"""
if len(bin_mask) != clr.bins().shape[0]:
raise ValueError(
"bin_mask should have the same length as bin table in cool file"
)
logger.debug("Start to create cooler file...")
bin_table = clr.bins()[:]
good_bins_index = np.array(range(clr.bins().shape[0]))[bin_mask]
pixels_filter = functools.partial(_pixel_filter, good_bins_index=good_bins_index)

cooler.create_cooler(
output_uri,
bins=bin_table,
pixels=map(pixels_filter, pixel_iter_chunks(clr, chunksize)),
ordered=True,
columns=["count"],
)

logger.debug("done")
Loading