Implement fit_partial() for ImplicitALSWrapperModel and LightFMWrapperModel #179

Closed · wants to merge 12 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `Debias` mechanism for classification, ranking and auc metrics. New parameter `is_debiased` to `calc_from_confusion_df`, `calc_per_user_from_confusion_df` methods of classification metrics, `calc_from_fitted`, `calc_per_user_from_fitted` methods of auc and ranking (`MAP`) metrics, `calc_from_merged`, `calc_per_user_from_merged` methods of ranking (`NDCG`, `MRR`) metrics. ([#152](https://github.com/MobileTeleSystems/RecTools/pull/152))
- `nbformat >= 4.2.0` dependency to `[visuals]` extra ([#169](https://github.com/MobileTeleSystems/RecTools/pull/169))
- Implement `fit_partial()` for `ImplicitALSWrapperModel` and `LightFMWrapperModel` ([#179](https://github.com/MobileTeleSystems/RecTools/pull/179))
Collaborator

I'm very excited about the new feature. Thank you for the PR!

Let's discuss what we're trying to achieve with fit_partial.
Implicit ALS has `partial_fit_users` and `partial_fit_items` methods that allow updating weights only for specific subsets of users/items and also support feeding new users and items into an already fitted model.
The LightFM model has different logic. It has a `fit_partial` method. From the docs: "Unlike fit, repeated calls to this method will cause training to resume from the current model state". This method does not support adding new users/items, and it is not meant to update only parts of the embeddings.
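For context, here is a minimal sketch of the difference between the two underlying APIs (toy matrix, arbitrary hyperparameters; only the calls matter):

```python
import numpy as np
from scipy import sparse
from implicit.als import AlternatingLeastSquares
from lightfm import LightFM

ui_csr = sparse.random(5, 4, density=0.5, format="csr", dtype=np.float32, random_state=1)

# Implicit ALS: refresh factors only for the given users/items of an already fitted model;
# ids beyond the current factor matrices are appended, so the model can grow.
als = AlternatingLeastSquares(factors=8, iterations=2, random_state=1)
als.fit(ui_csr)
user_ids = np.array([0, 2])
als.partial_fit_users(user_ids, ui_csr[user_ids])
als.partial_fit_items(np.array([1]), ui_csr.T.tocsr()[[1]])

# LightFM: fit_partial resumes training of the whole model from its current state;
# the matrix must keep the shape seen on the first call, so new ids are not supported.
lfm = LightFM(no_components=8, random_state=1)
lfm.fit_partial(ui_csr.tocoo(), epochs=1)  # first call initializes the model
lfm.fit_partial(ui_csr.tocoo(), epochs=1)  # second call continues from the current weights
```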

What exactly do we want to achieve with fit_partial in rectools? I would say the first step would be to follow the LightFM logic. We are missing the ability to resume training: our fit always refits models from the beginning, just like LightFM fit.

As for partial_fit_users and partial_fit_items - these are very model-specific ALS methods. I wouldn't call them fit_partial. I would move them to a separate issue and PR.

@chezou @feldlime what do you think?

Contributor Author

Good point.

My motivation for adding fit_partial is to enable incremental training. For example, weekly batches for full training and daily incremental training to incorporate new and existing user transactions.

I don't seriously care about preserving old users/items. Rather, shortening training time would be important.

Actually, LightFM has a known workaround for adding a new user/item. lyst/lightfm#347 (comment)
I included that workaround in the patch: ea33c05
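For reference, the gist of that workaround is to grow the fitted model's state before calling fit_partial with a larger interaction matrix. A rough sketch for new users (attribute names are from LightFM's fitted state, adagrad schedule assumed; not necessarily what ea33c05 does verbatim):

```python
import numpy as np
from lightfm import LightFM


def add_user_rows(model: LightFM, n_new: int) -> None:
    """Append rows for new users to a fitted LightFM model so fit_partial accepts them."""
    k = model.no_components
    # New embeddings are initialized the same way LightFM does it; biases start at zero.
    new_emb = ((model.random_state.rand(n_new, k) - 0.5) / k).astype(np.float32)
    model.user_embeddings = np.vstack([model.user_embeddings, new_emb])
    model.user_biases = np.concatenate([model.user_biases, np.zeros(n_new, dtype=np.float32)])
    # The AdaGrad accumulators have to grow as well, otherwise shapes won't match during training.
    model.user_embedding_gradients = np.vstack(
        [model.user_embedding_gradients, np.ones((n_new, k), dtype=np.float32)]
    )
    model.user_embedding_momentum = np.vstack(
        [model.user_embedding_momentum, np.zeros((n_new, k), dtype=np.float32)]
    )
    model.user_bias_gradients = np.concatenate([model.user_bias_gradients, np.ones(n_new, dtype=np.float32)])
    model.user_bias_momentum = np.concatenate([model.user_bias_momentum, np.zeros(n_new, dtype=np.float32)])
```

The same treatment applies to the item_* arrays for new items.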

Collaborator

Great feature for LightFM, thank you!

So there are two major use cases for fit_partial:

  1. Incremental training with support for adding new items and features. This is used when the dataset changes between calls.
  2. Epochal training. This is used when the dataset stays the same between calls; it just resumes training from the previous state, like LightFM.fit_partial.

Right now you have implemented support for both cases in LightFM, but only for one of them (incremental) in ALS. Could you please add support for ALS epochal training (with a NotImplementedError if features are present)? Otherwise users would be very confused by the different behaviour of the same method. And could you please add an optional epochs argument to fit_partial of ALS?

We would also need tests for both models: fit a model for 10 epochs with fit, fit another one 10 times with fit_partial for 1 epoch each, and check that the final embeddings are equal (see the sketch below).
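Something like the following, shown against raw LightFM (interactions_coo is an assumed fixture; a single thread keeps training deterministic), is the property the wrapper tests should mirror:

```python
import numpy as np
from lightfm import LightFM


def test_ten_epochs_equal_ten_partial_fits(interactions_coo) -> None:
    full = LightFM(no_components=2, random_state=1)
    full.fit(interactions_coo, epochs=10, num_threads=1)

    stepwise = LightFM(no_components=2, random_state=1)
    for _ in range(10):
        stepwise.fit_partial(interactions_coo, epochs=1, num_threads=1)

    # With one thread and identical seeds the two training schedules must coincide.
    np.testing.assert_allclose(full.user_embeddings, stepwise.user_embeddings)
    np.testing.assert_allclose(full.item_embeddings, stepwise.item_embeddings)
```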

How do you feel about the whole story?

Contributor Author

Sounds good. Let me add tests for re-training with the same dataset.

Contributor Author

Added the tests 117c5c7

Contributor Author

@blondered I think 117c5c7 shows how it works for use case 2.

Please let me know if you'd like to add anything else.

Collaborator

Thanks! Sorry for the delay. It's summer and it was vacation time :)


### Fixed
- `display()` method in `MetricsApp` ([#169](https://github.com/MobileTeleSystems/RecTools/pull/169))
88 changes: 85 additions & 3 deletions rectools/dataset/dataset.py
@@ -21,6 +21,7 @@
from scipy import sparse

from rectools import Columns
from rectools.types import InternalIdsArray

from .features import AbsentIdError, DenseFeatures, Features, SparseFeatures
from .identifiers import IdMap
@@ -91,6 +92,14 @@ def get_hot_item_features(self) -> tp.Optional[Features]:
return None
return self.item_features.take(range(self.n_hot_items))

def get_hot_users(self) -> InternalIdsArray:
"""Return internal ids of hot users."""
return self.interactions.df[Columns.User].unique()

def get_hot_items(self) -> InternalIdsArray:
"""Return internal ids of hot items."""
return self.interactions.df[Columns.Item].unique()

@classmethod
def construct(
cls,
@@ -138,9 +147,7 @@ def construct(
Dataset
Container with all input data, converted to `rectools` structures.
"""
for col in (Columns.User, Columns.Item):
if col not in interactions_df:
raise KeyError(f"Column '{col}' must be present in `interactions_df`")
cls._check_columns_present(interactions_df)
user_id_map = IdMap.from_values(interactions_df[Columns.User].values)
item_id_map = IdMap.from_values(interactions_df[Columns.Item].values)
interactions = Interactions.from_raw(interactions_df, user_id_map, item_id_map)
@@ -194,6 +201,12 @@ def _make_features(
except Exception as e: # pragma: no cover
raise RuntimeError(f"An error has occurred while constructing {feature_type} features: {e!r}")

@staticmethod
def _check_columns_present(interactions_df: pd.DataFrame) -> None:
for col in (Columns.User, Columns.Item):
if col not in interactions_df:
raise KeyError(f"Column '{col}' must be present in `interactions_df`")

def get_user_item_matrix(
self,
include_weights: bool = True,
@@ -245,3 +258,72 @@ def get_raw_interactions(self, include_weight: bool = True, include_datetime: bo
pd.DataFrame
"""
return self.interactions.to_external(self.user_id_map, self.item_id_map, include_weight, include_datetime)

def construct_new_datasets(
Collaborator

Probably construct_new_dataset would be more reasonable, since only one dataset is returned.

Collaborator (@blondered, Aug 14, 2024)

I'd call it build_new_dataset_on_id_map or something like it. But we need to think about the concept first.

Right now it doesn't fit the conventions that we use. We have a rule that all hot users (users that have interactions) always go before warm users (users that have only features), and we rely on this assumption in some of our important internal logic.

The new method can break this rule, because new users with interactions will get ids after the warm users from the previous dataset state. So we really can't make this a public method in the library as it is.

Options are:

  • Rewrite the logic. We need to keep only the hot user ids from the previous state, then add the new hot user ids, then add the new warm user ids. I suppose we drop all previous warm users because we are completely replacing the actual data (the same applies to items). In this case I'd call it rebuild_with_new_data, construct_with_new_data or replace_data (see the sketch after this list).
  • Just don't make it a public method. Move it to the tests if we need it there, but then still write a correct public method for the required logic. Tests will be needed for it anyway.
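A rough sketch of the "rewrite the logic" option for users, using only IdMap methods that already exist in rectools (from_values, convert_to_external, add_ids); the data frame and column names are illustrative:

```python
# 1. Keep the previous hot ids first, preserving their internal order.
old_hot_external = dataset.user_id_map.convert_to_external(sorted(dataset.get_hot_users()))
user_id_map = IdMap.from_values(old_hot_external)

# 2. Append new hot ids (users appearing in the new interactions).
user_id_map = user_id_map.add_ids(new_interactions_df[Columns.User].values, raise_if_already_present=False)

# 3. Append new warm ids last (users appearing only in the new features table).
if new_user_features_df is not None:
    user_id_map = user_id_map.add_ids(new_user_features_df["id"].values, raise_if_already_present=False)

# Previous warm-only users are intentionally dropped; items follow the same three steps.
```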

Contributor Author

Thanks for pointing it out. I wasn't aware of the warm-users assumption. And yeah, I expected this to be a controversial change, which is why I haven't written the test yet 😅

Originally, I expected RecTools' Dataset to work similarly to LightFM's Dataset.fit_partial(), but I found that the RecTools Dataset is frozen.

I'm okay with making it private, or I can rebuild the IdMap by concatenating new and old users/items, something like:

old_hot_user_id_map = IdMap.from_dict({e: i for e, i in zip(ds.user_id_map.convert_to_external(ds.get_hot_users()), ds.get_hot_users())})
old_hot_item_id_map = ...

new_user_id = old_hot_user_id_map.add_ids(...

I'm not yet confident it's okay to drop the previous warm users, though.

Contributor Author

Hmm, I attempted to drop old warm ids, but I ran into UnknownIdError.

raise UnknownIdError("All ids in `df` must be present in `id_map`")

Maybe storing hot/warm marks in IdMap, e.g. a hot_ids array like np.array([1, 0, 1]) where 1 marks a hot id and 0 a warm id, could be an option. Then we could fetch hot or warm users/items explicitly.

Collaborator (@blondered, Aug 15, 2024)

The overall concept of ids in the rectools Dataset is the following:

  1. All internal user ids are row indexes in the user-interactions matrix, in the user features matrix and in the vector model embeddings matrix
  2. Because we don't want empty rows in the user-interactions matrix, we can't have warm user ids before hot ones. Empty rows are bad because some models would include them in the loss calculation, which is conceptually wrong.
  3. So we always need all hot user ids first -> then all warm user ids. They have the same order in the user features matrix
  4. That's why we don't need a hot_ids flag. But we always need to make sure we keep the hot/warm id order assumption (see the sketch after this list)
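A small illustration of that invariant in terms of the existing Dataset API (hedged, just to make the rule concrete):

```python
ui = dataset.get_user_item_matrix(include_weights=True, include_warm_users=True, include_warm_items=True)
# All interactions live in the first n_hot_users rows; the warm rows at the bottom stay empty.
assert ui[: dataset.n_hot_users].getnnz() == ui.getnnz()
assert ui[dataset.n_hot_users :].getnnz() == 0
```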

As for keeping warm user ids in the dataset, it is very simple. If those users don't have features in the user features matrix of the final dataset, then they need to be dropped from user_id_map, because the only purpose of those warm ids is to get features for them (during the recommend method).
So if you are dropping feature values for some users, you should drop their ids from the IdMap.
Models do not store any embeddings for warm users, so nothing will break.

Collaborator

As for the error, one possible workaround is to keep all (old) hot user features inside the dataset.
For sparse features we can drop the values for memory efficiency.
For dense features I don't really see what we can do with the current implementation. Need to think about it.

Collaborator

Actually, since we are not dropping "old hot" user embeddings from the models, why should we drop their features from the dataset? For the recommend method of an incrementally trained model we still need features in the dataset for all users that have ever been hot.
So I think that no-longer-hot users that are not present in the new user_features_df should keep all their values in the user features matrix.
But all users (both warm and hot) that are present in the new user_features_df should have all their values updated (real service scenario).

Now about corner cases. I think that for incremental training we should require exactly the same number of features, feature names and feature types as in the original dataset, at least in this PR, or we'll dive too deep. So all these checks for correct feature data are needed, and cat_user_features and make_dense_user_features should not be passed to rebuild_with_new_data (a possible shape of this check is sketched below).

Items follow the same logic as users.
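A possible shape of that consistency check (a sketch only; `names` is assumed to be the attribute both DenseFeatures and SparseFeatures expose, and the error messages are placeholders):

```python
from rectools.dataset.features import Features


def _check_features_consistent(old: Features, new: Features) -> None:
    # Incremental training requires the same feature type, names and order as the original dataset.
    if type(old) is not type(new):
        raise ValueError("Feature type (dense/sparse) must not change for incremental training")
    if tuple(old.names) != tuple(new.names):
        raise ValueError("Feature names and their order must not change for incremental training")
```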

Wow. That's a tough story :) What do you think?

Contributor Author

Okay, let me revive my dropping code and see what happens on CI. I'll share it later.

Contributor Author

Fixed to drop old warm ids e8929c5

Contributor Author

@blondered I think I completed implementing the "Rewrite logic" option, but let me know if you find anything else.

self,
interactions_df: pd.DataFrame,
user_features_df: tp.Optional[pd.DataFrame] = None,
cat_user_features: tp.Iterable[str] = (),
make_dense_user_features: bool = False,
item_features_df: tp.Optional[pd.DataFrame] = None,
cat_item_features: tp.Iterable[str] = (),
make_dense_item_features: bool = False,
) -> "Dataset":
"""
Create a new dataset, extending the existing `user_id_map` and `item_id_map` with ids from the new data.
This method is useful when you want to use `fit_partial`.

Parameters
----------
interactions_df : pd.DataFrame
New interactions table.
The same structure as in `construct` method.
user_features_df, item_features_df : pd.DataFrame, optional
New user (item) explicit features table.
The same structure as in `construct` method.
cat_user_features, cat_item_features : tp.Iterable[str], default ``()``
List of categorical user (item) feature names for
`SparseFeatures.from_flatten` method.
Used only if `make_dense_user_features` (`make_dense_item_features`)
flag is ``False`` and `user_features_df` (`item_features_df`) is not ``None``.
make_dense_user_features, make_dense_item_features : bool, default ``False``
Create user (item) features as dense or sparse.
Used only if `user_features_df` (`item_features_df`) is not ``None``.
- if ``False``, `SparseFeatures.from_flatten` method will be used;
- if ``True``, `DenseFeatures.from_dataframe` method will be used.

Returns
-------
Dataset
New dataset with added data.
"""
self._check_columns_present(interactions_df)

new_user_id_map = self.user_id_map.add_ids(interactions_df[Columns.User].values, raise_if_already_present=False)
new_item_id_map = self.item_id_map.add_ids(interactions_df[Columns.Item].values, raise_if_already_present=False)
new_interactions = Interactions.from_raw(interactions_df, new_user_id_map, new_item_id_map)

new_user_features, new_user_id_map = self._make_features(
user_features_df,
cat_user_features,
make_dense_user_features,
new_user_id_map,
Columns.User,
"user",
)
new_item_features, new_item_id_map = self._make_features(
item_features_df,
cat_item_features,
make_dense_item_features,
new_item_id_map,
Columns.Item,
"item",
)

return Dataset(
new_user_id_map,
new_item_id_map,
new_interactions,
new_user_features,
new_item_features,
)
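For context, a minimal usage sketch of how this new method is meant to be combined with the fit_partial added below (weekly_interactions_df and daily_interactions_df are hypothetical data frames in the usual interactions format):

```python
from implicit.als import AlternatingLeastSquares

from rectools import Columns
from rectools.dataset import Dataset
from rectools.models import ImplicitALSWrapperModel

# Full weekly training.
dataset = Dataset.construct(weekly_interactions_df)
model = ImplicitALSWrapperModel(AlternatingLeastSquares(factors=64)).fit(dataset)

# Daily batch: fold new interactions (possibly with unseen users/items) into the same
# id space and resume training instead of refitting from scratch.
daily_dataset = dataset.construct_new_datasets(daily_interactions_df)
model.fit_partial(daily_dataset)

reco = model.recommend(
    users=daily_interactions_df[Columns.User].unique(),
    dataset=daily_dataset,
    k=10,
    filter_viewed=True,
)
```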
20 changes: 20 additions & 0 deletions rectools/models/base.py
@@ -71,6 +71,26 @@ def fit(self: T, dataset: Dataset, *args: tp.Any, **kwargs: tp.Any) -> T:
def _fit(self, dataset: Dataset, *args: tp.Any, **kwargs: tp.Any) -> None:
raise NotImplementedError()

def fit_partial(self: T, dataset: Dataset, *args: tp.Any, **kwargs: tp.Any) -> T:
"""
Partially fit the model, resuming training from the current model state instead of refitting from scratch.

Parameters
----------
dataset : Dataset
Dataset with input data.

Returns
-------
self
"""
self._fit_partial(dataset, *args, **kwargs)
self.is_fitted = True
return self

def _fit_partial(self, dataset: Dataset, *args: tp.Any, **kwargs: tp.Any) -> None:
raise NotImplementedError()

def recommend(
self,
users: AnyIds,
19 changes: 19 additions & 0 deletions rectools/models/implicit_als.py
@@ -90,6 +90,25 @@ def _fit(self, dataset: Dataset) -> None: # type: ignore
self.verbose,
)

def _fit_partial(self, dataset: Dataset) -> None:
Collaborator

We've discussed it a lot. Let's not do ALS support for fit_partial for now.
Main reasons are:

  1. The ALS wrapper has different behaviour than the LightFM wrapper in the current implementation: ALS cannot do fit_partial on a model that has not yet been fitted. We can't leave it as it is.
  2. Feature support should be here for ALS, so we would have to rewrite some of our feature-handling code in fit to support fit_partial and also an epochs parameter. It's a long story and should be in another PR.

# deepcopy does not copy model.item_factors and model.user_factors.
# That causes issues with partial fit.
users = dataset.get_hot_users()
items = dataset.get_hot_items()

ui_csr = dataset.get_user_item_matrix(
include_weights=True, include_warm_users=True, include_warm_items=True
).astype(np.float32)
iu_csr = ui_csr[:, items].T.tocsr(copy=False)

# TODO: implement partial fit for explicit features
if dataset.get_hot_item_features() or dataset.get_hot_user_features():
raise NotImplementedError("fit_partial with explicit features is not implemented")

for _ in range(self.model.iterations):
self.model.partial_fit_users(users, ui_csr[users])
self.model.partial_fit_items(items, iu_csr)

def _get_users_factors(self, dataset: Dataset) -> Factors:
return Factors(get_users_vectors(self.model))

18 changes: 18 additions & 0 deletions rectools/models/lightfm.py
@@ -14,6 +14,7 @@

import typing as tp
from copy import deepcopy
from typing import Any

import numpy as np
from lightfm import LightFM
@@ -89,6 +90,23 @@ def _fit(self, dataset: Dataset) -> None: # type: ignore
verbose=self.verbose > 0,
)

def _fit_partial(self, dataset: Dataset, *args: Any, **kwargs: Any) -> None: # type: ignore
self.model = deepcopy(self._model)

ui_coo = dataset.get_user_item_matrix(include_weights=True).tocoo(copy=False)
user_features = self._prepare_features(dataset.get_hot_user_features(), dataset.n_hot_users)
item_features = self._prepare_features(dataset.get_hot_item_features(), dataset.n_hot_items)

self.model.fit_partial(
ui_coo,
user_features=user_features,
item_features=item_features,
sample_weight=ui_coo,
epochs=self.n_epochs,
num_threads=self.n_threads,
verbose=self.verbose > 0,
)

@staticmethod
def _prepare_features(features: tp.Optional[Features], n_hot: int) -> tp.Optional[sparse.csr_matrix]:
if features is None:
60 changes: 60 additions & 0 deletions tests/models/test_implicit_als.py
@@ -346,3 +346,63 @@ def test_i2i_with_warm_and_cold_items(self, use_gpu: bool, dataset: Dataset) ->
dataset=dataset,
k=2,
)

def test_fit_partial(self, use_gpu: bool, dataset: Dataset) -> None:
base_model = AlternatingLeastSquares(factors=8, num_threads=2, use_gpu=use_gpu, random_state=1)
model = ImplicitALSWrapperModel(model=base_model).fit(dataset)
data = [
[150, 11],
[150, 12],
[150, 15],
]
new_interactions = pd.DataFrame(data, columns=Columns.UserItem)
new_interactions[Columns.Weight] = 1
new_interactions[Columns.Datetime] = "2021-09-10"
new_dataset = dataset.construct_new_datasets(new_interactions)
model.fit_partial(new_dataset)
actual = model.recommend(
users=[150], # new user
dataset=new_dataset,
k=2,
filter_viewed=False,
)
expected = pd.DataFrame(
{
Columns.User: [150, 150],
Columns.Item: [12, 11],
Columns.Rank: [1, 2],
}
)
pd.testing.assert_frame_equal(actual.drop(columns=Columns.Score), expected)
pd.testing.assert_frame_equal(
actual.sort_values([Columns.User, Columns.Score], ascending=[True, False]).reset_index(drop=True), actual
)

def test_fit_partial_with_features(self, use_gpu: bool, dataset: Dataset) -> None:
user_id_map = IdMap.from_values(["u1", "u2", "u3"])
item_id_map = IdMap.from_values(["i1", "i2", "i3"])
interactions_df = pd.DataFrame(
[
["u1", "i1", 0.1, "2021-09-09"],
["u2", "i1", 0.1, "2021-09-09"],
["u2", "i2", 0.5, "2021-09-05"],
["u2", "i3", 0.2, "2021-09-05"],
["u1", "i3", 0.2, "2021-09-05"],
["u3", "i1", 0.2, "2021-09-05"],
],
columns=[Columns.User, Columns.Item, Columns.Weight, Columns.Datetime],
)
interactions = Interactions.from_raw(interactions_df, user_id_map, item_id_map)
user_features_df = pd.DataFrame({"id": ["u1", "u2", "u3"], "f1": [0.3, 0.4, 0.5]})
user_features = DenseFeatures.from_dataframe(user_features_df, user_id_map)
item_features_df = pd.DataFrame({"id": ["i1", "i1"], "feature": ["f1", "f2"], "value": [2.1, 100]})
item_features = SparseFeatures.from_flatten(item_features_df, item_id_map)
dataset = Dataset(user_id_map, item_id_map, interactions, user_features, item_features)

# In case of big number of iterations there are differences between CPU and GPU results
base_model = AlternatingLeastSquares(factors=32, num_threads=2, use_gpu=use_gpu)
self._init_model_factors_inplace(base_model, dataset)

model = ImplicitALSWrapperModel(model=base_model, fit_features_together=False).fit(dataset)
with pytest.raises(NotImplementedError, match="fit_partial with explicit features is not implemented"):
model.fit_partial(dataset)
32 changes: 32 additions & 0 deletions tests/models/test_lightfm.py
@@ -222,6 +222,38 @@ def test_with_weights(self, interactions_df: pd.DataFrame) -> None:
actual,
)

def test_fit_partial(self, dataset: Dataset) -> None:
base_model = DeterministicLightFM(no_components=2, loss="logistic")
model = LightFMWrapperModel(model=base_model, epochs=50).fit(dataset)
data = [
[150, 11],
[150, 12],
[150, 15],
]
new_interactions = pd.DataFrame(data, columns=Columns.UserItem)
new_interactions[Columns.Weight] = 1
new_interactions[Columns.Datetime] = "2021-09-10"
new_dataset = dataset.construct_new_datasets(interactions_df=new_interactions)
model.fit_partial(new_dataset)
actual = model.recommend(
users=np.array([150]), # new user
dataset=new_dataset,
k=2,
filter_viewed=False,
)
expected = pd.DataFrame(
{
Columns.User: [150, 150],
Columns.Item: [15, 12],
Columns.Rank: [1, 2],
}
)
pd.testing.assert_frame_equal(actual.drop(columns=Columns.Score), expected)
pd.testing.assert_frame_equal(
actual.sort_values([Columns.User, Columns.Score], ascending=[True, False]).reset_index(drop=True),
actual,
)

def test_with_warp_kos(self, dataset: Dataset) -> None:
base_model = DeterministicLightFM(no_components=2, loss="warp-kos")
try: