Skip to content

TET Plot

TET Plots

TET(temp_edgelist, filepath='.', time_scale=None, network_name=None, add_frame=True, test_split=False, figsize=(9, 5), axis_title_font_size=20, ticks_font_size=20, show=True)

Generate TET plots. Args: temp_edgelist: a dictionary of temporal edges or a dataset object. filepath: Path to save the TET plot. figsize: Size of the figure to save. axis_title_font_size: The font size of the axis titles. ticks_font_size: Size of the tick text in the figure. add_frame: Whether to add the frame to the plot. network_name: Name of the dataset to be used in the TET plot file. time_scale: time_scale for discretizing the data if it has not already been discretized. test_split: Whether to show the test split on the plot. max_time_scale: Maximum number of time_scale to discretize data (documented here, but not a parameter in the signature shown above). show: Whether to show the plot.

Source code in tgx/viz/TET.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def TET(temp_edgelist : Union[object, dict],
        filepath: Optional[str] = ".", 
        time_scale : Union[str, int] = None,
        network_name : str = None,
        add_frame : bool = True,
        test_split : bool = False,
        figsize : tuple = (9, 5),
        axis_title_font_size : int = 20,
        ticks_font_size : int = 20,
        show: bool = True):
    r"""
    Generate TET plots
    Args:
        temp_edgelist: a dictionary of temporal edges (keyed by timestamp) or a
            dataset object exposing ``freq_data`` and ``count_freq()``.
        filepath: Path to save the TET plot.
        time_scale: time_scale for discretizing data if already not done.
        network_name: Name of the dataset to be used in the TET plot file.
        add_frame: Add the frame to the plot.
        test_split: Whether show the test split on the plot.
        figsize: Size of the figure to save.
        axis_title_font_size: The font size of axis titles.
        ticks_font_size: Size of the text in the figure.
        show: Whether to show the plot.

    Returns:
        None. The plot is produced by ``plot_edge_presence_matrix``.
    """
    # Every Python value is an instance of `object`, so the dataset branch must be
    # selected by "is not a dict"; otherwise a plain dict fails on `.freq_data`.
    if not isinstance(temp_edgelist, dict):
        if temp_edgelist.freq_data is None:
            temp_edgelist.count_freq()
        temp_edgelist = temp_edgelist.freq_data

    if time_scale is not None:
        temp_edgelist = discretize_edges(temp_edgelist,
                                        time_scale = time_scale)

    edge_last_ts = generate_edge_last_timestamp(temp_edgelist)
    edge_idx_map = generate_edge_idx_map(temp_edgelist, edge_last_ts)
    idx_edge_map = {v: k for k, v in edge_idx_map.items()}  # key: edge index; value: actual edge (source, destination)
    print("Info: Number of distinct edges (from index-edge map): {}".format(len(idx_edge_map)))

    unique_ts_list = list(temp_edgelist.keys())
    e_presence_mat = generate_edge_presence_matrix(unique_ts_list, idx_edge_map, edge_idx_map, temp_edgelist)
    print("Info: edge-presence-matrix shape: {}".format(e_presence_mat.shape))
    # The train/test boundary is fixed at the 0.85 quantile of the timestamp indices.
    e_presence_mat, test_split_ts_value = process_presence_matrix(e_presence_mat, test_ratio_p=0.85)
    print("Info: edge-presence-matrix shape: {}".format(e_presence_mat.shape))
    fig_param = set_fig_param(network_name, 
                              fig_name = filepath,
                              figsize = figsize,
                              axis_title_font_size = axis_title_font_size,
                              ticks_font_size = ticks_font_size)

    plot_edge_presence_matrix(e_presence_mat, test_split_ts_value, unique_ts_list, list(idx_edge_map.keys()),
                              fig_param, test_split = test_split, add_frames=add_frame, show=show)
    return 

generate_edge_idx_map(edges_per_ts, edge_last_ts)

generates index for edges according to two-level sorting policy: 1. the first level is based on their first appearance timestamp 2. the second level is based on their last appearance timestamp

Source code in tgx/viz/TET.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def generate_edge_idx_map(edges_per_ts, edge_last_ts):
    """
    Assign consecutive integer indices (starting at 0) to the distinct edges.

    Timestamps are visited in the iteration order of ``edges_per_ts``; an edge
    keeps the index it received at the first timestamp it was seen in.
    Among the edges of one timestamp, indices are handed out in ascending
    order of the edge's last appearance timestamp (``edge_last_ts``).

    Returns a dict mapping edge -> index.
    """
    index_of = {}  # edge -> assigned index
    for edges_here in edges_per_ts.values():
        # dict.fromkeys drops repeated edges while keeping first-seen order;
        # sorted() is stable, so ties on the last timestamp keep that order.
        ranked = sorted(dict.fromkeys(edges_here), key=lambda edge: edge_last_ts[edge])
        for edge in ranked:
            # The next free index is simply the number of edges indexed so far.
            index_of.setdefault(edge, len(index_of))

    return index_of

