From 301927f83548989638d8e698896f122bc37a22c4 Mon Sep 17 00:00:00 2001 From: loesvdbiggelaar Date: Thu, 11 Apr 2024 11:25:51 +0200 Subject: [PATCH] remove redundant code from rdfwriter which is already defined in batch_writer --- biocypher/write/_batch_writer.py | 3 + biocypher/write/graph/_rdf.py | 427 ++++--------------------------- 2 files changed, 51 insertions(+), 379 deletions(-) diff --git a/biocypher/write/_batch_writer.py b/biocypher/write/_batch_writer.py index aa4f61c2..20e6d2e4 100644 --- a/biocypher/write/_batch_writer.py +++ b/biocypher/write/_batch_writer.py @@ -198,6 +198,9 @@ class contains all methods expected by a bach writer instance, some of db_port: The database port. + + rdf_format: + The format of RDF. """ self.db_name = db_name self.db_user = db_user diff --git a/biocypher/write/graph/_rdf.py b/biocypher/write/graph/_rdf.py index 1c023b56..cdca3656 100644 --- a/biocypher/write/graph/_rdf.py +++ b/biocypher/write/graph/_rdf.py @@ -70,9 +70,17 @@ def _get_rdf_format(self, string) -> bool: ) return False else: + # RDF graph does not support 'ttl' format, but only "turtle" format. while the preferred extension is always .ttl + if self.rdf_format == "turtle": + self.extension = "ttl" + elif self.rdf_format == "ttl": + self.rdf_format = "turtle" + self.extension = "ttl" + else: + self.extension = self.rdf_format return True - def _write_single_edge_list_to_rdf( + def _write_single_edge_list_to_file( self, edge_list: list, label: str, @@ -125,17 +133,8 @@ def _write_single_edge_list_to_rdf( # translate label to PascalCase label_pascal = self.translator.name_sentence_to_pascal(label) - # RDF graph does not support 'ttl' format, but only "turtle" format. while the preferred extension is always .ttl - if self.rdf_format == "turtle": - extension = "ttl" - elif self.rdf_format == "ttl": - self.rdf_format = "turtle" - extension = "ttl" - else: - extension = self.rdf_format - # create file name - fileName = os.path.join(self._outdir, f'{label_pascal}.{extension}') + fileName = os.path.join(self._outdir, f'{label_pascal}.{self.extension}') # write data in graph @@ -159,7 +158,7 @@ def _write_single_edge_list_to_rdf( return True - def _write_single_node_list_to_rdf( + def _write_single_node_list_to_file( self, node_list: list, label: str, @@ -185,49 +184,38 @@ def _write_single_node_list_to_rdf( logger.error('Nodes must be passed as type BioCypherNode.') return False - # from list of nodes to list of strings - lines = [] - for n in node_list: - - # check for deviations in properties - # node properties - n_props = n.get_properties() - n_keys = list(n_props.keys()) - # reference properties - ref_props = list(prop_dict.keys()) - - # compare lists order invariant - if not set(ref_props) == set(n_keys): - onode = n.get_id() - oprop1 = set(ref_props).difference(n_keys) - oprop2 = set(n_keys).difference(ref_props) - logger.error( - f'At least one node of the class {n.get_label()} ' - f'has more or fewer properties than another. ' - f'Offending node: {onode!r}, offending property: ' - f'{max([oprop1, oprop2])}. ' - f'All reference properties: {ref_props}, ' - f'All node properties: {n_keys}.', - ) - return False + + # do not check for deviations in properties. + # This is not applicable for rdf. + if False: + # check for deviations in properties + # node properties + n_props = n.get_properties() + n_keys = list(n_props.keys()) + # reference properties + ref_props = list(prop_dict.keys()) + + # compare lists order invariant + if not set(ref_props) == set(n_keys): + onode = n.get_id() + oprop1 = set(ref_props).difference(n_keys) + oprop2 = set(n_keys).difference(ref_props) + logger.error( + f'At least one node of the class {n.get_label()} ' + f'has more or fewer properties than another. ' + f'Offending node: {onode!r}, offending property: ' + f'{max([oprop1, oprop2])}. ' + f'All reference properties: {ref_props}, ' + f'All node properties: {n_keys}.', + ) + return False # translate label to PascalCase label_pascal = self.translator.name_sentence_to_pascal(label) - - - # RDF graph does not support 'ttl' format, but only "turtle" format. while the preferred extension is always .ttl - if self.rdf_format == "turtle": - extension = "ttl" - elif self.rdf_format == "ttl": - self.rdf_format = "turtle" - extension = "ttl" - else: - extension = self.rdf_format - # create file name - fileName = os.path.join(self._outdir, f'{label_pascal}.{extension}') + fileName = os.path.join(self._outdir, f'{label_pascal}.{self.extension}') # write data in graph g = Graph() @@ -241,7 +229,9 @@ def _write_single_node_list_to_rdf( g.add((self.namespaces["biocypher"][class_name], RDF.type, RDFS.Class)) g.add((self.label_to_uri(rdf_subject), RDFS.Class, self.namespaces["biocypher"][class_name])) for key, value in properties.items(): - g.add((self.label_to_uri(rdf_subject), self.namespaces["biocypher"][key], Literal(value))) + # only write value if it exists. + if value: + g.add((self.label_to_uri(rdf_subject), self.namespaces["biocypher"][key], Literal(value))) g.serialize(destination=fileName, format=self.rdf_format) @@ -252,332 +242,10 @@ def _write_single_node_list_to_rdf( ) return True - - def _lpg_to_rdf(self, nodes_or_edges, is_node, batch_size): - """ - Function to convert BioCypher's labeled property graph into RDF - format using a minimal approach where all properties are dropped. - Expects list or generator of nodes from the - :py:class:`BioCypherNode` class, or edges from the - :py:class:`BioCypherEdge` or :py:class:`BioCypherRelAsNode` class. - - Args: - nodes_or_edges: a list or generator of nodes in - :py:class:`BioCypherNode`, - :py:class:`BioCypherEdge` or - :py:class:`BioCypherRelAsNode`format - is_node: boolean, 1=nodes 0=edges - - Returns: - bool: The return value. True for success, False otherwise. - """ - self.seen_node_ids = set() - self.duplicate_node_ids = set() - self.duplicate_node_types = set() - self.seen_edges = {} - self.duplicate_edge_ids = set() - self.duplicate_edge_types = set() - - if is_node: - #### _write_node_data() function - if isinstance(nodes_or_edges, GeneratorType) or isinstance(nodes_or_edges, peekable): - logger.debug('Writing node data to RDF from generator.') - - bins = defaultdict(list) # dict to store a list for each - # label that is passed in - bin_l = {} # dict to store the length of each list for - # batching cutoff - reference_props = defaultdict( - dict, - ) # dict to store a dict of properties - # for each label to check for consistency and their type - # for now, relevant for `int` - labels = {} # dict to store the additional labels for each - # primary graph constituent from biolink hierarchy - for node in nodes_or_edges: - _id = node.get_id() - label = node.get_label() - - # check for non-id - if not _id: - logger.warning(f'Node {label} has no id; skipping.') - continue - - # check if node has already been written, if so skip - if _id in self.seen_node_ids: - self.duplicate_node_ids.add(_id) - if not label in self.duplicate_node_types: - self.duplicate_node_types.add(label) - logger.warning( - f'Duplicate nodes found in type {label}. ' - ) - continue - - if not label in bins.keys(): - # start new list - all_labels = None - bins[label].append(node) - bin_l[label] = 1 - - # get properties from config if present - cprops = self.translator.ontology.mapping.extended_schema.get(label).get('properties', ) - if cprops: - d = dict(cprops) - - # add id and preferred id to properties; these are - # created in node creation (`_create.BioCypherNode`) - d['id'] = 'str' - d['preferred_id'] = 'str' - - # add strict mode properties - if self.strict_mode: - d['source'] = 'str' - d['version'] = 'str' - d['licence'] = 'str' - - else: - d = dict(node.get_properties()) - # encode property type - for k, v in d.items(): - if d[k] is not None: - d[k] = type(v).__name__ - # else use first encountered node to define properties for - # checking; could later be by checking all nodes but much - # more complicated, particularly involving batch writing - # (would require "do-overs"). for now, we output a warning - # if node properties diverge from reference properties (in - # write_single_node_list_to_file) TODO if it occurs, ask - # user to select desired properties and restart the process - - reference_props[label] = d - - # get label hierarchy - # multiple labels: - all_labels = self.translator.ontology.get_ancestors(label) - - if all_labels: - # convert to pascal case - all_labels = [ - self.translator.name_sentence_to_pascal(label) - for label in all_labels - ] - # remove duplicates - all_labels = list(OrderedDict.fromkeys(all_labels)) - # order alphabetically - all_labels.sort() - else: - all_labels = self.translator.name_sentence_to_pascal( - label - ) - - labels[label] = all_labels - - else: - # add to list - bins[label].append(node) - bin_l[label] += 1 - if not bin_l[label] < batch_size: - # batch size controlled here - passed = self._write_single_node_list_to_rdf( - bins[label], - label, - reference_props[label], - labels[label], - ) - - if not passed: - return False - - bins[label] = [] - bin_l[label] = 0 - - self.seen_node_ids.add(_id) - - # after generator depleted, write remainder of bins - for label, nl in bins.items(): - passed = self._write_single_node_list_to_rdf( - nl, - label, - reference_props[label], - labels[label], - ) - - if not passed: - return False - - # use complete bin list to write header files - # TODO if a node type has varying properties - # (ie missingness), we'd need to collect all possible - # properties in the generator pass - - # save config or first-node properties to instance attribute - for label in reference_props.keys(): - self.node_property_dict[label] = reference_props[label] - - return True - else: - if type(nodes_or_edges) is not list: - logger.error('Nodes must be passed as list or generator.') - return False - else: - - def gen(nodes): - yield from nodes - - return self._write_node_data(gen(nodes_or_edges), batch_size=batch_size) - - else: - #### _write_edge_data() function - if isinstance(nodes_or_edges, GeneratorType): - logger.debug('Writing edge data to RDF from generator.') - bins = defaultdict(list) # dict to store a list for each - # label that is passed in - bin_l = {} # dict to store the length of each list for - # batching cutoff - reference_props = defaultdict( - dict, - ) # dict to store a dict of properties - # for each label to check for consistency and their type - # for now, relevant for `int` - for e in nodes_or_edges: - if isinstance(e, BioCypherRelAsNode): - # shouldn't happen any more - logger.error( - "Edges cannot be of type 'RelAsNode'. " - f'Caused by: {e}', - ) - return False - - if not (e.get_source_id() and e.get_target_id()): - logger.error( - 'Edge must have source and target node. ' - f'Caused by: {e}', - ) - continue - - label = e.get_label() - - if not label in self.seen_edges.keys(): - self.seen_edges[label] = set() - - src_tar_id = '_'.join([e.get_source_id(), e.get_target_id()]) - - # check for duplicates - if src_tar_id in self.seen_edges.get(label, set()): - self.duplicate_edge_ids.add(src_tar_id) - if not label in self.duplicate_edge_types: - self.duplicate_edge_types.add(label) - logger.warning( - f'Duplicate edges found in type {label}. ' - ) - continue - - else: - self.seen_edges[label].add(src_tar_id) - - if not label in bins.keys(): - # start new list - bins[label].append(e) - bin_l[label] = 1 - - # get properties from config if present - - # check whether label is in ontology_adapter.leaves - # (may not be if it is an edge that carries the - # "label_as_edge" property) - cprops = None - if label in self.translator.ontology.mapping.extended_schema: - cprops = self.translator.ontology.mapping.extended_schema.get(label).get( - 'properties', - ) - else: - # try via "label_as_edge" - for k, v in self.translator.ontology.mapping.extended_schema.items(): - if isinstance(v, dict): - if v.get('label_as_edge') == label: - cprops = v.get('properties') - break - if cprops: - d = cprops - - # add strict mode properties - if self.strict_mode: - d['source'] = 'str' - d['version'] = 'str' - d['licence'] = 'str' - - else: - d = dict(e.get_properties()) - # encode property type - for k, v in d.items(): - if d[k] is not None: - d[k] = type(v).__name__ - # else use first encountered edge to define - # properties for checking; could later be by - # checking all edges but much more complicated, - # particularly involving batch writing (would - # require "do-overs"). for now, we output a warning - # if edge properties diverge from reference - # properties (in write_single_edge_list_to_file) - # TODO - - reference_props[label] = d - - else: - # add to list - bins[label].append(e) - bin_l[label] += 1 - if not bin_l[label] < batch_size: - # batch size controlled here - passed = self._write_single_edge_list_to_rdf( - bins[label], - label, - reference_props[label], - ) - - if not passed: - return False - - bins[label] = [] - bin_l[label] = 0 - - # after generator depleted, write remainder of bins - for label, nl in bins.items(): - - passed = self._write_single_edge_list_to_rdf( - nl, - label, - reference_props[label], - ) - - if not passed: - return False - - # use complete bin list to write header files - # TODO if a edge type has varying properties - # (ie missingness), we'd need to collect all possible - # properties in the generator pass - - # save first-edge properties to instance attribute - for label in reference_props.keys(): - self.edge_property_dict[label] = reference_props[label] - - else: - if type(nodes_or_edges) is not list: - logger.error('Edges must be passed as list or generator.') - return False - else: - - def gen(edges): - yield from edges - - return self._write_edge_data(gen(nodes_or_edges), batch_size=batch_size) - - return True - + def write_nodes(self, nodes, batch_size: int = int(1e6)): """ - Wrapper for writing nodes in rdf format. It calls _lpg_to_rdf() + Wrapper for writing nodes in rdf format. It calls _write_node_data() functions specifying it's node data. Args: @@ -592,8 +260,8 @@ def write_nodes(self, nodes, batch_size: int = int(1e6)): if not passed: logger.error('Error while writing node data, wrong RDF format') return False - # write node data using _lpg_to_rdf method - passed = self._lpg_to_rdf(nodes, is_node=1, batch_size=int(1e6)) + # write node data using _write_node_data method + passed = self._write_node_data(nodes, batch_size=batch_size) if not passed: logger.error('Error while writing node data.') return False @@ -604,7 +272,7 @@ def write_edges( batch_size: int = int(1e6), ) -> bool: """ - Wrapper for writing edges in rdf format. It calls _lpg_to_rdf() + Wrapper for writing edges in rdf format. It calls _write_edge_data() functions specifying it's edge data. Args: @@ -619,8 +287,8 @@ def write_edges( if not passed: logger.error('Error while writing edge data, wrong RDF format') return False - # write edge data using _lpg_to_rdf method - passed = self._lpg_to_rdf(edges, is_node=0, batch_size=int(1e6)) + # write edge data using _write_edge_data method + passed = self._write_edge_data(edges, batch_size=batch_size) if not passed: logger.error('Error while writing edge data.') return False @@ -694,6 +362,7 @@ def label_to_uri(self, input): # TODO: this should flow out of the config file! # hardcoded it for now + def _init_namespaces(self, graph): self.namespaces = {} self.namespaces["biocypher"] = Namespace("http://example.org/biocypher#")