
Temporal Graph Support

Christopher Rost edited this page Aug 29, 2024 · 5 revisions

Temporal Property Graph Model (TPGM)

The temporal analysis of evolving graphs is an important requirement in many domains, but it is hardly supported by current graph databases and graph processing systems. We have therefore started extending Gradoop for temporal graph analysis by adding time properties to vertices, edges, and graphs and using them within graph operators.


The key features of our model are:

  • Bi-temporal time dimension support
  • Backwards compatible with most EPGM operators
  • Flexible time representation: can be empty, a timestamp, or a time-interval

See our publications

Data Model

The data model is an extension of the EPGM data model. Each graph element (i.e., logical graph, vertex, and edge) has two additional time intervals that enable bi-temporal semantics. The first interval represents the transaction time, i.e., the time during which a fact is current in the graph. It holds rollback information that is maintained by Gradoop. The second interval represents the valid time (also referred to as application time), i.e., the time during which the information is valid in the real world. It holds historical information. Valid times are typically set within the application before the data enters Gradoop.
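To make the two intervals concrete, here is a minimal plain-Java sketch; it is not Gradoop's API, and the classes BiTemporalElement and BiTemporalDemo are hypothetical. It assumes both intervals are stored as epoch-millisecond longs, are half-open (start inclusive, end exclusive, matching the AsOf predicate), and use Long.MIN_VALUE and Long.MAX_VALUE for unset bounds, as in the CSV sink example further below.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a TPGM graph element with bi-temporal attributes. */
class BiTemporalElement {
  // Assumed defaults for unbounded interval ends (as in the CSV sink example).
  static final long DEFAULT_FROM = Long.MIN_VALUE;
  static final long DEFAULT_TO = Long.MAX_VALUE;

  final String label;
  final Map<String, Object> properties = new HashMap<>();
  // Transaction time: when the fact is current in the graph (maintained by the system).
  long txFrom = DEFAULT_FROM;
  long txTo = DEFAULT_TO;
  // Valid time: when the fact holds in the real world (supplied by the application).
  long validFrom = DEFAULT_FROM;
  long validTo = DEFAULT_TO;

  BiTemporalElement(String label) {
    this.label = label;
  }

  /** True if the valid-time interval [validFrom, validTo) contains the given instant. */
  boolean isValidAt(long timestamp) {
    return validFrom <= timestamp && validTo > timestamp;
  }

  /** True if the transaction-time interval [txFrom, txTo) contains the given instant. */
  boolean isCurrentAt(long timestamp) {
    return txFrom <= timestamp && txTo > timestamp;
  }
}

class BiTemporalDemo {
  public static void main(String[] args) {
    BiTemporalElement person = new BiTemporalElement("Person");
    person.validFrom = 1_000L; // became valid in the real world at t=1000
    person.validTo = 5_000L;   // stopped being valid at t=5000
    person.txFrom = 2_000L;    // but was only recorded in the graph at t=2000
    System.out.println(person.isValidAt(1_500L));   // true
    System.out.println(person.isCurrentAt(1_500L)); // false
  }
}
```

At t=1500 the element is already valid in the real world but not yet recorded in the graph, which is exactly the situation the two dimensions are meant to distinguish.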

Operators

We added two new operators to the TPGM that can only be applied to a TemporalGraph instance: Snapshot and Difference. In addition, we extended several EPGM operators, such as Transformation, Grouping, and Aggregation, with features that enable different temporal analyses.

Snapshot

The TPGM snapshot operator retrieves a snapshot of the whole temporal graph that is valid either at a specific point in time or during a given time range; the time condition is supplied as a temporal predicate function. Besides the operator itself, several predefined predicate functions are available. They are adopted from the temporal features of SQL:2011.

Example code snippet:

// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();

// We need a UNIX timestamp in milliseconds since epoch.
long queryTimestamp = LocalDateTime
      .of(2018, 12, 24, 20, 15, 0, 0)
      .toInstant(ZoneOffset.UTC)
      .toEpochMilli();

// Get the graph 'as of' our query timestamp (considers the valid time dimension by default)
TemporalGraph historicalGraph = temporalGraph
      .snapshot(new AsOf(queryTimestamp));
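Conceptually, a snapshot is a filter over the elements' valid-time intervals. The following plain-Java sketch applies an AsOf-style predicate to a list of elements; the names IntervalPredicate, TimedElement, and SnapshotSketch are hypothetical, and this is not Gradoop's implementation, which runs on Flink. It assumes vertices and edges are filtered independently; the verify() call in the example workflow further below suggests that dangling edges are removed in a separate step.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical temporal predicate over an element's valid-time interval [from, to). */
interface IntervalPredicate {
  boolean test(long from, long to);
}

/** Hypothetical minimal element: an id plus a valid-time interval. */
class TimedElement {
  final String id;
  final long from;
  final long to;

  TimedElement(String id, long from, long to) {
    this.id = id;
    this.from = from;
    this.to = to;
  }
}

class SnapshotSketch {
  /** Mirrors the AsOf predicate: elementFrom <= t && elementTo > t. */
  static IntervalPredicate asOf(long t) {
    return (from, to) -> from <= t && to > t;
  }

  /** Keeps only the elements whose valid time satisfies the predicate. */
  static List<TimedElement> snapshot(List<TimedElement> elements, IntervalPredicate predicate) {
    return elements.stream()
        .filter(e -> predicate.test(e.from, e.to))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<TimedElement> vertices = Arrays.asList(
        new TimedElement("alice", 0L, 100L),
        new TimedElement("bob", 50L, Long.MAX_VALUE),
        new TimedElement("carol", 200L, 300L));
    List<TimedElement> asOf60 = snapshot(vertices, asOf(60L));
    // prints [alice, bob]
    System.out.println(asOf60.stream().map(e -> e.id).collect(Collectors.toList()));
  }
}
```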

Difference

The evolution of graphs over time can be represented by the difference of two graph snapshots, i.e., by a difference graph that is the union of both snapshots where each graph element is annotated as an added, deleted, or persistent element.

The TPGM diff operator takes two graph snapshots, each defined by a temporal predicate function, and calculates the difference graph. The annotation is stored in a property _diff on each graph element. Its value is 0 if the element exists in both snapshots, 1 if it was added, or -1 if it was removed in the second snapshot.

Example code snippet:

// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();

// We need two UNIX timestamps in milliseconds since epoch that we want to compare
long firstQueryTimestamp = LocalDateTime
      .of(2018, 12, 24, 20, 15, 0, 0)
      .toInstant(ZoneOffset.UTC)
      .toEpochMilli();

long secondQueryTimestamp = LocalDateTime
      .of(2019, 12, 24, 20, 15, 0, 0)
      .toInstant(ZoneOffset.UTC)
      .toEpochMilli();

// Get the difference of both historical graph versions (considers the valid time dimension by default)
TemporalGraph differenceGraph = temporalGraph
      .diff(new AsOf(firstQueryTimestamp), new AsOf(secondQueryTimestamp));
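The annotation rule itself can be sketched in plain Java. This is a simplification under stated assumptions: each snapshot is represented only by the set of its element ids, and the result maps each id in the union to its _diff value, whereas Gradoop works on full graph elements and stores the value as a property. The class DiffSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class DiffSketch {
  static final int PERSISTENT = 0;
  static final int ADDED = 1;
  static final int REMOVED = -1;

  /**
   * Annotates every element of the union of two snapshots with a _diff-style value:
   * 0 if it is in both, 1 if it is only in the second, -1 if it is only in the first.
   */
  static Map<String, Integer> diff(Set<String> first, Set<String> second) {
    Map<String, Integer> annotated = new TreeMap<>(); // sorted for readable output
    for (String id : first) {
      annotated.put(id, second.contains(id) ? PERSISTENT : REMOVED);
    }
    for (String id : second) {
      if (!first.contains(id)) {
        annotated.put(id, ADDED);
      }
    }
    return annotated;
  }

  public static void main(String[] args) {
    Set<String> snapshot2018 = new HashSet<>(Arrays.asList("v1", "v2", "e1"));
    Set<String> snapshot2019 = new HashSet<>(Arrays.asList("v2", "v3", "e1"));
    // prints {e1=0, v1=-1, v2=0, v3=1}
    System.out.println(diff(snapshot2018, snapshot2019));
  }
}
```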

Transformation

The transform operator defines a structure-preserving modification of graph, vertex, and edge data. User-defined transformation functions can be applied to a temporal graph, resulting in a modified output graph. Within the TPGM, it is possible to (1) modify the temporal attributes, (2) derive the time attributes from information stored in properties, or (3) create properties from the temporal information in the time attributes. For example, if the temporal attributes are not yet set or are calculated during a workflow, this operator makes it possible to define the valid times at runtime.

Example code snippet:

// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();

// Assign the valid-from attribute from a property "CreationDate" for all vertices
temporalGraph = temporalGraph.transform(
  // Keep the graph heads
  TransformationFunction.keep(),
  // Extract timestamps for vertices
  (origin, t) -> {
    if (origin.hasProperty("CreationDate") && origin.getPropertyValue("CreationDate").isDateTime()) {
      origin.setValidFrom(
        origin.getPropertyValue("CreationDate")
          .getDateTime()
          .atZone(ZoneId.systemDefault())
          .toInstant()
          .toEpochMilli());
    }
    return origin;
  },
  // Keep the edges
  TransformationFunction.keep()
);
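The example above covers case (2). Case (3) works in the opposite direction: a property is computed from a time attribute. The plain-Java sketch below shows the logic such a vertex transformation function might contain; the Vertex class, the property name validFromYear, the use of UTC, and Long.MIN_VALUE as the unset default are all assumptions for illustration, not Gradoop's API.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;

class TimeToPropertySketch {
  /** Hypothetical vertex: a valid-from timestamp plus a property map. */
  static class Vertex {
    final long validFrom;
    final Map<String, Object> properties = new HashMap<>();

    Vertex(long validFrom) {
      this.validFrom = validFrom;
    }
  }

  /** Assumed default for an unset valid-from bound (as in the CSV sink example). */
  static final long DEFAULT_FROM = Long.MIN_VALUE;

  /** Transformation function: stores the UTC year of the valid-from time as a property. */
  static Vertex addValidFromYear(Vertex v) {
    if (v.validFrom != DEFAULT_FROM) { // skip elements whose valid time is not set
      int year = Instant.ofEpochMilli(v.validFrom).atZone(ZoneOffset.UTC).getYear();
      v.properties.put("validFromYear", year);
    }
    return v;
  }

  public static void main(String[] args) {
    long validFrom = LocalDateTime.of(2018, 12, 24, 20, 15)
        .toInstant(ZoneOffset.UTC)
        .toEpochMilli();
    Vertex v = addValidFromYear(new Vertex(validFrom));
    System.out.println(v.properties); // {validFromYear=2018}
  }
}
```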

Time-dependent Graph Grouping

A structural grouping of vertices and edges is an important task in temporal graph analytics. Since temporal graphs can become very large, condensing them can provide deeper insights into structures and patterns hidden in the graph. In the EPGM implementation of the groupBy operator, grouping is based on vertex and edge grouping keys as well as vertex and edge aggregate functions.

For temporal grouping, TPGM provides three additional features:

  • time-specific value transformation functions can be applied to compute time values at the desired granularity for grouping
  • support for GROUP BY CUBE and GROUP BY ROLLUP similar to SQL (the output of the operator is a collection where each graph corresponds to a single combination of the given grouping keys)
  • support for aggregations on the temporal properties by user-defined functions and predefined time-specific aggregate functions (e.g., MinTime or MaxTime)
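The first feature, mapping timestamps to a coarser granularity before grouping, can be illustrated in plain Java. The helper timeStampKey is hypothetical and only mimics the idea behind TemporalGroupingKeys.timeStamp; it assumes epoch-millisecond timestamps interpreted in UTC, which may differ from Gradoop's actual time-zone handling.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoField;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GranularityGroupingSketch {
  /** Maps an epoch-millisecond timestamp to its value in the given field (UTC assumed). */
  static long timeStampKey(long epochMillis, ChronoField field) {
    return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC).getLong(field);
  }

  /** Groups timestamps by the extracted key and counts each group, like a Count aggregate. */
  static Map<Long, Integer> countBy(List<Long> validFroms, ChronoField field) {
    Map<Long, Integer> counts = new TreeMap<>();
    for (long t : validFroms) {
      counts.merge(timeStampKey(t, field), 1, Integer::sum);
    }
    return counts;
  }

  /** Convenience: midnight UTC of the given date as epoch milliseconds. */
  static long millis(int year, int month, int day) {
    return LocalDateTime.of(year, month, day, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli();
  }

  public static void main(String[] args) {
    List<Long> edgeValidFroms = Arrays.asList(
        millis(2018, 1, 5), millis(2018, 1, 20), millis(2018, 3, 2), millis(2019, 1, 9));
    // prints {1=3, 3=1}: three edges start in a January (of any year), one in March
    System.out.println(countBy(edgeValidFroms, ChronoField.MONTH_OF_YEAR));
  }
}
```

Note that MONTH_OF_YEAR alone merges January 2018 with January 2019, which is why a year key is usually grouped alongside it.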

Example code snippet:

// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();

// Group vertices by their valid-time duration and edges by their label
TemporalGraph groupedGraph = temporalGraph.groupBy(
  // Vertex grouping keys: Group vertices by their duration in months
  Collections.singletonList(TemporalGroupingKeys.duration(VALID_TIME, ChronoUnit.MONTHS)),
  // Vertex aggregate functions: calculate the average valid time duration of grouped vertices
  Collections.singletonList(new AverageVertexDuration("avgVertexDurValid", VALID_TIME)),
  // Edge grouping keys: Group edges by their type label
  Collections.singletonList(GroupingKeys.label()),
  // Edge aggregate functions: count the grouped edges and save the result in a property "cnt"
  Collections.singletonList(new Count("cnt"))
);

Aggregation

The aggregation operator reduces information contained in a graph to single values using the given aggregate functions. It is called via the aggregate() method, which is available on both the LogicalGraph and TemporalGraph classes. It takes any number of aggregate functions as input and returns the original graph with the result of each aggregate function stored as a new property on the graph head. Most aggregate functions require a string denoting the name of the property or label they are applied to.

The temporal extension of the aggregation operator introduces new aggregate functions, which are listed under Temporal Aggregations.
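The operator's contract (apply every aggregate function, store each result under its own key on the graph head) can be sketched in plain Java. Everything here is hypothetical: AggFunction stands in for Gradoop's aggregate functions, the input is reduced to a list of valid-from timestamps, and the map stands in for the graph-head properties.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class AggregateSketch {
  /** Hypothetical aggregate function: a target property key plus a reduction over values. */
  static class AggFunction {
    final String propertyKey;
    final Function<List<Long>, Long> reduce;

    AggFunction(String propertyKey, Function<List<Long>, Long> reduce) {
      this.propertyKey = propertyKey;
      this.reduce = reduce;
    }
  }

  /** Applies every function and stores each result under its key, like graph-head properties. */
  static Map<String, Long> aggregate(List<Long> values, List<AggFunction> functions) {
    Map<String, Long> graphHead = new LinkedHashMap<>();
    for (AggFunction f : functions) {
      graphHead.put(f.propertyKey, f.reduce.apply(values));
    }
    return graphHead;
  }

  public static void main(String[] args) {
    // Valid-from timestamps of all vertices and edges (illustrative values)
    List<Long> validFroms = Arrays.asList(300L, 100L, 200L);
    Map<String, Long> head = aggregate(validFroms, Arrays.asList(
        new AggFunction("count", l -> (long) l.size()),
        new AggFunction("minValidFrom", l -> Collections.min(l))));
    System.out.println(head); // {count=3, minValidFrom=100}
  }
}
```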

Temporal Graph Pattern Matching

Gradoop uses Temporal-GDL as its pattern matching language. Temporal-GDL is a fork of s1ck's Graph Definition Language (GDL) that extends it with several temporal constraints to support bi-temporal property graphs. In Gradoop, you can use it by calling the .temporalQuery() method on your temporal graph.

Please see the respective repo for further information: Temporal-GDL.

Example usage:

TemporalGraph matchResult = myTemporalGraph
      // Pattern matching
      .temporalQuery("MATCH (v1:Station {cellId: 2883})-[t1:Trip]->(v2:Station)-[t2:Trip]->(v3:Station) " +
        "WHERE v2.id != v1.id " +
        "AND v2.id != v3.id " +
        "AND v3.id != v1.id " +
        "AND t1.val.precedes(t2.val) " +
        "AND t1.val.lengthAtLeast(Minutes(30)) " +
        "AND t2.val.lengthAtLeast(Minutes(30))")
      // Reduce collection to graph
      .reduce(new ReduceCombination<>());
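The two temporal constraints in the query above can be sketched as interval methods in plain Java. This is not Temporal-GDL's implementation: the Interval class is hypothetical, intervals are assumed to be half-open epoch-millisecond ranges, and precedes is assumed to mean "ends no later than the other starts"; check the Temporal-GDL repository for the exact (strict or non-strict) definition. The vertex inequality constraints are not modeled.

```java
import java.time.Duration;

class IntervalConstraintSketch {
  /** Hypothetical valid-time interval in epoch milliseconds, assumed half-open [from, to). */
  static class Interval {
    final long from;
    final long to;

    Interval(long from, long to) {
      this.from = from;
      this.to = to;
    }

    /** Assumed semantics: this interval ends no later than the other one starts. */
    boolean precedes(Interval other) {
      return to <= other.from;
    }

    /** True if the interval spans at least the given duration. */
    boolean lengthAtLeast(Duration minimum) {
      return to - from >= minimum.toMillis();
    }
  }

  /** The temporal trip constraints of the query above, applied to two consecutive trips. */
  static boolean matchesTripPattern(Interval t1, Interval t2) {
    Duration thirtyMinutes = Duration.ofMinutes(30);
    return t1.precedes(t2) && t1.lengthAtLeast(thirtyMinutes) && t2.lengthAtLeast(thirtyMinutes);
  }

  static long minutes(long m) {
    return Duration.ofMinutes(m).toMillis();
  }

  public static void main(String[] args) {
    Interval t1 = new Interval(0, minutes(40));
    Interval t2 = new Interval(minutes(45), minutes(90));
    Interval overlapping = new Interval(minutes(30), minutes(70));
    System.out.println(matchesTripPattern(t1, t2));          // true
    System.out.println(matchesTripPattern(t1, overlapping)); // false: t1 ends after it starts
  }
}
```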

Temporal Predicates

The following temporal predicates are available for the Snapshot and Difference operators.

The placeholders queryTimestamp, queryFrom, and queryTo have to be specified as arguments by the user.

elementFrom and elementTo represent the start and end of an element's validity interval.

  • All: A filter that returns all elements. Predicate: (none)
  • AsOf: Given a timestamp, this predicate matches all timestamps before or at that time and all time-intervals containing that time. Predicate: elementFrom <= queryTimestamp && elementTo > queryTimestamp
  • Between: Given a time-interval, this predicate matches all intervals that start before or at that interval's end and end after the start of that interval. Predicate: elementFrom <= queryTo && elementTo > queryFrom
  • ContainedIn: Given a time-interval, this predicate matches all intervals that are a subset of that interval. Predicate: queryFrom <= elementFrom && elementTo <= queryTo
  • CreatedIn: Given a time-interval, this predicate matches all intervals starting during that interval. Predicate: queryFrom <= elementFrom && elementFrom <= queryTo
  • DeletedIn: Given a time-interval, this predicate matches all intervals ending during that interval. Predicate: queryFrom <= elementTo && elementTo <= queryTo
  • FromTo: Given a time-interval, this predicate matches all intervals that were valid during that interval. Predicate: elementFrom < queryTo && elementTo > queryFrom
  • ValidDuring: Given a time-interval, this predicate matches all intervals that contain that interval. Predicate: elementFrom <= queryFrom && elementTo >= queryTo
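The predicate formulas above translate directly into code. The following plain-Java sketch implements each one as a lambda over an element's (elementFrom, elementTo) pair; the class TemporalPredicates and its nested interface Pred are hypothetical names, not Gradoop's predicate classes. The main method highlights the one boundary case where Between and FromTo differ.

```java
class TemporalPredicates {
  /** Hypothetical predicate over an element's interval (elementFrom, elementTo). */
  interface Pred {
    boolean test(long elementFrom, long elementTo);
  }

  static Pred all() {
    return (f, t) -> true;
  }

  static Pred asOf(long queryTimestamp) {
    return (f, t) -> f <= queryTimestamp && t > queryTimestamp;
  }

  static Pred between(long queryFrom, long queryTo) {
    return (f, t) -> f <= queryTo && t > queryFrom;
  }

  static Pred containedIn(long queryFrom, long queryTo) {
    return (f, t) -> queryFrom <= f && t <= queryTo;
  }

  static Pred createdIn(long queryFrom, long queryTo) {
    return (f, t) -> queryFrom <= f && f <= queryTo;
  }

  static Pred deletedIn(long queryFrom, long queryTo) {
    return (f, t) -> queryFrom <= t && t <= queryTo;
  }

  static Pred fromTo(long queryFrom, long queryTo) {
    return (f, t) -> f < queryTo && t > queryFrom;
  }

  static Pred validDuring(long queryFrom, long queryTo) {
    return (f, t) -> f <= queryFrom && t >= queryTo;
  }

  public static void main(String[] args) {
    // An element valid from 10 to 20, queried with the interval [0, 10]:
    System.out.println(between(0L, 10L).test(10L, 20L)); // true: starts at the query end
    System.out.println(fromTo(0L, 10L).test(10L, 20L));  // false: FromTo needs elementFrom < queryTo
  }
}
```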

Temporal Aggregations

The following aggregate functions are available for Time-dependent Graph Grouping and Aggregation. The two available time dimensions, TRANSACTION_TIME and VALID_TIME, are specified via the enum TimeDimension. To select the beginning or end of a time dimension's interval, use the enum TimeDimension.Field.

  • MinTime: Minimum of a specified time dimension's begin or end over all edges and vertices. Input: (1) The property key where the aggregated value is stored. (2) The time dimension to consider. (3) The field of the time dimension (begin or end) to consider.
  • MinVertexTime: Minimum of a specified time dimension's begin or end over all vertices. Input: same as MinTime.
  • MinEdgeTime: Minimum of a specified time dimension's begin or end over all edges. Input: same as MinTime.
  • MaxTime: Maximum of a specified time dimension's begin or end over all edges and vertices. Input: same as MinTime.
  • MaxVertexTime: Maximum of a specified time dimension's begin or end over all vertices. Input: same as MinTime.
  • MaxEdgeTime: Maximum of a specified time dimension's begin or end over all edges. Input: same as MinTime.
  • AverageDuration: Calculates the average duration of temporal elements for a specified time dimension. Time intervals with either the start or end time set to the respective default value are ignored. Input: (1) The property key where the aggregated value is stored. (2) The time dimension to consider.
  • AverageVertexDuration: Calculates the average duration of vertices for a specified time dimension. Time intervals with either the start or end time set to the respective default value are ignored. Input: same as AverageDuration.
  • AverageEdgeDuration: Calculates the average duration of edges for a specified time dimension. Time intervals with either the start or end time set to the respective default value are ignored. Input: same as AverageDuration.
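The rule that intervals with a default bound are ignored is what keeps the average meaningful: an open-ended interval would otherwise contribute a duration close to Long.MAX_VALUE. A minimal plain-Java sketch, assuming the defaults are Long.MIN_VALUE and Long.MAX_VALUE (as in the CSV sink example) and that the result is a duration in milliseconds; the class AverageDurationSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.OptionalDouble;

class AverageDurationSketch {
  static final long DEFAULT_FROM = Long.MIN_VALUE; // assumed default for an unset start
  static final long DEFAULT_TO = Long.MAX_VALUE;   // assumed default for an unset end

  /** Average of (to - from) over all {from, to} pairs, ignoring intervals with a default bound. */
  static OptionalDouble averageDuration(long[]... intervals) {
    return Arrays.stream(intervals)
        .filter(i -> i[0] != DEFAULT_FROM && i[1] != DEFAULT_TO)
        .mapToLong(i -> i[1] - i[0])
        .average();
  }

  public static void main(String[] args) {
    OptionalDouble avg = averageDuration(
        new long[] {0, 100},
        new long[] {200, 500},
        new long[] {DEFAULT_FROM, 1_000}, // ignored: start not set
        new long[] {300, DEFAULT_TO});    // ignored: end not set
    System.out.println(avg.getAsDouble()); // 200.0
  }
}
```

If every interval has a default bound, the sketch returns an empty OptionalDouble rather than a misleading number.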

Data Sources and Sinks for Temporal Property Graphs

The TPGM provides several data sources and sinks to read TPGM graphs from, and write them to, a file system such as HDFS.

Data Sources: TemporalCSVDataSource, TemporalIndexedCSVDataSource, TemporalParquetDataSource, TemporalParquetProtobufDataSource

Data Sinks: TemporalCSVDataSink, TemporalIndexedCSVDataSink, TemporalParquetDataSink, TemporalParquetProtobufDataSink

The following is an example line of an edge.csv file written by the TemporalCSVDataSink:

5d777236ed08fd369d717ab2;[5d777162ed08fd369d6f2cd5];5d7771d5ed08fd369d6f77d7;5d7771bded08fd369d6f63c0;Owner;;(1568108898709,9223372036854775807),(-9223372036854775808,9223372036854775807)

It has the following structure:

EdgeId;[GraphId(s)];SourceVertexId;TargetVertexId;Label;Properties;(tx-from,tx-to),(val-from,val-to)
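The time information sits in the last field of each line. The following plain-Java sketch extracts the four timestamps from a line in this format; the class TemporalCsvTimeParser is hypothetical and not part of Gradoop's data sources. It assumes the time field is always the last ';'-separated field and contains no ';' itself, and it reads Long.MIN_VALUE and Long.MAX_VALUE as unbounded interval ends.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TemporalCsvTimeParser {
  // Matches "(txFrom,txTo),(validFrom,validTo)" with signed long values.
  private static final Pattern TIMES =
      Pattern.compile("\\((-?\\d+),(-?\\d+)\\),\\((-?\\d+),(-?\\d+)\\)");

  /** Returns {txFrom, txTo, validFrom, validTo} parsed from the last field of a line. */
  static long[] parseTimes(String csvLine) {
    String timeField = csvLine.substring(csvLine.lastIndexOf(';') + 1);
    Matcher m = TIMES.matcher(timeField);
    if (!m.matches()) {
      throw new IllegalArgumentException("Unexpected time field: " + timeField);
    }
    long[] times = new long[4];
    for (int i = 0; i < 4; i++) {
      times[i] = Long.parseLong(m.group(i + 1));
    }
    return times;
  }

  public static void main(String[] args) {
    String line = "5d777236ed08fd369d717ab2;[5d777162ed08fd369d6f2cd5];5d7771d5ed08fd369d6f77d7;"
        + "5d7771bded08fd369d6f63c0;Owner;;(1568108898709,9223372036854775807),"
        + "(-9223372036854775808,9223372036854775807)";
    long[] t = parseTimes(line);
    // Long.MAX_VALUE marks an open interval end
    System.out.println("tx: [" + t[0] + ", " + (t[1] == Long.MAX_VALUE ? "open" : t[1]) + ")");
  }
}
```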

Compatibility with EPGM

The TPGM is compatible with most EPGM operators, i.e., an operator like Subgraph can be applied directly to a TemporalGraph instance.

To use EPGM operators that are not available on a TPGM graph, first convert it by calling the .toLogicalGraph() or .toLogicalGraphCollection() method.

Example workflow

Below is an example workflow of a typical temporal graph analysis.

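import java.time.LocalDate;
import java.time.LocalTime;
import java.time.Month;
import java.time.temporal.ChronoField;
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.api.java.ExecutionEnvironment;
// Gradoop imports (TemporalGraph, TemporalCSVDataSource, CreatedIn, ...) omitted for brevity

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TemporalGradoopConfig temporalGradoopConfig = TemporalGradoopConfig.createConfig(env);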
TemporalCSVDataSource dataSource = new TemporalCSVDataSource("path/to/graph", temporalGradoopConfig);

TemporalGraph graph = dataSource.getTemporalGraph();

graph
  // Filter for necessary vertex and edge types
  .subgraph(
    new ByLabel<>("Agent").or(new ByLabel<>("Customer")
      .and(new ByProperty<>("city", PropertyValue.create("Istanbul")))), new ByLabel<>("calls"))
  // Extract a snapshot from historical graph information
  .snapshot(new CreatedIn(
    LocalDate.of(2018, Month.JANUARY, 1).atStartOfDay(), 
    LocalDate.of(2018, Month.DECEMBER, 31).atTime(LocalTime.MAX)))
  // Remove dangling edges
  .verify()
  // Apply a grouping with the ROLLUP feature
  .groupEdgesByRollup(
    // Vertex grouping keys
    Arrays.asList(GroupingKeys.label(), GroupingKeys.property("city")),
    // Vertex aggregation functions
    Collections.singletonList(new Count()),
    // Edge grouping keys: year, month, week, and day of the valid-from time
    Arrays.asList(
      TemporalGroupingKeys.timeStamp(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, ChronoField.YEAR),
      TemporalGroupingKeys.timeStamp(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, ChronoField.MONTH_OF_YEAR),
      TemporalGroupingKeys.timeStamp(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, ChronoField.ALIGNED_WEEK_OF_YEAR),
      TemporalGroupingKeys.timeStamp(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, ChronoField.DAY_OF_YEAR)),
    // Edge aggregation functions: edge count and average valid-time duration
    Arrays.asList(new Count("cnt"), new AverageEdgeDuration("avgEdgeDurValid", TimeDimension.VALID_TIME)))
  .writeTo(new TemporalCSVDataSink("path/to/output", temporalGradoopConfig));

env.execute("My temporal analysis.");
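To see which groupings a ROLLUP over a key list such as year, month, week, and day produces, the following plain-Java sketch enumerates them. It assumes SQL-style ROLLUP semantics, i.e., one grouping per prefix of the key list, longest first. SQL additionally emits an empty grand-total grouping; since this page does not say whether Gradoop does the same, the sketch leaves it out. The class RollupSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RollupSketch {
  /** Returns the ROLLUP groupings of the keys: every non-empty prefix, longest first. */
  static <K> List<List<K>> rollup(List<K> keys) {
    List<List<K>> groupings = new ArrayList<>();
    for (int n = keys.size(); n >= 1; n--) {
      groupings.add(new ArrayList<>(keys.subList(0, n)));
    }
    return groupings;
  }

  public static void main(String[] args) {
    List<String> edgeKeys =
        Arrays.asList("YEAR", "MONTH_OF_YEAR", "ALIGNED_WEEK_OF_YEAR", "DAY_OF_YEAR");
    // One output graph per grouping, from the finest (all four keys) to the coarsest (year only)
    for (List<String> grouping : rollup(edgeKeys)) {
      System.out.println(grouping);
    }
  }
}
```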