generate_edge_last_timestamp(edges_per_ts)

generates a dictionary containing the last timestamp of each edge

Source code in tgx/viz/TET.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
def generate_edge_last_timestamp(edges_per_ts):
    """Map every edge to the largest timestamp at which it appears."""
    latest = {}
    for stamp, edges in edges_per_ts.items():
        for edge in edges:
            # First sighting records the timestamp; later ones keep the maximum.
            latest[edge] = max(stamp, latest[edge]) if edge in latest else stamp
    return latest

generate_edge_presence_matrix(unique_ts_list, idx_edge_map, edge_idx_map, edges_per_ts)

Returns the presence matrix, whose values 0 and 1 indicate the following: value = 0 means the edge is not present at this timestamp; value = 1 means the edge is present at this timestamp.

shape: (ts, total number of edges)

Source code in tgx/viz/TET.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def generate_edge_presence_matrix(unique_ts_list, idx_edge_map, edge_idx_map, edges_per_ts):
    '''
    Build the edge presence matrix.

    A cell holds E_PRESENCE_GENERAL when the edge occurs at that timestamp and
    0 otherwise. Columns follow ``edge_idx_map``; rows follow the sorted
    timestamps in reverse, so the largest timestamp is row 0.

    shape: (ts, total number of edges)
    '''
    ts_count = len(unique_ts_list)
    presence = np.zeros([ts_count, len(idx_edge_map)], dtype=np.int8)
    ordered_ts = np.sort(unique_ts_list)

    for rank, stamp in tqdm(enumerate(ordered_ts)):
        # Rows are filled bottom-up: the smallest timestamp lands in the last row.
        row = ts_count - rank - 1
        for edge in edges_per_ts[stamp]:
            presence[row, edge_idx_map[edge]] = E_PRESENCE_GENERAL

    return presence

process_presence_matrix(e_presence_matrix, test_ratio_p)

There are 4 types of edge presence: (1) only in train; (2) in train and in test; (3) in test and train (the same edges as number 2, but at their later, test-split timestamps); (4) only in test. X axis: timestamp; Y axis: edge index.

Source code in tgx/viz/TET.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def process_presence_matrix(e_presence_matrix, test_ratio_p):
    """
    Relabel the cells of an edge presence matrix by train/test membership.

    there are 4 types of edge presence:
    1. only in train                      -> E_ONLY_TRAIN
    2. in train and in test (train cells) -> E_TRAIN_AND_TEST
    3. in test and train (which is the number 2 but in later timestamps)
                                          -> E_TRANSDUCTIVE
    4. only in test                       -> E_INDUCTIVE
    X: timestamp
    Y: edge index

    Args:
        e_presence_matrix: 2-D NumPy array of shape (timestamps, edges) whose
            present cells hold E_PRESENCE_GENERAL. Timestamp index ``ts`` is
            stored in row ``num_unique_ts - ts - 1``. Modified in place.
        test_ratio_p: quantile of the timestamp indices used as the split;
            the split timestamp itself belongs to the train part.

    Returns:
        (e_presence_matrix, test_split_ts_value): the same (relabelled) matrix
        and the index of the last train timestamp.
    """
    num_unique_ts = e_presence_matrix.shape[0]
    ts_idx_list = list(range(num_unique_ts))

    # generating timestamp split for train and test:
    test_split_ts_value = int(np.quantile(ts_idx_list, test_ratio_p))

    # Timestamp ts lives in row (num_unique_ts - ts - 1), so the test timestamps
    # (ts > split) occupy the leading rows and the train timestamps the rest.
    # Basic slices are views, so assignments below update the input matrix.
    first_train_row = num_unique_ts - test_split_ts_value - 1
    test_rows = e_presence_matrix[:first_train_row]
    train_rows = e_presence_matrix[first_train_row:]

    # first level processing: differentiate train set edges: 1) Only in train set, 2) in train & test set
    print("First level processing: ")
    print("Detecting edges present in train & test sets")
    # One column-wise reduction replaces re-scanning the test rows per train cell.
    seen_in_test = (test_rows == E_PRESENCE_GENERAL).any(axis=0)
    train_rows[(train_rows == E_PRESENCE_GENERAL) & seen_in_test] = E_TRAIN_AND_TEST

    # differentiate test set edges: 1) transductive (seen in train, repeating in test), 2) inductive (only in test)
    print("Detecting transductive edges (seen in train, repeating in test)")
    seen_in_train = (train_rows == E_TRAIN_AND_TEST).any(axis=0)
    test_rows[(test_rows == E_PRESENCE_GENERAL) & seen_in_train] = E_TRANSDUCTIVE

    # second level processing: whatever is still unlabelled belongs to one split only
    print("Second level processing:")
    print("Detecting edges 1) Only in train set, 2) only in test (inductive)")
    train_rows[train_rows == E_PRESENCE_GENERAL] = E_ONLY_TRAIN
    test_rows[test_rows == E_PRESENCE_GENERAL] = E_INDUCTIVE

    return e_presence_matrix, test_split_ts_value