Skip to content

graphtool

parse_graphtool_format

Decodes data in graphtool binary format and returns a Graph. For documentation of the graphtool binary format, see https://graph-tool.skewed.de/static/doc/gt_format.html

Parameters:

Name Type Description Default
data bytes

Array of bytes to be decoded

required

Returns:

Name Type Description
Graph pathpyG.core.graph.Graph

a static graph

Source code in src/pathpyG/io/graphtool.py
def parse_graphtool_format(data: bytes, id_node_attr=None) -> Graph:
    """
    Decodes data in graphtool binary format and returns a [`Graph`][pathpyG.Graph]. For documentation of
    the graphtool binary format, see https://graph-tool.skewed.de/static/doc/gt_format.html

    Args:
        data: Array of bytes to be decoded
        id_node_attr: Optional name of a node-level property map in the file. If given (and truthy),
            the parsed values of that property are passed to `pp.IndexMap` and used as the
            node mapping of the returned graph.

    Returns:
        Graph: a static graph. If the file marks the graph as undirected, the result of
            `to_undirected()` is returned.

    Raises:
        Exception: If the magic bytes are wrong or a property map has an unknown key type.
        struct.error: If the data ends before a fixed-size field could be read.
        KeyError: If `id_node_attr` does not name a node-level property contained in the file.
    """

    # check magic bytes
    if data[0:6] != b"\xe2\x9b\xbe\x20\x67\x74":
        raise Exception("Invalid graphtool file. Wrong magic bytes.")
    ptr = 6

    # skip graphtool version byte (not needed for decoding)
    ptr += 1

    # read endianness: non-zero byte -> big endian, zero -> little endian
    endian = ">" if data[ptr] else "<"
    ptr += 1

    # pre-compiled readers for the fixed-size fields used throughout
    u8 = struct.Struct(endian + "B")
    u64 = struct.Struct(endian + "Q")

    # skip the comment string (length-prefixed); its content is not used, so it
    # is not decoded and a non-ASCII comment cannot abort parsing
    (comment_len,) = u64.unpack_from(data, ptr)
    ptr += 8 + comment_len

    # read network directedness
    directed = bool(data[ptr])
    ptr += 1

    # read number of nodes
    (n_nodes,) = u64.unpack_from(data, ptr)
    ptr += 8

    # determine binary representation of neighbour lists: the smallest unsigned
    # integer type that can hold n_nodes
    if n_nodes < 2**8:
        fmt, width = "B", 1
    elif n_nodes < 2**16:
        fmt, width = "H", 2
    elif n_nodes < 2**32:
        fmt, width = "I", 4
    else:
        fmt, width = "Q", 8

    # parse lists of out-neighbors for all n nodes
    sources = []
    targets = []
    for v in range(n_nodes):
        # read number of neighbors
        (num_neighbors,) = u64.unpack_from(data, ptr)
        ptr += 8

        # read the whole neighbor list of v with a single unpack call
        neighbors = struct.unpack_from(endian + str(num_neighbors) + fmt, data, ptr)
        ptr += num_neighbors * width

        sources.extend([v] * num_neighbors)
        targets.extend(neighbors)
    n_edges = len(sources)

    # collect attributes from property maps
    graph_attr = dict()
    node_attr = dict()
    edge_attr = dict()

    # parse property maps
    (property_maps,) = u64.unpack_from(data, ptr)
    ptr += 8

    for _ in range(property_maps):
        (key_type,) = u8.unpack_from(data, ptr)
        ptr += 1

        (property_len,) = u64.unpack_from(data, ptr)
        ptr += 8

        # utf-8 is a superset of ascii, so names that decoded before still decode identically
        property_name = data[ptr : ptr + property_len].decode("utf-8")
        ptr += property_len

        (property_type,) = u8.unpack_from(data, ptr)
        ptr += 1

        if key_type == 0:  # graph-level property
            value, size = _parse_property_value(data, ptr, property_type, endian)
            graph_attr[property_name] = value
            ptr += size
        elif key_type == 1:  # node-level property
            values = node_attr.setdefault(property_name, [])
            for _ in range(n_nodes):
                value, size = _parse_property_value(data, ptr, property_type, endian)
                # each node value is stored wrapped in a one-element list
                # NOTE(review): with id_node_attr set, IndexMap receives these
                # one-element lists rather than bare values - confirm this is intended
                values.append([value])
                ptr += size
        elif key_type == 2:  # edge-level property
            values = edge_attr.setdefault(property_name, [])
            for _ in range(n_edges):
                value, size = _parse_property_value(data, ptr, property_type, endian)
                values.append(value)
                ptr += size
        else:
            # The number of values that follow depends on the key type, so the
            # pointer cannot be advanced past this property map; continuing would
            # decode every following field from misaligned bytes.
            raise Exception("Unknown key type {0}".format(key_type))

    if id_node_attr:
        mapping = pp.IndexMap(node_attr[id_node_attr])
    else:
        mapping = None

    g = Graph.from_edge_index(torch.LongTensor([sources, targets]).to(config["torch"]["device"]), mapping=mapping)

    # store attributes in the graph's data object; node and edge properties get
    # a "node_"/"edge_" prefix, properties already carrying it are skipped
    for a in node_attr:
        if not a.startswith("node_"):
            g.data["node_{0}".format(a)] = node_attr[a]
    for a in edge_attr:
        if not a.startswith("edge_"):
            g.data["edge_{0}".format(a)] = torch.tensor(edge_attr[a], dtype=torch.float).to(config["torch"]["device"])
    for a in graph_attr:
        g.data[a] = graph_attr[a]

    if not directed:
        return g.to_undirected()
    return g

read_graphtool

Read a file in graphtool binary format.

Parameters:

Name Type Description Default
file str

Path to graphtool file to be read

required
Source code in src/pathpyG/io/graphtool.py
def read_graphtool(file: str, multiedges: bool = False) -> Graph:
    """
    Read a file in graphtool binary format.

    Files whose name ends in `.zst` are decompressed with the `zstandard` package
    before the content is handed to `parse_graphtool_format`.

    Args:
        file: Path to graphtool file to be read
        multiedges: Currently unused; accepted for backward compatibility only.

    Returns:
        Graph: the graph decoded from the file

    Raises:
        Exception: If the file is zstd-compressed and the `zstandard` package is not installed.
    """
    with open(file, "rb") as f:
        data = f.read()

    # test the file extension rather than searching the whole path, so that
    # ".zst" in a directory name does not trigger decompression
    if str(file).endswith(".zst"):
        # only the import is guarded, so errors raised while decompressing or
        # parsing are not misreported as a missing package
        try:
            import zstandard as zstd
        except ModuleNotFoundError as err:
            msg = 'Package zstandard is required to decompress graphtool files. Please install module, e.g., using "pip install zstandard".'
            raise Exception(msg) from err

        # A decompression object does not need the output size up front, so
        # frames without an embedded content size work too (capping the output
        # at the compressed size would reject them).
        data = zstd.ZstdDecompressor().decompressobj().decompress(data)

    # multiedges must not be forwarded: the second positional parameter of
    # parse_graphtool_format is id_node_attr, not a multi-edge flag
    return parse_graphtool_format(data)