Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix propeller crash when inferring literal type for an offloaded literal #5771

Merged
merged 12 commits into from
Oct 2, 2024

Conversation

pmahindrakar-oss
Copy link
Contributor

@pmahindrakar-oss pmahindrakar-oss commented Sep 23, 2024

Why are the changes needed?

Followup to this ticket #5705
Fixes propeller crash when the type inferred is nil.

What changes were proposed in this pull request?

The LiteralTypeFromLiteral function didn't handle Offloaded type

Along with this adds the following changes.

  • Reading of offloaded literal in array node map task if the input itself is an offloaded literal. This is to support passing the inputs to all the sub nodes
  • Promise input supports for offloaded literal
  • Removes logging of the inputs outputs which might be large from datacatlog
  • Adds hashing logic for offloaded literal which is used for caching
  • adds flytectl support to o/p offloaded metadata uri
  • bumps the maxDownloadMbs to 1000 MB to be inline with the max offloaded literal size

This does increase the memory profile for propeller whenever it needs to download large literal

How was this patch tested?

Tested using sandbox and the changes working fine for the following examples . Tested for variation of caching aswell

import typing
from dataclasses import dataclass
from flytekit import task, workflow, map_task
from flytekit.types.file import FlyteFile
from mashumaro.mixins.json import DataClassJSONMixin

@dataclass
# class InnerDC(DataClassJSONMixin):
class InnerDC:
    a: bool
    b: int
    c: float
    d: list[int]
    e: dict[str, int]
    f: FlyteFile

@dataclass
# class DC(DataClassJSONMixin):
class DC:
    name: str
    other: str
    f: FlyteFile
    inner: InnerDC


@task(cache=True, cache_version="1.1")
def f(n: int) -> DC:
    with open("/tmp/abc", "w") as f:
        f.write("wow")
    name = "a" * 500_000
    return DC(
        name=name,
        other=name,
        f=FlyteFile("/tmp/abc"),
        inner=InnerDC(
            a=True,
            b=42,
            c=3.1415,
            d=[1,2,3],
            e={"a": 1, "b": 2},
            f=FlyteFile("/tmp/abc"),
        )
    )

@task(cache=True, cache_version="1.1")
def get_list(n: int) -> list[int]:
    return list(range(n))

@task(cache=True, cache_version="1.1")
def id_dataclass(dc: DC) -> DC:
    with open(dc.f, "r") as f:
        print(f.read())
    return dc


@task(cache=True, cache_version="1.1")
def consume_list_of_objects(xs: list[DC]) -> list[DC]:
    with open(xs[0].f, "r") as f:
        print(f.read())
    return xs[0:2]


@workflow
def working_wf(n: int = 15) -> list[DC]:
    xs=map_task(f)(n=get_list(n=n))
    return consume_list_of_objects(xs=xs)


@workflow
def passing_map_output_wf(n: int = 15) -> list[DC]:
    xs=map_task(f)(n=get_list(n=n))
    # Notice how we pass the offloaded literal to map_task
    return map_task(id_dataclass)(dc=xs) 

@task(cache=True, cache_version="1.1")
def consume_single_element(y: DC) -> DC:
    with open(y.f, "r") as f:
        print(f.read())
    return y

@workflow
def indexing_map_output_wf(n: int = 15):
    a=passing_map_output_wf(n=n)
    consume_single_element(y=a[0])

Notice consume_list_objects returns a slice since the outputting of offloaded data is yet to be supported in flytekit without which if the data is too large the datacatlog call errors out with grpc error due to max size breached

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Copy link

codecov bot commented Sep 23, 2024

Codecov Report

Attention: Patch coverage is 43.90244% with 23 lines in your changes missing coverage. Please review.

Project coverage is 36.31%. Comparing base (1942173) to head (9edd5f9).

Files with missing lines Patch % Lines
...lytepropeller/pkg/controller/nodes/common/utils.go 44.44% 6 Missing and 4 partials ⚠️
...ytepropeller/pkg/controller/nodes/array/handler.go 0.00% 5 Missing and 1 partial ⚠️
...opeller/pkg/controller/nodes/attr_path_resolver.go 14.28% 5 Missing and 1 partial ⚠️
...epropeller/pkg/controller/nodes/output_resolver.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5771      +/-   ##
==========================================
- Coverage   36.31%   36.31%   -0.01%     
==========================================
  Files        1304     1304              
  Lines      110048   110072      +24     
