Introduction
Many relationships between real-world entities can be modelled as graph data, and hidden relationships between those entities can be identified by graph processing. Community detection is one such application that plays a vital role in finding complex or hidden structural relationships. But the performance and scalability of graph processing applications remain challenging because many algorithms, including community detection, are NP-hard.
Stochastic block partition (SBP) [1] is a probabilistic algorithm for community detection, presented as part of the IEEE HPEC GraphChallenge. In our prior work [2]–[4], we developed several techniques to improve the performance of SBP. That work, though effective, still struggled to achieve high performance due to challenges in parallelizing critical parts of the SBP algorithm. In this paper, we further improve the computational performance of SBP by improving its internal data structure and parallelism model.
The contributions of our work are as follows:
The design and implementation of optimization techniques for parallel SBP in C and Python that include (1) the use of a novel lock-free shared-memory compressed data structure for the internal bookkeeping of the algorithm, (2) a method for separating read and write phases during nodal movements, and (3) the use of an efficient batch size to balance parallelism in each worker against the overhead and convergence rate of the algorithm.
A study that evaluates the performance of the above SBP optimization techniques against the private-copy, buffered-update parallelism model used in our prior work. Our results show a speedup of 5.38x on a 100k node graph compared to our previous approach. Our implementation also achieves better scalability with a large number of workers and a much smaller memory footprint.
The rest of this paper is organized as follows. Section II presents background information pertaining to the stochastic block partition algorithm. We describe our approach to optimizing stochastic block partition in Section III. Section IV presents a performance evaluation of the optimization techniques. Finally, concluding remarks and future work are provided in Section V.
Background
The stochastic block partition (SBP) algorithm is a probabilistic algorithm for performing community detection. It uses a generative statistical model based on work by Peixoto [5]–[7] and builds on the work of Karrer and Newman [8]. The algorithm uses a Markov Chain Monte Carlo (MCMC) method in the spirit of the Metropolis-Hastings algorithm [9], [10].
The number of blocks and the assignment of each node to a block are not known ahead of time. The algorithm finds both by using an entropy measurement function to assess the quality of the partitioning at each candidate number of blocks. It compares the entropies at different numbers of blocks, ultimately bracketing the optimal number.
To find the optimal partitioning at a given number of blocks, SBP performs an agglomerative merge in which two existing blocks of vertices are merged together, repeatedly and greedily, until the target number of blocks is reached. Once the target number of blocks is reached, the algorithm switches to an MCMC nodal movement phase. Here each vertex is proposed to be moved from its current community to another community, and the proposal is accepted with a probability based on the resulting change in entropy. Our previous work has shown that this nodal movement phase dominates the runtime of the algorithm and is difficult to parallelize [2], [3].
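As a point of reference, the accept/reject decision for a single proposed move can be sketched as follows. This is a minimal illustration in C, assuming delta_entropy is the entropy change the move would cause and hastings is the proposal-ratio correction; the exact acceptance expression used by SBP follows [1], [9], [10] and is not reproduced here.

#include <math.h>
#include <stdlib.h>

/* Minimal sketch of a Metropolis-Hastings style accept/reject step for one
   proposed nodal move. delta_entropy and hastings are assumed to be computed
   elsewhere; this is not the paper's exact implementation. */
static int accept_move(double delta_entropy, double hastings)
{
    double p_accept = fmin(1.0, exp(-delta_entropy) * hastings);
    return ((double)rand() / (double)RAND_MAX) < p_accept;
}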
Method
In this section, we describe our techniques for improving parallel SBP. Our previous work [4] focused on devising a compressed data structure for efficient computations during SBP, dynamically adjusting the amount of parallelism, and reducing the amount of data to be processed by aggressively merging blocks. Since that work, we have made significant enhancements, including moving the heavyweight portions of our algorithm from Python to native C code.
However, the techniques presented in this paper are orthogonal to the above-mentioned enhancements; they focus on changes to the internal data structure and synchronization model to improve shared-memory parallelism during the MCMC nodal movement phase.
A. Source of Nodal Update Contention
The internal state of the SBP algorithm is centered around two entities: a partition array and an interblock edge count matrix. The partition array simply maps each vertex id to a community id. The interblock edge count matrix is derived from the partition array and keeps track of the counts of edges from every community to every other community. When a vertex u is moved from one block r to another block s, the rows and columns of the interblock edge count matrix for both r and s must be updated to reflect the edges of u.
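For concreteness, the two entities can be pictured as follows. This is a minimal sketch in C assuming a dense, row-major matrix purely for clarity; the actual implementation uses the compressed representation of Section III-C, and the struct and field names here are illustrative rather than the paper's code.

/* Illustrative only: the shared state read and written during nodal
   movements, shown with a dense matrix for clarity. */
typedef struct {
    int   num_vertices;
    int   num_blocks;
    int  *partition;            /* vertex id -> community (block) id */
    long *interblock_edge_cnt;  /* num_blocks x num_blocks, row-major:
                                   entry [r * num_blocks + s] counts the
                                   edges from block r to block s */
} sbp_state;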
These MCMC nodal movements account for around 90 percent of the baseline runtime of the algorithm on large graphs, as we reported in our previous work. In our previous implementation of SBP on large graphs, we also observed no overall program speedup when using more than 8 threads for nodal movements. To understand why parallelizing SBP's nodal movements is difficult, consider the naive parallel algorithm shown in Algorithm 1.
In order to propose a nodal movement, the current state of the partition array in the neighborhood of the vertex is read. But these neighboring vertices may be moved concurrently by other workers. Writing to and reading from this shared state without some form of synchronization can cause corruption that can crash the algorithm. A simple but ineffective approach is to take a single lock around both reading from and writing to the shared state.
Our prior work addressed this issue by buffering the nodal movements generated by the worker threads and having a single thread periodically combine all recent updates into the global state, which each worker then refreshes from, as shown in Algorithm 2. But this approach is only partially effective: while the proposals based on the current state can be generated in parallel, the actual updates to global state are still serialized.
B. Split-Phase Nodal Updates
To address this data dependency issue, we devised an approach based on carefully examining the data flow in the SBP algorithm. We observed that the updates to shared global state, primarily in the interblock edge count matrix, are commutative addition and subtraction operations. Since these operations are order independent, the instructions that update this state need only atomicity, not locks, and can be executed in parallel by every worker thread. Furthermore, generating proposed updates only requires reading from shared state, not writing. As a result, preventing hazards in this situation only requires that no writers be active while proposals are being generated. Combining these two insights, we devised an approach that splits the parallel nodal move procedure into two phases: a read phase to generate proposed movements and a write phase to carry out the accepted movements. A simple barrier between the phases provides synchronization. Our improved approach, which we refer to as decontentioned parallel nodal movement, is shown in Algorithm 3.
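To illustrate the point about commutativity, a single accepted move can be applied with plain atomic adds and subtracts, in any order relative to other workers. The sketch below uses C11 atomics on individual matrix entries; the function and argument names are illustrative, not the paper's API.

#include <stdatomic.h>

/* Illustrative only: because the updates commute, workers can apply them
   concurrently with atomic read-modify-write instructions and no lock. */
static void apply_edge_update(_Atomic long *cnt_old_block,
                              _Atomic long *cnt_new_block,
                              long edge_weight)
{
    atomic_fetch_sub(cnt_old_block, edge_weight); /* debit the old block  */
    atomic_fetch_add(cnt_new_block, edge_weight); /* credit the new block */
}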
Note that our approach does not entirely eliminate the need for locks. Computing the correct update to shared state while moving a node requires looking at the current partition array entry at that node's index, as well as the current partition values of each of the node's neighbors. These cannot be modified while being read and therefore still require a lock.
Our split-phase approach has an additional major advantage over the buffered approach in Algorithm 2. The buffered approach requires each worker to maintain its own private copy of the interblock edge count matrix, which is prohibitively expensive for large graphs, even with a compressed representation.
Algorithm 1 Naive Parallel Nodal Movement
/* Each worker operates on a range of vertices. */
parallelNodalMovementNaive(start_vert, stop_vert) {
  for ni in range(start_vert, stop_vert) {
    lock.acquire()
    /* propose_movement reads entries from the partition array and
       interblock_edge_cnt and returns the new rows and cols it used to
       compute the acceptance probability. */
    result = propose_movement(G, ni, partition, interblock_edge_cnt)
    r, s, p_accept, new_rows_cols = result
    if (proposal accepted) {
      partition[ni] = s
      interblock_edge_cnt[r,:] = new_rows_cols[0]
      interblock_edge_cnt[s,:] = new_rows_cols[1]
      interblock_edge_cnt[:,r] = new_rows_cols[2]
      interblock_edge_cnt[:,s] = new_rows_cols[3]
    }
    lock.release()
  }
}
Algorithm 2 Buffered Parallel Nodal Movement
/* Range of start_vert to stop_vert is batch size. */
parallelNodalMovementBuffered(start_vert, stop_vert) {
  /* Check for updates from central worker and copy changes. */
  lock.acquire()
  partition_local[:] = partition[:]
  for i in modified_blocks {
    interblock_edge_cnt_local[i,:] = interblock_edge_cnt[i,:]
    interblock_edge_cnt_local[:,i] = interblock_edge_cnt[:,i]
  }
  lock.release()
  for ni in range(start_vert, stop_vert) {
    proposal = propose_movement(G, ni, partition_local, interblock_edge_cnt_local)
    r, s, p_accept, new_rows_cols = proposal
    if (proposal accepted) {
      Buffer ni, r, s into results.
    }
  }
  Send results to central worker.
  /* Prepare to be called again with the next batch. */
}
C. Lock-Free Compressed Data Structure
Using lock-free instructions to update the interblock edge count matrix is straightforward when the matrix is stored in a dense 2D array. But this simple representation is not suitable for large graphs. At the beginning of SBP, each vertex is assigned to its own community, so the interblock edge count matrix is initially very sparse (densities of 4.1e-4 and 2e-4 for the 50k and 100k node graphs used in our work, respectively). Our previous work focused on devising a compressed representation of the interblock edge count matrix to greatly reduce the amount of storage space needed during processing. Our compressed data structure uses hash tables, one along each axis, to store the interblock edge counts. Tables along both axes are needed because SBP takes slices along both dimensions in order to formulate the entropy changes that would result from block merges and nodal movements.
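The layout can be pictured roughly as follows. This is a minimal sketch with hypothetical names (hash_map, row, col); hash_map stands in for the lock-free table described below.

/* Illustrative only: one hash table per block along each axis, so that a
   row slice (out-edges of a block) and a column slice (in-edges of a block)
   can both be read without scanning a dense matrix. */
typedef struct hash_map hash_map;   /* key: block id, value: edge count */

typedef struct {
    int        num_blocks;
    hash_map **row;   /* row[r] holds counts of edges from block r */
    hash_map **col;   /* col[s] holds counts of edges into block s */
} compressed_interblock_edge_cnt;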
Implementing a lock-free design is desirable to improve parallelism, but it is much harder with hash tables than with a dense array, especially when the tables must be resized. We have devised a hash table design that uses compare-and-swap (CAS) and double compare-and-swap (DCAS) instructions and is suitable for updating and resizing in parallel.
Our hash table implementation uses linear probing to resolve collisions: it first jumps to a slot based on hashing the key, and then scans until the first unused slot is found. An insertion attempts to CAS a new entry into the first apparently empty slot after hashing the input key. If the CAS fails (i.e., another worker inserted into that open slot first), the next available slot is attempted. Once an insert succeeds, a resizing operation may be needed. Here we make use of a differential reference counting scheme similar to [11] to manage resizing without locks. This scheme uses a pointer to an outer structure that contains a reference counter and a pointer to an internal structure. The internal structure contains the pointer to the hash table itself and another reference counter. A writer will DCAS the external structure, incrementing its reference counter, and increment the internal reference counter when done. Resizes can be done atomically by swapping in both the new external pointer and the newly reset external counter together. If a worker sees that a resize is needed, it will allocate a new, bigger hash table and attempt to swap it in place. Whichever worker wins the race to swap the external structure will then adjust the internal counter and merge all of the old entries into the new table. We additionally wrote a shared-memory lock-free memory allocator that uses queues of memory pools so that hash table resizing can be done entirely without locks.
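The core insert path can be sketched as shown below. This is a simplified illustration using C11 atomics: it shows only the linear-probing CAS insert on a fixed-capacity table and omits the DCAS-based resizing and reference counting described above. All names (lf_slot, lf_table, lf_table_add, EMPTY_KEY) are illustrative, not the paper's actual code.

#include <stdatomic.h>
#include <stdint.h>
#include <stddef.h>

#define EMPTY_KEY UINT64_MAX     /* marks an unused slot */

typedef struct {
    _Atomic uint64_t key;        /* block id, or EMPTY_KEY */
    _Atomic int64_t  count;      /* interblock edge count */
} lf_slot;

typedef struct {
    lf_slot *slots;
    size_t   capacity;           /* power of two */
} lf_table;

/* Add delta to the count stored under key, claiming an empty slot with
   compare-and-swap if the key is not yet present. Returns 0 when the table
   is full, signalling the caller to trigger a resize. */
static int lf_table_add(lf_table *t, uint64_t key, int64_t delta)
{
    size_t mask = t->capacity - 1;
    size_t i = (size_t)(key * 0x9E3779B97F4A7C15ULL) & mask;  /* simple hash */

    for (size_t probes = 0; probes < t->capacity; probes++, i = (i + 1) & mask) {
        uint64_t cur = atomic_load(&t->slots[i].key);
        if (cur == EMPTY_KEY) {
            uint64_t expected = EMPTY_KEY;
            if (atomic_compare_exchange_strong(&t->slots[i].key, &expected, key))
                cur = key;       /* we claimed the empty slot */
            else
                cur = expected;  /* another worker claimed it first */
        }
        if (cur == key) {
            atomic_fetch_add(&t->slots[i].count, delta);
            return 1;
        }
        /* slot belongs to a different key: keep probing */
    }
    return 0;
}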
Algorithm 3 Decontentioned Parallel Nodal Movement
/* Range of start_vert to stop_vert is batch size. */
parallelNodalMovementDecontentioned(start_vert, stop_vert) {
  for ni in range(start_vert, stop_vert) {
    /* Proposals are generated without taking locks. */
    result = propose_movement(G, ni, partition, interblock_edge_cnt)
    if (proposal accepted) { enqueue(Q, proposal) }
  }
  barrier()
  while (Q) {
    ni, r, s, new_rows_cols = dequeue(Q)
    lock.acquire()
    read partition[j] for j in neighbors of ni
    Compute block ids and edge counts from vertex neighbors into b_out, count_out, b_in, and count_in.
    partition[ni] = s
    lock.release()
    /* Update shared state in place with no locks. */
    for i in range(len(b_out)) {
      interblock_edge_cnt[r, b_out[i]] -= count_out[i]
      interblock_edge_cnt[s, b_out[i]] += count_out[i]
    }
    for i in range(len(b_in)) {
      interblock_edge_cnt[b_in[i], r] -= count_in[i]
      interblock_edge_cnt[b_in[i], s] += count_in[i]
    }
  }
  /* Prepare to be called again with the next batch. */
}
D. Nodal Update Batch Size
During nodal movements, there is a tradeoff between the overhead of synchronizing global state and the quality of the proposals generated. The fresher each worker's view of global state is, the better the proposals it generates. Higher-quality proposals mean fewer nodal movements and larger changes in entropy, ultimately leading to faster convergence and better partition accuracy. This batch size parameter is therefore critical and must be tuned empirically.
In our Decontentioned approach, the group batch size is the size of the range between the start vertex and the stop vertex for each parallel worker. In other words, the group batch size is the number of proposals a worker generates before waiting on the barrier and seeing updated global state. The same batch size tradeoff also occurs in our Buffered implementation, where the group size is the number of proposals evaluated and sent to the central worker before synchronizing from global state. Our previous work [2] found that a batch size of 1 (i.e., the lowest possible granularity) was optimal, but our implementation details and fixed overheads have since changed, so we chose to re-evaluate this tradeoff. We found the optimal batch sizes for both approaches and discuss the details in the next section.
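To illustrate how the parameter enters the worker loop, the sketch below steps a worker through its vertex range in fixed-size batches, calling the decontentioned routine of Algorithm 3 once per batch. The driver loop and its names (worker_loop, BATCH_SIZE) are hypothetical; only the per-batch call mirrors Algorithm 3, and we assume all workers make the same number of batch calls so that the barrier inside Algorithm 3 matches up.

void parallelNodalMovementDecontentioned(int start_vert, int stop_vert); /* Algorithm 3 */

#define BATCH_SIZE 64   /* value the evaluation in Section IV settles on */

/* Hypothetical driver: each worker processes its assigned vertex range
   BATCH_SIZE vertices at a time. A smaller batch means it sees fresh
   global state more often but also waits at the barrier more often. */
static void worker_loop(int my_start_vert, int my_stop_vert)
{
    for (int start = my_start_vert; start < my_stop_vert; start += BATCH_SIZE) {
        int stop = start + BATCH_SIZE;
        if (stop > my_stop_vert)
            stop = my_stop_vert;
        parallelNodalMovementDecontentioned(start, stop);
    }
}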
Performance Evaluation
We evaluate our decontentioned approach, described in Section III, against the buffered approach from our prior work [4].
A. Experimental Setup
The datasets we used in our experiments are described in Table I. These include the baseline datasets from the GraphChallenge, supplemented by larger graphs we synthesized using the generator in the GraphChallenge repository. We instrumented our code to measure the overall program runtime, ignoring time to read from disk. Our tests were conducted on a system with two 64-core AMD EPYC 7713 CPUs and 1 TB of RAM. For simplicity, and because the program runtime is dominated by nodal movements, multi-threaded tests were conducted by setting an equal number of agglomerative merge and nodal movement threads. The serial baseline uses the same native code and compressed data structures, just without the overhead of creating and managing threads. Our code is written in a mixture of Python and C, with the native C code doing the heavy lifting, including computing entropy and implementing the lock-free compressed data structure described in Section III. We use Python 3.11.3 and Clang 15.0.7 to build our C code, and NumPy [12] for array processing.
B. Results
1) Group Batch Size
First we set out to characterize the sensitivity of both the Buffered approach and our new Decontentioned approach to the group batch size. We measured the performance of each with varying numbers of threads and group sizes. The results for the baseline Buffered approach on the N=20k graph are shown in Figure 1. We see that the optimal number of vertices processed before synchronizing can be large in the baseline approach, only leveling off at 256 for smaller numbers of threads. We found similar results for other input graph sizes and selected 256 as a reasonable value.
Fig. 1. Performance of parallel Buffered on a N=20k input graph across numbers of threads and varying batch sizes.
The characterization of our Decontentioned approach is shown in Figure 2 and Figure 3 for the 20k and 200k graph sizes, respectively. Here we see that a smaller number of vertices is a better batch size than for Buffered. We found good overall performance at a batch size of 64 vertices, versus 256 or 512 for Buffered. Our Decontentioned approach performs best at smaller batch sizes because it pays an additional barrier overhead: a worker must wait at the barrier for the other workers to finish, whereas a worker in Buffered is never blocked waiting for other workers to complete. If a Buffered worker has not completed its batch, those results are simply reported and incorporated later.
2) Decontentioned Performance
Next we look directly at the performance of our Decontentioned approach against the baseline Buffered approach, each configured with an appropriate batch size as determined in Figures 1–3. The speedup of Buffered over serial for different numbers of threads is shown in Figure 4. Here we see that, for every input graph, the maximum speedup is achieved at a low number of threads (16) compared to the number of CPU cores available (128 physical cores). This corresponds to our previous work showing that nodal movement parallelism is limited [4]. Note that Buffered failed to run on the larger 100k and 200k node graphs with a large number of threads because it exhausted the available system memory (1 TB) due to duplicated state in the worker threads.
Fig. 2. Performance of parallel Decontentioned on a N=20k input graph across numbers of threads and varying batch sizes.
Fig. 3. Performance of parallel Decontentioned on a N=200k input graph across numbers of threads and varying batch sizes.
The speedup of our new Decontentioned approach over serial is shown in Figure 5. Unlike Buffered, the maximum speedup is achieved at 128 threads, corresponding to the number of CPU cores available on the system. Furthermore, the speedups are much higher. Next we directly compare the performance of Buffered and Decontentioned. The speedup of Decontentioned over Buffered is shown in Figure 6. Here we see that the maximum speedup over Buffered at 128 threads is 5.38x. At 256 threads, Decontentioned is no longer at its fastest, but it still achieves a speedup of 6.42x over Buffered, indicating more graceful performance degradation on an over-scheduled system. Furthermore, Decontentioned can actually process the larger 100k and 200k graphs with a large number of threads because of its more efficient use of memory.
Fig. 6. Decontentioned speedup over Buffered across different input graphs and numbers of threads.
Conclusions and Future Work
We have described a new approach that improves the parallelism of Stochastic Block Partition by leveraging a novel lock-free compressed data structure, splitting the critical nodal movement operations into separate read and write phases, and optimizing the granularity of nodal movement updates across workers. Our current work forms the basis for further algorithmic and implementation improvements. In particular, we would like to combine our new approach to shared-memory parallelism on one node with message-passing based parallelism across nodes. We also plan to investigate the use of fine-grained locking to further enhance the parallelism of the algorithm. Finally, we plan to apply adaptive techniques based on vertex connectivity and asynchronous Gibbs sampling [13] to dynamically adjust the nodal update batch size.
ACKNOWLEDGEMENTS
This work was supported in part by the National Science Foundation, grants 1618706 and 1717774.