Skip to content

Graph Utils

Graph Utils

discretize_edges(edgelist, time_scale, store_unix=False, freq_weight=False)

util function for discretizing an edgelist; expected timestamps on edges are unix timestamps. This function supports discretization of edge timestamps 1. by providing the number of intervals (int), it will equally divide the data into that number of intervals. Note that the last bin can have less duration than others. 2. by providing a time granularity (str), it will divide the data into intervals based on the given granularity, i.e. "hourly", "daily", "weekly", "monthly", "yearly"; the starting time of the dataset is considered the start of the first interval Parameters: edgelist: dict, dictionary of edges time_scale: int or str, time interval to discretize the graph store_unix: bool, whether to return the converted timestamps in unix format freq_weight: bool, whether to weight the edges based on their frequency Returns: output list: the first item in the list is always the updated edgelist (dict, dictionary of edges with discretized timestamps) and the second item is the converted timestamps in unix format (list) if store_unix is True

Source code in tgx/utils/graph_utils.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def discretize_edges(edgelist: dict,
                    time_scale: Union[int, str],
                    store_unix: Optional[bool] = False,
                    freq_weight: Optional[bool] = False) -> list:
    """
    Utility function for discretizing an edgelist; expected timestamps on edges
    are unix timestamps. Supports discretization of edge timestamps:

    1. by providing the number of intervals (int), it will equally divide the
       data into that number of intervals. Note that the last bin can have a
       shorter duration than the others.
    2. by providing a time granularity (str), it will divide the data into
       intervals based on the given granularity, i.e. "minutely", "hourly",
       "daily", "weekly", "monthly", "yearly", "biyearly"; the starting time of
       the dataset is considered the start of the first interval.

    Parameters:
        edgelist: dict, dictionary of edges keyed by (sorted) unix timestamp
        time_scale: int or str, time interval to discretize the graph
        store_unix: bool, whether to also return the converted timestamps in unix format
        freq_weight: bool, whether to weight the edges based on their frequency

    Returns:
        output list: the first item is always the updated edgelist (dict of
        edges with discretized timestamps); the second item is the converted
        timestamps in unix format (list) if store_unix is True.

    Raises:
        ValueError: if a string ``time_scale`` is not a supported granularity.
        TypeError: if ``time_scale`` is None or neither int nor str.
    """
    # NOTE(review): assumes edgelist keys are sorted ascending (first key is
    # the earliest timestamp) — confirm against the loader that builds it.
    unique_ts = list(edgelist.keys())
    total_time = unique_ts[-1] - unique_ts[0]

    if time_scale is None:
        raise TypeError("Please provide a time interval")

    granularity = {
        "minutely": SEC_IN_MIN,
        "hourly": SEC_IN_HOUR,
        "daily": SEC_IN_DAY,
        "weekly": SEC_IN_WEEK,
        "monthly": SEC_IN_MONTH,
        "yearly": SEC_IN_YEAR,
        "biyearly": SEC_IN_BIYEARLY,
    }
    if isinstance(time_scale, int):
        # integer duration of each bin; guard against a zero-length bin when
        # time_scale exceeds total_time (would divide by zero downstream)
        interval_size = max(total_time // time_scale, 1)
    elif isinstance(time_scale, str):
        if time_scale not in granularity:
            # bug fix: an unknown string previously fell through every elif,
            # leaving interval_size unbound (UnboundLocalError later)
            raise ValueError("Invalid time interval")
        interval_size = granularity[time_scale]
    else:
        raise TypeError("Invalid time interval")

    num_time_scale = ceiling_division(total_time, interval_size)
    print(f'Discretizing data to {num_time_scale} timestamps...')

    # bug fix: start_time was previously assigned only when store_unix was
    # True, yet it is needed for binning below -> NameError for the common
    # store_unix=False call
    start_time = int(unique_ts[0])

    updated_edgelist = {}
    if store_unix:
        unix_dict = []

    for ts, edges_list in edgelist.items():
        # timestamps need not start at 0; bin relative to the first timestamp
        # (ceiling division correctly puts edges into the last bin)
        bin_ts = ceiling_division(ts - start_time, interval_size)

        bucket = updated_edgelist.setdefault(bin_ts, {})
        for edge in edges_list:
            if freq_weight:
                # count multiplicity of each edge within the bin
                bucket[edge] = bucket.get(edge, 0) + 1
            else:
                bucket[edge] = 1

        if store_unix:
            # unix timestamp of the start of the bin this edge landed in
            unix_ts = int(start_time + bin_ts * interval_size)
            unix_dict.extend([unix_ts] * len(edges_list))

    output = [updated_edgelist]
    if store_unix:
        output.append(unix_dict)
    return output

is_discretized(edgelist, max_timestamps=10000)

Check if an edgelist is discretized or not.

Source code in tgx/utils/graph_utils.py
219
220
221
222
223
224
225
226
227
228
229
def is_discretized(edgelist: Optional[dict],
                   max_timestamps: Optional[int] = 10000) -> bool:
    r"""
    Check if an edgelist is discretized or not.

    Heuristic: an edgelist with more than ``max_timestamps`` distinct
    timestamp keys is treated as continuous (returns False).
    """
    return len(edgelist.keys()) <= max_timestamps

node_list(dict_edgelist)

create a list of nodes from edgelist dictionary

Source code in tgx/utils/graph_utils.py
165
166
167
168
169
170
171
172
173
174
175
176
177
def node_list(dict_edgelist: dict) -> list:
    """
    Create a list of unique nodes from an edgelist dictionary,
    preserving first-seen order.
    """
    # dict used as an ordered set: keys keep insertion order
    seen = {}
    for edge_data in dict_edgelist.values():
        for u, v in edge_data:
            seen.setdefault(u, 1)
            seen.setdefault(v, 1)
    return list(seen)

subsampling(graph, node_list=[], selection_strategy='random', N=100)

Subsampling a part of graph by only monitoring the contacts from specific nodes' list

Parameters:

Name Type Description Default
graph object

graph object

required
node_list Optional[list]

list, a set of nodes to extract their contacts from the graph

[]
selection_strategy str

str, currently supports random sampling

'random'
N Optional[int]

int, number of nodes to be randomly sampled from graph

100

Returns:

Name Type Description
new_edgelist dict

dict, a dictionary of edges corresponding to nodes in the node_list

Source code in tgx/utils/graph_utils.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def subsampling(graph: object, 
                node_list: Optional[list] = None, 
                selection_strategy: str = "random", 
                N: Optional[int] = 100
                ) -> dict:
    """
    Subsampling a part of graph by only monitoring the contacts from specific nodes' list

    Parameters:
        graph: graph object exposing ``.data`` (edgelist dict) and ``.nodes_list()``
        node_list: list, a set of nodes to extract their contacts from the graph;
            if empty/None, ``N`` nodes are chosen via ``selection_strategy``
        selection_strategy: str, currently supports random sampling
        N: int, number of nodes to be randomly sampled from graph

    Returns:
        new_edgelist: dict, a dictionary of edges corresponding to nodes in the node_list

    Raises:
        ValueError: if ``node_list`` is empty and ``selection_strategy`` is unsupported.
    """
    print("Generate graph subsample...")
    edgelist = graph.data
    nodes = graph.nodes_list()

    # bug fix: default was a mutable list literal; None sentinel is the
    # idiomatic, backward-compatible replacement
    if not node_list:  # decide on selection strategy if nodelist not provided
        if selection_strategy == "random":
            node_list = list(np.random.choice(nodes, size=N, replace=False))
        else:
            raise ValueError("Selection strategy not supported", selection_strategy)

    # set for O(1) membership tests (was an O(n) list scan per edge)
    keep = set(node_list)

    new_edgelist = {}
    for t, edge_data in edgelist.items():
        for (u, v), f in edge_data.items():
            if u in keep or v in keep:
                # setdefault collapses the previously duplicated if/else branches
                new_edgelist.setdefault(t, {})[(u, v)] = f
    return new_edgelist

train_test_split(data, val=False, ratio=[85, 15])

Generate train/test split for the data

Parameters:

Name Type Description Default
data dict

dictionary of data

required
val bool

whether we want to have a validation split as well

False
ratio list

list indicating the ratio of the data in each split. Sum of the list components should be 100.

[85, 15]

Returns:

Type Description
dict

two (train/test) or three (train/val/test) data dictionaries

Source code in tgx/utils/graph_utils.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def train_test_split(data : dict, 
                     val : bool = False,
                     ratio : list = [85, 15]) -> dict:
    """
    Generate train/test split for the data.

    Parameters:
        data: dictionary of data
        val: whether we want to have a validation split as well
        ratio: list indicating the ratio of the data in each split.
            Sum of the list components should be 100.

    Returns:
        two (train/test) or three (train/val/test) data dictionaries

    Raises:
        ValueError: if the ratios do not sum to 100, or if ``val`` is True
            but three ratios were not provided.
    """
    # idiom fix: previously accumulated into a variable named `sum`,
    # shadowing the builtin
    if sum(ratio) != 100:
        raise ValueError("invalid train/test split ratio. Sum of the ratios should be 100.")

    if val and len(ratio) != 3:
        # narrowed from bare Exception; ValueError is still caught by
        # callers handling Exception
        raise ValueError("Provide train/val/test ratio")
    elif not val and len(ratio) == 3:
        print("Warning! Data is being splitted to train and test only!")

    # NOTE(review): the split compares dict keys against index thresholds,
    # so it assumes keys are consecutive integers 0..len(data)-1 — confirm
    # against how `data` is built upstream.
    data_len = len(data)
    train_split = int(data_len * ratio[0] / 100)
    train_data = {k: v for k, v in data.items() if k < train_split}
    if val:
        val_split = int(data_len * ratio[1] / 100) + train_split
        val_data = {k: v for k, v in data.items() if train_split <= k < val_split}
        test_data = {k: v for k, v in data.items() if val_split <= k <= data_len}
        return train_data, val_data, test_data

    else:
        test_data = {k: v for k, v in data.items() if train_split <= k <= data_len}
        return train_data, test_data