Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactors pulled from #668 #686

Merged
merged 12 commits into from
Mar 12, 2024
7 changes: 3 additions & 4 deletions tiled/adapters/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ def read_csv(
)


read_csv.__doc__ = (
"""
read_csv.__doc__ = """
This wraps dask.dataframe.read_csv. Original docstring:

"""
+ dask.dataframe.read_csv.__doc__
""" + (
dask.dataframe.read_csv.__doc__ or ""
)
12 changes: 7 additions & 5 deletions tiled/catalog/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ async def __aiter__(self):

@property
def data_sources(self):
return [DataSource.from_orm(ds) for ds in self.node.data_sources or []]
return [DataSource.from_orm(ds) for ds in (self.node.data_sources or [])]

async def asset_by_id(self, asset_id):
statement = (
Expand Down Expand Up @@ -597,12 +597,14 @@ async def create_node(
if data_source.management != Management.external:
if structure_family == StructureFamily.container:
raise NotImplementedError(structure_family)
data_source.mimetype = DEFAULT_CREATION_MIMETYPE[structure_family]
data_source.mimetype = DEFAULT_CREATION_MIMETYPE[
data_source.structure_family
]
data_source.parameters = {}
data_uri = str(self.context.writable_storage) + "".join(
f"/{quote_plus(segment)}" for segment in (self.segments + [key])
)
init_storage = DEFAULT_INIT_STORAGE[structure_family]
init_storage = DEFAULT_INIT_STORAGE[data_source.structure_family]
assets = await ensure_awaitable(
init_storage, data_uri, data_source.structure
)
Expand Down Expand Up @@ -1307,9 +1309,9 @@ def specs_array_to_json(specs):


STRUCTURES = {
StructureFamily.container: CatalogContainerAdapter,
StructureFamily.array: CatalogArrayAdapter,
StructureFamily.awkward: CatalogAwkwardAdapter,
StructureFamily.table: CatalogTableAdapter,
StructureFamily.container: CatalogContainerAdapter,
StructureFamily.sparse: CatalogSparseAdapter,
StructureFamily.table: CatalogTableAdapter,
}
2 changes: 1 addition & 1 deletion tiled/catalog/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ class DataSource(Timestamped, Base):
node_id = Column(
Integer, ForeignKey("nodes.id", ondelete="CASCADE"), nullable=False
)
structure_family = Column(Enum(StructureFamily), nullable=False)
structure_id = Column(
Unicode(32), ForeignKey("structures.id", ondelete="CASCADE"), nullable=True
)
Expand All @@ -301,6 +300,7 @@ class DataSource(Timestamped, Base):
parameters = Column(JSONVariant, nullable=True)
# This relates to the mutability of the data.
management = Column(Enum(Management), nullable=False)
structure_family = Column(Enum(StructureFamily), nullable=False)

# many-to-one relationship to Structure
structure: Mapped["Structure"] = relationship(
Expand Down
5 changes: 4 additions & 1 deletion tiled/client/constructors.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,14 @@ def from_context(
and (context.http_client.auth is None)
):
context.authenticate()
params = {}
if include_data_sources:
params["include_data_sources"] = True
content = handle_error(
context.http_client.get(
item_uri,
headers={"Accept": MSGPACK_MIME_TYPE},
params={"include_data_sources": include_data_sources},
params=params,
)
).json()
else:
Expand Down
33 changes: 19 additions & 14 deletions tiled/client/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,18 @@ def __getitem__(self, keys, _ignore_inlined_contents=False):
# Lookup this key *within the search results* of this Node.
key, *tail = keys
tail = tuple(tail) # list -> tuple
params = {
**_queries_to_params(KeyLookup(key)),
**self._queries_as_params,
**self._sorting_params,
}
if self._include_data_sources:
params["include_data_sources"] = True
content = handle_error(
self.context.http_client.get(
self.item["links"]["search"],
headers={"Accept": MSGPACK_MIME_TYPE},
params={
"include_data_sources": self._include_data_sources,
**_queries_to_params(KeyLookup(key)),
**self._queries_as_params,
**self._sorting_params,
},
params=params,
)
).json()
self._cached_len = (
Expand Down Expand Up @@ -305,13 +307,14 @@ def __getitem__(self, keys, _ignore_inlined_contents=False):
self_link = self.item["links"]["self"]
if self_link.endswith("/"):
self_link = self_link[:-1]
params = {}
if self._include_data_sources:
params["include_data_sources"] = True
content = handle_error(
self.context.http_client.get(
self_link + "".join(f"/{key}" for key in keys[i:]),
headers={"Accept": MSGPACK_MIME_TYPE},
params={
"include_data_sources": self._include_data_sources
},
params=params,
)
).json()
except ClientError as err:
Expand Down Expand Up @@ -413,15 +416,17 @@ def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False):
next_page_url = f"{self.item['links']['search']}?page[offset]={start}"
item_counter = itertools.count(start)
while next_page_url is not None:
params = {
**self._queries_as_params,
**sorting_params,
}
if self._include_data_sources:
params["include_data_sources"] = True
content = handle_error(
self.context.http_client.get(
next_page_url,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
"include_data_sources": self._include_data_sources,
**self._queries_as_params,
**sorting_params,
},
params=params,
)
).json()
self._cached_len = (
Expand Down
4 changes: 2 additions & 2 deletions tiled/client/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import dask
import dask.dataframe
import dask.dataframe.core

from ..serialization.table import deserialize_arrow, serialize_arrow
from ..utils import APACHE_ARROW_FILE_MIME_TYPE, UNCHANGED
Expand Down Expand Up @@ -162,7 +162,7 @@ def read(self, columns=None):

if columns is not None:
meta = meta[columns]
ddf = dask.dataframe.DataFrame(
ddf = dask.dataframe.core.DataFrame(
dask_tasks,
name=name,
meta=meta,
Expand Down
54 changes: 15 additions & 39 deletions tiled/server/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)
from . import schemas
from .etag import tokenize
from .links import links_for_node
from .utils import record_timing

del queries
Expand Down Expand Up @@ -404,6 +405,7 @@ async def construct_resource(
depth=0,
):
path_str = "/".join(path_parts)
id_ = path_parts[-1] if path_parts else ""
attributes = {"ancestors": path_parts[:-1]}
if include_data_sources and hasattr(entry, "data_sources"):
attributes["data_sources"] = entry.data_sources
Expand Down Expand Up @@ -488,15 +490,16 @@ async def construct_resource(
for key, direction in entry.sorting
]
d = {
"id": path_parts[-1] if path_parts else "",
"id": id_,
"attributes": schemas.NodeAttributes(**attributes),
}
if not omit_links:
d["links"] = {
"self": f"{base_url}/metadata/{path_str}",
"search": f"{base_url}/search/{path_str}",
"full": f"{base_url}/container/full/{path_str}",
}
d["links"] = links_for_node(
entry.structure_family,
entry.structure(),
base_url,
path_str,
)

resource = schemas.Resource[
schemas.NodeAttributes, schemas.ContainerLinks, schemas.ContainerMeta
Expand All @@ -510,34 +513,16 @@ async def construct_resource(
entry.structure_family
]
links.update(
{
link: template.format(base_url=base_url, path=path_str)
for link, template in FULL_LINKS[entry.structure_family].items()
}
links_for_node(
entry.structure_family,
entry.structure(),
base_url,
path_str,
)
)
structure = asdict(entry.structure())
if schemas.EntryFields.structure_family in fields:
attributes["structure_family"] = entry.structure_family
if entry.structure_family == StructureFamily.sparse:
shape = structure.get("shape")
block_template = ",".join(f"{{{index}}}" for index in range(len(shape)))
links[
"block"
] = f"{base_url}/array/block/{path_str}?block={block_template}"
elif entry.structure_family == StructureFamily.array:
shape = structure.get("shape")
block_template = ",".join(
f"{{index_{index}}}" for index in range(len(shape))
)
links[
"block"
] = f"{base_url}/array/block/{path_str}?block={block_template}"
elif entry.structure_family == StructureFamily.table:
links[
"partition"
] = f"{base_url}/table/partition/{path_str}?partition={{index}}"
elif entry.structure_family == StructureFamily.awkward:
links["buffers"] = f"{base_url}/awkward/buffers/{path_str}"
if schemas.EntryFields.structure in fields:
attributes["structure"] = structure
else:
Expand Down Expand Up @@ -719,15 +704,6 @@ class WrongTypeForRoute(Exception):
pass


FULL_LINKS = {
StructureFamily.array: {"full": "{base_url}/array/full/{path}"},
StructureFamily.awkward: {"full": "{base_url}/awkward/full/{path}"},
StructureFamily.container: {"full": "{base_url}/container/full/{path}"},
StructureFamily.table: {"full": "{base_url}/table/full/{path}"},
StructureFamily.sparse: {"full": "{base_url}/array/full/{path}"},
}


def asdict(dc):
"Compat for converting dataclass or pydantic.BaseModel to dict."
if dc is None:
Expand Down
16 changes: 14 additions & 2 deletions tiled/server/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def get_root_tree():
)


def SecureEntry(scopes):
def SecureEntry(scopes, structure_families=None):
async def inner(
path: str,
request: Request,
Expand Down Expand Up @@ -116,7 +116,19 @@ async def inner(
)
except NoEntry:
raise HTTPException(status_code=404, detail=f"No such entry: {path_parts}")
return entry
# Fast path for the common successful case
if (structure_families is None) or (
entry.structure_family in structure_families
):
return entry
raise HTTPException(
status_code=404,
detail=(
f"The node at {path} has structure family {entry.structure_family} "
"and this endpoint is compatible with structure families "
f"{structure_families}"
),
)

return Security(inner, scopes=scopes)

Expand Down
53 changes: 53 additions & 0 deletions tiled/server/links.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Generate the 'links' section of the response JSON.

The links vary by structure family.
"""
from ..structures.core import StructureFamily


def links_for_node(structure_family, structure, base_url, path_str):
    """
    Build the 'links' dict for a node, varying by structure family.

    Dispatches to the family-specific helper via LINKS_BY_STRUCTURE_FAMILY,
    then adds the 'self' (metadata) link, which is common to every family.
    """
    # Fix: the original assigned `links = {}` and then immediately
    # rebound `links` on the next line; the dead assignment is removed.
    links = LINKS_BY_STRUCTURE_FAMILY[structure_family](
        structure_family, structure, base_url, path_str
    )
    links["self"] = f"{base_url}/metadata/{path_str}"
    return links


def links_for_array(structure_family, structure, base_url, path_str):
    """
    Links for array-like nodes: per-block access and full download.

    The 'block' link carries one positional placeholder per dimension of
    ``structure.shape``, e.g. ``?block={0},{1}`` for a 2-D array.
    """
    ndim = len(structure.shape)
    placeholders = ",".join("{%d}" % axis for axis in range(ndim))
    return {
        "block": f"{base_url}/array/block/{path_str}?block={placeholders}",
        "full": f"{base_url}/array/full/{path_str}",
    }


def links_for_awkward(structure_family, structure, base_url, path_str):
    """Links for an awkward-array node: buffer access and full download."""
    return {
        "buffers": f"{base_url}/awkward/buffers/{path_str}",
        "full": f"{base_url}/awkward/full/{path_str}",
    }


def links_for_container(structure_family, structure, base_url, path_str):
    """Links for a container node: full download and search endpoints."""
    return {
        "full": f"{base_url}/container/full/{path_str}",
        "search": f"{base_url}/search/{path_str}",
    }


def links_for_table(structure_family, structure, base_url, path_str):
    """Links for a table node: per-partition access and full download."""
    return {
        "partition": f"{base_url}/table/partition/{path_str}?partition={{index}}",
        "full": f"{base_url}/table/full/{path_str}",
    }


# Dispatch table: maps each structure family to the helper that builds
# its family-specific links. Used by links_for_node.
LINKS_BY_STRUCTURE_FAMILY = {
    StructureFamily.array: links_for_array,
    StructureFamily.awkward: links_for_awkward,
    StructureFamily.container: links_for_container,
    StructureFamily.sparse: links_for_array,  # sparse and array share links
    StructureFamily.table: links_for_table,
}
Loading
Loading