==========================================
- Hits        39969    39968       -1     
- Misses      65923    65942      +19     
- Partials     4156     4162       +6     
Flag Coverage Δ
unittests-datacatalog 51.37% <ø> (ø)
unittests-flyteadmin 55.57% <100.00%> (-0.05%) ⬇️
unittests-flytecopilot 12.17% <ø> (ø)
unittests-flytectl 62.21% <ø> (ø)
unittests-flyteidl 7.12% <100.00%> (+<0.01%) ⬆️
unittests-flyteplugins 53.35% <ø> (ø)
unittests-flytepropeller 41.91% <39.47%> (-0.02%) ⬇️
unittests-flytestdlib 55.37% <ø> (+0.02%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

wild-endeavor
wild-endeavor previously approved these changes Sep 24, 2024
Copy link
Contributor

@wild-endeavor wild-endeavor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this is fine, but cc @eapolinario

@wild-endeavor
Copy link
Contributor

this doesn't break any flyte releases right?

@pmahindrakar-oss
Copy link
Contributor Author

@wild-endeavor yes right now it doesn't since the feature is disabled, but this happens in peculiar case of map task accepting an offloaded literal. I am working on more elaborate fix for this as the current fix doesn't completely solve it.

@pmahindrakar-oss
Copy link
Contributor Author

@wild-endeavor added more fixes for the crash along with array node support to read offloaded literal ,promise support etc,

katrogan
katrogan previously approved these changes Sep 27, 2024
Copy link
Contributor

@katrogan katrogan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM but hopefully @hamersaw or @pvditt can review here too!

@@ -96,6 +96,10 @@ func ExtractFromLiteral(literal *core.Literal) (interface{}, error) {
}
}
return mapResult, nil
case *core.Literal_OffloadedMetadata:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, thank you for updating

@@ -192,11 +192,20 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu

size := -1
for _, variable := range literalMap.Literals {
if variable.GetOffloadedMetadata() != nil {
// variable will be overwritten with the contents of the offloaded data which contains the actual large literal.
// We need this for the map task to be able to create the subNodeSpec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just so I understand, this is also needed so the map task can index into the inputs as well too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah subNodeSpec covers that as it contains the inputs for each subnode

@@ -122,7 +122,7 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry
logger.Debugf(ctx, "DataCatalog failed to get artifact by tag %+v, err: %+v", tag, err)
return catalog.Entry{}, err
}
logger.Debugf(ctx, "Artifact found %v from tag %v", artifact, tag)
logger.Debugf(ctx, "Artifact found %v from tag %v", artifact.GetId(), tag)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why these changes are necessary? do we ever send too big inputs/outputs to cache service? shouldn't we be using the offloaded literal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even they are not too big breaching the 10 MB limit, but anything less than that threshold will get logged which is also huge and hence removed it. Let me know if you disagree

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, I could still see this being useful, maybe we only log a deterministic prefix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was logging an entire object and hence i think logging an identifier should good for long term too and we have api's to fetch the aritifact using the ID if we want to debug further. Logging a prefix of object converted to string format doesn't seem useful IMO

flytepropeller/pkg/controller/nodes/common/utils.go Outdated Show resolved Hide resolved
Signed-off-by: pmahindrakar-oss <[email protected]>
Signed-off-by: pmahindrakar-oss <[email protected]>
Signed-off-by: pmahindrakar-oss <[email protected]>
Signed-off-by: pmahindrakar-oss <[email protected]>
Signed-off-by: pmahindrakar-oss <[email protected]>
Signed-off-by: pmahindrakar-oss <[email protected]>
Signed-off-by: pmahindrakar-oss <[email protected]>
@pmahindrakar-oss pmahindrakar-oss merged commit e7ce437 into master Oct 2, 2024
50 checks passed
@pmahindrakar-oss pmahindrakar-oss deleted the fix-propeller-crash branch October 2, 2024 17:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants