Adding large amounts of metadata does not work #22

Open

yonomitt opened this issue May 27, 2023 · 3 comments
Labels: bug (Something isn't working), priority: high

Comments


yonomitt commented May 27, 2023

As a stress test, I have a repo with 542,247 images in it and wanted to add metadata to a data source. I ran the following code from a Jupyter notebook:

# Set up DagsHub
import os
os.environ["DAGSHUB_CLIENT_HOST"] = "https://test.dagshub.com"

from dagshub.data_engine.model import datasources

repo = "yonomitt/LAION-Aesthetics-V2-6.5plus"
image_root = "data"
try:
    ds = datasources.get_datasource(repo=repo, name="images")
except:
    ds = datasources.create_from_repo(repo=repo, name="images", path=image_root)


# Imports
from tqdm import tqdm


# Add metadata
annotations_file = 'labels.tsv'

with ds.metadata_context() as ctx, open(annotations_file) as f:
    for row in tqdm(f.readlines()):
        image, caption, score = row.split('\t')[:3]
        ctx.update_metadata(image, {'caption': caption, 'score': score})

The first time I ran this, it never returned (I waited several hours). The second time, I got a 502:

100%|██████████| 542247/542247 [00:01<00:00, 342020.81it/s]
---------------------------------------------------------------------------
JSONDecodeError                           Traceback (most recent call last)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/requests/models.py:971, in Response.json(self, **kwargs)
    970 try:
--> 971     return complexjson.loads(self.text, **kwargs)
    972 except JSONDecodeError as e:
    973     # Catch JSON-related errors and raise as requests.JSONDecodeError
    974     # This aliases json.JSONDecodeError and simplejson.JSONDecodeError

File ~/.miniforge3/envs/dagstest/lib/python3.10/json/__init__.py:346, in loads(s, cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, **kw)
    343 if (cls is None and object_hook is None and
    344         parse_int is None and parse_float is None and
    345         parse_constant is None and object_pairs_hook is None and not kw):
--> 346     return _default_decoder.decode(s)
    347 if cls is None:

File ~/.miniforge3/envs/dagstest/lib/python3.10/json/decoder.py:337, in JSONDecoder.decode(self, s, _w)
    333 """Return the Python representation of ``s`` (a ``str`` instance
    334 containing a JSON document).
    335 
    336 """
--> 337 obj, end = self.raw_decode(s, idx=_w(s, 0).end())
    338 end = _w(s, end).end()

File ~/.miniforge3/envs/dagstest/lib/python3.10/json/decoder.py:355, in JSONDecoder.raw_decode(self, s, idx)
    354 except StopIteration as err:
--> 355     raise JSONDecodeError("Expecting value", s, err.value) from None
    356 return obj, end

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

During handling of the above exception, another exception occurred:

JSONDecodeError                           Traceback (most recent call last)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:243, in RequestsHTTPTransport.execute(self, document, variable_values, operation_name, timeout, extra_args, upload_files)
    242 try:
--> 243     result = response.json()
    245     if log.isEnabledFor(logging.INFO):

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/requests/models.py:975, in Response.json(self, **kwargs)
    972 except JSONDecodeError as e:
    973     # Catch JSON-related errors and raise as requests.JSONDecodeError
    974     # This aliases json.JSONDecodeError and simplejson.JSONDecodeError
--> 975     raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

During handling of the above exception, another exception occurred:

HTTPError                                 Traceback (most recent call last)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:231, in RequestsHTTPTransport.execute.<locals>.raise_response_error(resp, reason)
    229 try:
    230     # Raise a HTTPError if response status is 400 or higher
--> 231     resp.raise_for_status()
    232 except requests.HTTPError as e:

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/requests/models.py:1021, in Response.raise_for_status(self)
   1020 if http_error_msg:
-> 1021     raise HTTPError(http_error_msg, response=self)

HTTPError: 502 Server Error: Bad Gateway for url: https://test.dagshub.com/api/v1/repos/yonomitt/LAION-Aesthetics-V2-6.5plus/data-engine/graphql

The above exception was the direct cause of the following exception:

TransportServerError                      Traceback (most recent call last)
Cell In[10], line 3
      1 annotations_file = 'labels.tsv'
----> 3 with ds.metadata_context() as ctx, open(annotations_file) as f:
      4     for row in tqdm(f.readlines()):
      5         image, caption, score = row.split('\t')[:3]

File ~/.miniforge3/envs/dagstest/lib/python3.10/contextlib.py:142, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
    140 if typ is None:
    141     try:
--> 142         next(self.gen)
    143     except StopIteration:
    144         return False

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/model/datasource.py:118, in Datasource.metadata_context(self)
    116 ctx = MetadataContextManager(self)
    117 yield ctx
--> 118 self._upload_metadata(ctx.get_metadata_entries())

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/model/datasource.py:183, in Datasource._upload_metadata(self, metadata_entries)
    182 def _upload_metadata(self, metadata_entries: List[DatapointMetadataUpdateEntry]):
--> 183     self.source.client.update_metadata(self, metadata_entries)

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/client/data_client.py:109, in DataClient.update_metadata(self, datasource, entries)
    102 assert len(entries) > 0
    104 params = GqlMutations.update_metadata_params(
    105     datasource_id=datasource.source.id,
    106     datapoints=[e.to_dict() for e in entries]
    107 )
--> 109 return self._exec(q, params)

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/client/data_client.py:82, in DataClient._exec(self, query, params)
     80     logger.debug(f"Params: {params}")
     81 q = gql.gql(query)
---> 82 resp = self.client.execute(q, variable_values=params)
     83 return resp

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:403, in Client.execute(self, document, variable_values, operation_name, serialize_variables, parse_result, get_execution_result, **kwargs)
    400     return data
    402 else:  # Sync transports
--> 403     return self.execute_sync(
    404         document,
    405         variable_values=variable_values,
    406         operation_name=operation_name,
    407         serialize_variables=serialize_variables,
    408         parse_result=parse_result,
    409         get_execution_result=get_execution_result,
    410         **kwargs,
    411     )

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:221, in Client.execute_sync(self, document, variable_values, operation_name, serialize_variables, parse_result, get_execution_result, **kwargs)
    219 """:meta private:"""
    220 with self as session:
--> 221     return session.execute(
    222         document,
    223         variable_values=variable_values,
    224         operation_name=operation_name,
    225         serialize_variables=serialize_variables,
    226         parse_result=parse_result,
    227         get_execution_result=get_execution_result,
    228         **kwargs,
    229     )

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:849, in SyncClientSession.execute(self, document, variable_values, operation_name, serialize_variables, parse_result, get_execution_result, **kwargs)
    829 """Execute the provided document AST synchronously using
    830 the sync transport.
    831 
   (...)
    845 
    846 The extra arguments are passed to the transport execute method."""
    848 # Validate and execute on the transport
--> 849 result = self._execute(
    850     document,
    851     variable_values=variable_values,
    852     operation_name=operation_name,
    853     serialize_variables=serialize_variables,
    854     parse_result=parse_result,
    855     **kwargs,
    856 )
    858 # Raise an error if an error is returned in the ExecutionResult object
    859 if result.errors:

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:758, in SyncClientSession._execute(self, document, variable_values, operation_name, serialize_variables, parse_result, **kwargs)
    748         if serialize_variables or (
    749             serialize_variables is None and self.client.serialize_variables
    750         ):
    751             variable_values = serialize_variable_values(
    752                 self.client.schema,
    753                 document,
    754                 variable_values,
    755                 operation_name=operation_name,
    756             )
--> 758 result = self.transport.execute(
    759     document,
    760     variable_values=variable_values,
    761     operation_name=operation_name,
    762     **kwargs,
    763 )
    765 # Unserialize the result if requested
    766 if self.client.schema:

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:249, in RequestsHTTPTransport.execute(self, document, variable_values, operation_name, timeout, extra_args, upload_files)
    246         log.info("<<< %s", response.text)
    248 except Exception:
--> 249     raise_response_error(response, "Not a JSON answer")
    251 if "errors" not in result and "data" not in result:
    252     raise_response_error(response, 'No "data" or "errors" keys in answer')

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:233, in RequestsHTTPTransport.execute.<locals>.raise_response_error(resp, reason)
    231     resp.raise_for_status()
    232 except requests.HTTPError as e:
--> 233     raise TransportServerError(str(e), e.response.status_code) from e
    235 result_text = resp.text
    236 raise TransportProtocolError(
    237     f"Server did not return a GraphQL result: "
    238     f"{reason}: "
    239     f"{result_text}"
    240 )

TransportServerError: 502 Server Error: Bad Gateway for url: https://test.dagshub.com/api/v1/repos/yonomitt/LAION-Aesthetics-V2-6.5plus/data-engine/graphql
yonomitt (Author) commented:

The labels.tsv file can be found here: https://dagshub.com/DagsHub-Datasets/LAION-Aesthetics-V2-6.5plus/src/main/data/labels.tsv

It has 542,247 rows.

yonomitt (Author) commented May 27, 2023

The workaround was to batch the metadata uploads:

annotations_file = 'labels.tsv'

# Read all of the metadata into memory first
all_metadata = []
with open(annotations_file) as f:
    for row in tqdm(f.readlines()):
        image, caption, score = row.split('\t')[:3]
        all_metadata.append((image, {'caption': caption[:255], 'score': score}))

total = len(all_metadata)

# Upload the metadata in batches of 1000 datapoints
batch = 1000
for start in tqdm(range(0, total, batch)):
    data = all_metadata[start:start+batch]

    with ds.metadata_context() as ctx:
        for image, metadata in data:
            ctx.update_metadata(image, metadata)

simonlsk added the bug (Something isn't working) and priority: high labels on May 29, 2023
kbolashev (Member) commented:

I've copied the batching into the metadata upload, uploading it in batches of 5k points at a time.
Hope that's good enough and we don't need any backend changes.
