algorithms

Algorithms for temporal path calculation and graph metrics.

The functions and submodules in this module make it possible to compute time-respecting or causal paths in temporal graphs and to calculate temporal and higher-order graph metrics such as centralities.

Example
# Import pathpyG
import pathpyG as pp

# Generate a toy example for a temporal graph.
g = pp.TemporalGraph.from_edge_list([
    ('b', 'c', 2),
    ('a', 'b', 1),
    ('c', 'd', 3),
    ('d', 'a', 4),
    ('b', 'd', 2),
    ('d', 'a', 6),
    ('a', 'b', 7)
])

# Extract a DAG capturing causal interaction sequences in the temporal graph.
e_i = pp.algorithms.lift_order_temporal(g, delta=1)
dag = pp.Graph.from_edge_index(e_i)
print(dag)

# Calculate shortest time-respecting paths
dist, pred = pp.algorithms.temporal.temporal_shortest_paths(g, delta=1)

RollingTimeWindow

An iterable rolling time window that can be used to perform time slice analysis of temporal graphs.

Source code in src/pathpyG/algorithms/rolling_time_window.py
class RollingTimeWindow:
    """An iterable rolling time window that can be used to perform time slice analysis of temporal graphs."""

    def __init__(self, temporal_graph, window_size, step_size=1, return_window=False, weighted=True):
        """Initialize RollingTimeWindow.

        Initialize a RollingTimeWindow instance that can be used to
        iterate through a sequence of time-slice networks for a given
        TemporalGraph instance.

        Args:
            temporal_graph: TemporalGraph instance that will be used to generate the
                sequence of time-slice networks.
            window_size: The width of the rolling time window used to create time-slice networks.
            step_size: The step size in time units by which the starting
                time of the rolling window will be incremented on each iteration.
            return_window: Whether or not the iterator shall return the current time window as a second return value. Default is False.
            weighted: Whether or not to return a weighted graph. Default is True.

        Example:
            ```py
            tedges = [('a', 'b', 1), ('b', 'c', 5), ('c', 'd', 9), ('c', 'e', 9),
              ('c', 'f', 11), ('f', 'a', 13), ('a', 'g', 18), ('b', 'f', 21),
              ('a', 'g', 26), ('c', 'f', 27), ('h', 'f', 27), ('g', 'h', 28),
              ('a', 'c', 30), ('a', 'b', 31), ('c', 'h', 32), ('f', 'h', 33),
              ('b', 'i', 42), ('i', 'b', 42), ('c', 'i', 47), ('h', 'i', 50)]
            t = pp.TemporalGraph.from_edge_list(tedges)
            r = pp.algorithms.RollingTimeWindow(t, 10, 10, return_window=True)
            for g, w in r:
                print('Time window ', w)
                print(g)
                print(g.data.edge_index)
                print('---')
            ```
        """
        self.g = temporal_graph
        self.window_size = window_size
        self.step_size = step_size
        self.current_time = self.g.start_time
        self.return_window = return_window
        self.weighted = weighted

    def __iter__(self):
        """Return the iterator object itself."""
        return self

    def __next__(self):
        """Return the next time-slice network in the rolling time window sequence."""
        if self.current_time <= self.g.end_time:
            time_window = (self.current_time, self.current_time + self.window_size)
            s = self.g.to_static_graph(weighted=self.weighted, time_window=time_window)
            self.current_time += self.step_size
            if self.return_window:
                return s, time_window
            else:
                return s
        else:
            raise StopIteration()

__init__

Initialize RollingTimeWindow.

Initialize a RollingTimeWindow instance that can be used to iterate through a sequence of time-slice networks for a given TemporalGraph instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| temporal_graph | | TemporalGraph instance that will be used to generate the sequence of time-slice networks. | required |
| window_size | | The width of the rolling time window used to create time-slice networks. | required |
| step_size | | The step size in time units by which the starting time of the rolling window is incremented on each iteration. | 1 |
| return_window | | Whether or not the iterator shall return the current time window as a second return value. | False |
| weighted | | Whether or not to return a weighted graph. | True |
Example
tedges = [('a', 'b', 1), ('b', 'c', 5), ('c', 'd', 9), ('c', 'e', 9),
  ('c', 'f', 11), ('f', 'a', 13), ('a', 'g', 18), ('b', 'f', 21),
  ('a', 'g', 26), ('c', 'f', 27), ('h', 'f', 27), ('g', 'h', 28),
  ('a', 'c', 30), ('a', 'b', 31), ('c', 'h', 32), ('f', 'h', 33),
  ('b', 'i', 42), ('i', 'b', 42), ('c', 'i', 47), ('h', 'i', 50)]
t = pp.TemporalGraph.from_edge_list(tedges)
r = pp.algorithms.RollingTimeWindow(t, 10, 10, return_window=True)
for g, w in r:
    print('Time window ', w)
    print(g)
    print(g.data.edge_index)
    print('---')
Source code in src/pathpyG/algorithms/rolling_time_window.py
def __init__(self, temporal_graph, window_size, step_size=1, return_window=False, weighted=True):
    """Initialize RollingTimeWindow.

    Initialize a RollingTimeWindow instance that can be used to
    iterate through a sequence of time-slice networks for a given
    TemporalGraph instance.

    Args:
        temporal_graph: TemporalGraph instance that will be used to generate the
            sequence of time-slice networks.
        window_size: The width of the rolling time window used to create time-slice networks.
        step_size: The step size in time units by which the starting
            time of the rolling window will be incremented on each iteration.
        return_window: Whether or not the iterator shall return the current time window as a second return value. Default is False.
        weighted: Whether or not to return a weighted graph. Default is True.

    Example:
        ```py
        tedges = [('a', 'b', 1), ('b', 'c', 5), ('c', 'd', 9), ('c', 'e', 9),
          ('c', 'f', 11), ('f', 'a', 13), ('a', 'g', 18), ('b', 'f', 21),
          ('a', 'g', 26), ('c', 'f', 27), ('h', 'f', 27), ('g', 'h', 28),
          ('a', 'c', 30), ('a', 'b', 31), ('c', 'h', 32), ('f', 'h', 33),
          ('b', 'i', 42), ('i', 'b', 42), ('c', 'i', 47), ('h', 'i', 50)]
        t = pp.TemporalGraph.from_edge_list(tedges)
        r = pp.algorithms.RollingTimeWindow(t, 10, 10, return_window=True)
        for g, w in r:
            print('Time window ', w)
            print(g)
            print(g.data.edge_index)
            print('---')
        ```
    """
    self.g = temporal_graph
    self.window_size = window_size
    self.step_size = step_size
    self.current_time = self.g.start_time
    self.return_window = return_window
    self.weighted = weighted

__iter__

Return the iterator object itself.

Source code in src/pathpyG/algorithms/rolling_time_window.py
def __iter__(self):
    """Return the iterator object itself."""
    return self

__next__

Return the next time-slice network in the rolling time window sequence.

Source code in src/pathpyG/algorithms/rolling_time_window.py
def __next__(self):
    """Return the next time-slice network in the rolling time window sequence."""
    if self.current_time <= self.g.end_time:
        time_window = (self.current_time, self.current_time + self.window_size)
        s = self.g.to_static_graph(weighted=self.weighted, time_window=time_window)
        self.current_time += self.step_size
        if self.return_window:
            return s, time_window
        else:
            return s
    else:
        raise StopIteration()

WeisfeilerLeman_test

Run Weisfeiler-Leman isomorphism test on two graphs.

The algorithm heuristically checks whether two graphs are isomorphic. If it returns False, the graphs are guaranteed to be non-isomorphic. If it returns True, the test found no conclusive evidence that the graphs are not isomorphic, i.e. they may or may not be isomorphic.

The two graphs must have IndexMap mappings that assign disjoint node IDs to their nodes. The function will raise an error if the node IDs of the two graphs overlap.

The function returns a tuple (bool, list, list), where the first entry is the result of the test and the two lists are the fingerprints of the two graphs. If the test yields True, the sorted fingerprints are identical; if it yields False, they differ.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| g1 | pathpyG.core.graph.Graph | First graph. | required |
| g2 | pathpyG.core.graph.Graph | Second graph. | required |
| features_g1 | dict \| None | Optional initial node features for graph 1. | None |
| features_g2 | dict \| None | Optional initial node features for graph 2. | None |
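
Example

A minimal usage sketch with two toy directed triangles that have disjoint node IDs (the graphs are illustrative; `WeisfeilerLeman_test` is assumed to be importable via `pp.algorithms`):

```py
import pathpyG as pp

# Two directed triangles with disjoint node IDs (illustrative toy graphs)
g1 = pp.Graph.from_edge_list([('a', 'b'), ('b', 'c'), ('c', 'a')])
g2 = pp.Graph.from_edge_list([('x', 'y'), ('y', 'z'), ('z', 'x')])

# True means the test found no evidence against isomorphism
result, fp1, fp2 = pp.algorithms.WeisfeilerLeman_test(g1, g2)
print(result)
```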
Source code in src/pathpyG/algorithms/weisfeiler_leman.py
def WeisfeilerLeman_test(
    g1: Graph, g2: Graph, features_g1: dict | None = None, features_g2: dict | None = None
) -> Tuple[bool, List[str], List[str]]:
    """Run Weisfeiler-Leman isomorphism test on two graphs.

    The algorithm heuristically checks whether two graphs are isomorphic. If it returns False,
    we can be sure that the graphs are non-isomorphic. If the test returns True we did not find
    conclusive evidence that they are not isomorphic, i.e. the graphs may or may not be isomorphic.

    The two graphs must have IndexMap mappings that assign different node IDs to the nodes
    in both graphs. The function will raise an error if the node labels of both graphs overlap.

    The function returns a tuple (bool, list, list), where the first entry is the result of the test
    and the two lists represent the fingerprints of the two graphs. If the test yields true the fingerprints
    are identical. If the test fails, the fingerprints do not correspond.

    Args:
        g1: First graph.
        g2: Second graph.
        features_g1: Optional initial node features for graph 1.
        features_g2: Optional initial node features for graph 2.
    """
    if g1.mapping is None or g2.mapping is None:
        raise Exception("Graphs must contain IndexMap that assigns node IDs")
    if len(set(g1.mapping.node_ids).intersection(g2.mapping.node_ids)) > 0:  # type: ignore[arg-type]
        raise Exception("node identifiers of graphs must not overlap")
    g_combined = g1 + g2
    # initialize labels of all nodes to zero
    if features_g1 is None or features_g2 is None:
        fingerprint: Dict[str | int, str] = {v: "0" for v in g_combined.nodes}
    else:
        fingerprint = features_g1.copy()
        fingerprint.update(features_g2)
    labels = {}
    label_count = 1
    stop = False
    while not stop:
        new_fingerprint = {}
        for node in g_combined.nodes:
            # create new label based on own label and sorted labels of all neighbors
            n_label = [fingerprint[x] for x in g_combined.successors(node)]
            n_label.sort()
            label = str(fingerprint[node]) + str(n_label)
            # previously unknown label
            if label not in labels:
                # create a new label based on next consecutive number
                labels[label] = label_count
                label_count += 1
            new_fingerprint[node] = labels[label]
        if len(set(fingerprint.values())) == len(set(new_fingerprint.values())):
            # we processed all nodes in both graphs without encountering a new label, so we stop
            stop = True
        else:
            # update fingerprint and continue
            fingerprint = new_fingerprint.copy()  # type: ignore[assignment]

    # Reduce fingerprints to nodes of g1 and g2 respectively
    fingerprint_1 = [fingerprint[v] for v in g1.nodes]
    fingerprint_1_sorted = fingerprint_1.copy()
    fingerprint_1_sorted.sort()
    fingerprint_2 = [fingerprint[v] for v in g2.nodes]
    fingerprint_2_sorted = fingerprint_2.copy()
    fingerprint_2_sorted.sort()

    # perform WL-test
    if fingerprint_1_sorted == fingerprint_2_sorted:
        return True, fingerprint_1, fingerprint_2
    return False, fingerprint_1, fingerprint_2

connected_components

Compute the connected components of a graph.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| graph | pathpyG.core.graph.Graph | The input graph. | required |
| connection | str | Type of connection to consider. Options are "weak" or "strong". Defaults to "weak". | 'weak' |

Returns:

| Type | Description |
| --- | --- |
| typing.Tuple[int, numpy.ndarray] | A tuple containing the number of connected components and an array with component labels for each node. |
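
Example

A minimal sketch on a toy graph with two weakly connected components (the edge list is illustrative; `connected_components` is assumed to be exposed via `pp.algorithms`):

```py
import pathpyG as pp

# Toy graph: one component {a, b, c} and one component {d, e}
g = pp.Graph.from_edge_list([('a', 'b'), ('b', 'c'), ('d', 'e')])

n, labels = pp.algorithms.connected_components(g, connection='weak')
print(n)       # number of weakly connected components
print(labels)  # one component label per node
```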

Source code in src/pathpyG/algorithms/components.py
def connected_components(graph: Graph, connection="weak") -> Tuple[int, _np.ndarray]:
    """Compute the connected components of a graph.

    Args:
        graph (Graph): The input graph.
        connection (str, optional): Type of connection to consider. 
            Options are "weak" or "strong". Defaults to "weak".

    Returns:
        Tuple[int, np.ndarray]: A tuple containing the number of connected components and
            an array with component labels for each node.
    """
    m = graph.sparse_adj_matrix()
    n, labels = _cc(m, directed=graph.is_directed(), connection=connection, return_labels=True)
    return n, labels

largest_connected_component

Extract the largest connected component from a graph.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| graph | pathpyG.core.graph.Graph | The input graph. | required |
| connection | str | Type of connection to consider. Options are "weak" or "strong". Defaults to "weak". | 'weak' |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Graph | pathpyG.core.graph.Graph | A new graph instance containing only the largest connected component. |
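
Example

A minimal sketch on a toy graph consisting of a triangle plus an isolated edge (illustrative; `largest_connected_component` is assumed to be exposed via `pp.algorithms`):

```py
import pathpyG as pp

# Toy graph: triangle {a, b, c} plus a separate edge (d, e)
g = pp.Graph.from_edge_list([('a', 'b'), ('b', 'c'), ('c', 'a'), ('d', 'e')])

lcc = pp.algorithms.largest_connected_component(g, connection='weak')
print(lcc)  # graph restricted to the largest weakly connected component
```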

Source code in src/pathpyG/algorithms/components.py
def largest_connected_component(graph: Graph, connection="weak") -> Graph:
    """Extract the largest connected component from a graph.

    Args:
        graph (Graph): The input graph.
        connection (str, optional): Type of connection to consider. 
            Options are "weak" or "strong". Defaults to "weak".

    Returns:
        Graph: A new graph instance containing only the largest connected component.
    """
    m = graph.sparse_adj_matrix()
    _, labels = _cc(m, directed=graph.is_directed(), connection=connection, return_labels=True)

    # find largest component C
    ctr = Counter(labels.tolist())
    x, _ = ctr.most_common(1)[0]
    # create graph only consisting of nodes in C
    C = []
    for v, w in graph.edges:
        if labels[graph.mapping.to_idx(v)] == x and labels[graph.mapping.to_idx(w)] == x:
            C.append((v, w))
    return Graph.from_edge_list(C, is_undirected=graph.is_undirected())

lift_order_temporal

Lift a temporal graph to a second-order temporal event graph.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| g | pathpyG.core.temporal_graph.TemporalGraph | Temporal graph to lift. | required |
| delta | float \| int | Maximum time difference between events to consider them connected. | 1 |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ho_index | | Edge index of the second-order temporal event graph. |
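
Example

A short sketch reusing the toy temporal graph from the module-level example above; the comments describe the intended interpretation of the returned edge index.

```py
import pathpyG as pp

g = pp.TemporalGraph.from_edge_list([
    ('b', 'c', 2), ('a', 'b', 1), ('c', 'd', 3), ('d', 'a', 4),
    ('b', 'd', 2), ('d', 'a', 6), ('a', 'b', 7)
])

# Nodes of the event graph are indices of first-order temporal edges;
# an edge (i, j) indicates that edge j continues edge i within delta time units.
e_i = pp.algorithms.lift_order_temporal(g, delta=1)
dag = pp.Graph.from_edge_index(e_i)
print(dag)
```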

Source code in src/pathpyG/algorithms/temporal.py
def lift_order_temporal(g: TemporalGraph, delta: float | int = 1):
    """Lift a temporal graph to a second-order temporal event graph.

    Args:
        g: Temporal graph to lift.
        delta: Maximum time difference between events to consider them connected.

    Returns:
        ho_index: Edge index of the second-order temporal event graph.
    """
    # first-order edge index
    edge_index, timestamps = g.data.edge_index, g.data.time

    delta = torch.tensor(delta, device=edge_index.device)  # type: ignore[assignment]
    indices = torch.arange(0, edge_index.size(1), device=edge_index.device)

    unique_t = torch.unique(timestamps, sorted=True)
    second_order = []

    # lift order: find possible continuations for edges in each time stamp
    for t in tqdm(unique_t):
        # find indices of all source edges that occur at unique timestamp t
        src_time_mask = timestamps == t
        src_edge_idx = indices[src_time_mask]

        # find indices of all edges that can possibly continue edges occurring at time t for the given delta
        dst_time_mask = (timestamps > t) & (timestamps <= t + delta)
        dst_edge_idx = indices[dst_time_mask]

        if dst_edge_idx.size(0) > 0 and src_edge_idx.size(0) > 0:
            # compute second-order edges between src and dst idx
            # for all edges where dst in src_edges (edge_index[1, x[:, 0]]) matches src in dst_edges (edge_index[0, x[:, 1]])
            x = torch.cartesian_prod(src_edge_idx, dst_edge_idx)
            ho_edge_index = x[edge_index[1, x[:, 0]] == edge_index[0, x[:, 1]]]
            second_order.append(ho_edge_index)

    ho_index = torch.cat(second_order, dim=0).t().contiguous()
    return ho_index

temporal_shortest_paths

Compute shortest time-respecting paths in a temporal graph.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| g | pathpyG.core.temporal_graph.TemporalGraph | Temporal graph to compute shortest paths on. | required |
| delta | int | Maximum time difference between events in a path. | required |

Returns:

| Type | Description |
| --- | --- |
| typing.Tuple[numpy.ndarray, numpy.ndarray] | Tuple of two numpy arrays: dist, the shortest time-respecting path distances between all first-order nodes, and pred, the predecessor matrix for shortest time-respecting paths between all first-order nodes. |
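
Example

A short sketch reusing the toy temporal graph from the module-level example above; both returned arrays are indexed by first-order node indices.

```py
import pathpyG as pp

g = pp.TemporalGraph.from_edge_list([
    ('b', 'c', 2), ('a', 'b', 1), ('c', 'd', 3), ('d', 'a', 4),
    ('b', 'd', 2), ('d', 'a', 6), ('a', 'b', 7)
])

dist, pred = pp.algorithms.temporal.temporal_shortest_paths(g, delta=1)
# dist[i, j]: length of a shortest time-respecting path from node i to node j
# pred[i, j]: predecessor of node j on such a path (-1 if j is unreachable from i)
print(dist)
print(pred)
```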
Source code in src/pathpyG/algorithms/temporal.py
def temporal_shortest_paths(g: TemporalGraph, delta: int) -> Tuple[np.ndarray, np.ndarray]:
    """Compute shortest time-respecting paths in a temporal graph.

    Args:
        g: Temporal graph to compute shortest paths on.
        delta: Maximum time difference between events in a path.

    Returns:
        Tuple of two numpy arrays:
        - dist: Shortest time-respecting path distances between all first-order nodes.
        - pred: Predecessor matrix for shortest time-respecting paths between all first-order nodes.
    """
    # generate temporal event DAG
    edge_index = lift_order_temporal(g, delta)

    # Add indices of first-order nodes as src and dst of paths in augmented
    # temporal event DAG
    src_edges_src = g.data.edge_index[0] + g.m
    src_edges_dst = torch.arange(0, g.data.edge_index.size(1), device=g.data.edge_index.device)

    dst_edges_src = torch.arange(0, g.data.edge_index.size(1), device=g.data.edge_index.device)
    dst_edges_dst = g.data.edge_index[1] + g.m + g.n

    # add edges from source to edges and from edges to destinations
    src_edges = torch.stack([src_edges_src, src_edges_dst])
    dst_edges = torch.stack([dst_edges_src, dst_edges_dst])
    edge_index = torch.cat([edge_index, src_edges, dst_edges], dim=1)

    # create sparse scipy matrix
    event_graph = Graph.from_edge_index(edge_index, num_nodes=g.m + 2 * g.n)
    m = event_graph.sparse_adj_matrix()

    # print(f"Created temporal event DAG with {event_graph.n} nodes and {event_graph.m} edges")

    # run dijkstra for all source nodes
    dist, pred = dijkstra(
        m, directed=True, indices=np.arange(g.m, g.m + g.n), return_predecessors=True, unweighted=True
    )

    # limit to first-order destinations and correct distances
    dist_fo = dist[:, g.m + g.n :] - 1
    np.fill_diagonal(dist_fo, 0)

    # limit to first-order destinations and correct predecessors
    pred_fo = pred[:, g.n + g.m :]
    pred_fo[pred_fo == -9999] = -1
    idx_map = np.concatenate([to_numpy(g.data.edge_index[0].cpu()), [-1]])
    pred_fo = idx_map[pred_fo]
    np.fill_diagonal(pred_fo, np.arange(g.n))

    return dist_fo, pred_fo