Location>code7788 >text

I look at papers with Awesome-Graphs: interpreting Pregel

Popularity:393 ℃/2024-07-29 16:39:53

Pregel Thesis《Pregel: A System for Large-Scale Graph Processing》

Last time I shared with you the introductory post of Awesome-Graphs, the dissertation graphing projectThe Dissertation Graph: Awesome-Graphs Takes a Sample with 200 Graph Systems Dissertations., this time we'll take the founding paper Pregel on graph computing systems to open the next forecast Differential dataflow along the main line of the paper's atlas, which explains the content of the paper on graph computing systems.

Students interested in graph computing technology can learn more about it, and you are very welcome to follow and participate in the open source project of Thesis Mapping:

  • Awesome-Graphs:/TuGraph-family/Awesome-Graphs
  • OSGraph:/TuGraph-family/OSGraph

Thanks in advance to those of you who gave the project a Star, and let's get right into the meat of it!

summaries

The program, written using the Pregel computational model, consists of a series of iterations. In each iteration, the points of the graph can receive messages sent in the previous iteration and send messages to other points, as well as update the state of itself and outgoing edges, and even modify the topology of the graph. This point-centered computation allows flexible representation of a large number of graph algorithms.

1. Introduction

Challenges in large-scale graph processing:

  • Graph algorithms have poor memory access locality.
  • Less computation at a single point.
  • Problems arising from changes in parallelism during implementation.
  • The machine failure problem for distributed computing processes.

Common implementations of large-scale graph algorithms:

  • Distributed implementations customized for specific graphs [poor generalization]
  • Based on existing distributed computing platforms [e.g., MR, insufficient performance, ease of use]
  • Use of stand-alone graph algorithm libraries [e.g., BGL, LEDA, NetworkX, JDSL, GraphBase, FGL, which limits the size of the graph]
  • Use of existing graph computation systems [e.g. Parallel BGL, CGMgraph, lack of fault-tolerance mechanisms].

The design of the Pregel computing system was inspired by Valiant's BSP model:

  • The computation consists of a series of iterations which are called Superstep.
  • Each time the overstep framework executes the user's UDF on the point in parallel, describing the behavior of the point V at the overstep S.
  • A message sent to point V in S-1 overstep can be read within the UDF and a new message sent to the point in S+1 overstep.
  • The state of the point V and its outgoing edges can be modified within the UDF.
  • Normally messages are sent in the direction of the outgoing edge of the point, but they can also be sent to the point corresponding to a specified ID.

2. Models

importation

The input to the Pregel computational model is a directed graph:

  • Points are distinguished using IDs of type string and have a modifiable user-defined type value.
  • A directed edge is associated with a source point and contains a modifiable user-defined type VALUE and a target point ID.

count

The Pregel computational model runs a series of supersteps until the end of the computation, with the supersteps partitioned by all synchronization points (Barriers). The computation on the points in each superstep is parallelized and terminates when all points are inactive and no messages are passed.

exports

The output of a Pregel computational model is the set of all point output values, usually a directed graph isomorphic to the input graph. However, this is not absolute, as points/edges can be added and deleted during the computation.

talk over

Pregel uses a message-passing model rather than remote reads or other shared-memory-like schemes:

  • Messaging is sufficiently expressive without having to be read remotely.
  • Remote reads have high latency, which can be reduced using asynchronous + bulk messaging. (implies push semantics)

Performance issues in implementing graph algorithms using chained MR:

  • Pregel stores the points/edges on the machine performing the computation and uses only the network to pass the information.The MR implementation requires transforming the graph state from one stage to another, raising the overhead of communication and serialization.
  • The coordination of a sequence of MR jobs increases the complexity of the graph computation task, a problem that can be avoided by using Pregel's overstepping.

3. API

template <typename VertexValue, typename EdgeValue, typename MessageValue>
class Vertex {
  public:
    virtual void Compute(MessageIterator* msgs) = 0;
    
    const string& vertex_id() const;
    int64 superstep() const;
    
    const VertexValue& GetValue();
    VertexValue* MutableValue();
    OutEdgeIterator GetOutEdgeIterator();
    
    void SendMessageTo(const string& dest_vertex, const MessageValue& message);
    void VoteToHalt();
};
  • Vertex: Pregel programs inherit from the Vertex class, and the template parameters correspond to the types of point values, edge values, and messages.
  • Compute: the UDF executed at each point per overstep.
  • GetValue: Get the value of the point.
  • MutableValue: modifies the value of the point.
  • GetOutEdgeIterator: get out edge iterator, can read and modify the value of the out edge.
  • The value of the point and the outgoing edge are persistent across supersteps.

3.1 Messaging

  • Points can send as many messages as they want.
  • All messages sent to point V at superstep S can be fetched at superstep S+1 using an iterator.
  • Message order is not guaranteed, but it is guaranteed that a point will be transmitted and not de-duplicated.
  • The point receiving the message is not necessarily a neighboring point, i.e., the message is not necessarily sent along the outgoing edge.

3.2 Connectors (Combiner)

When sending a message to a point, especially if the target point is on another machine, there is a certain overhead that can be reduced by customizing the Combiner at the user level. For example, if the merge logic for messages sent to the same point is a summation, then the system pre-sums the messages before sending them to the target point and merges multiple messages into a single one, reducing network and memory overheads.

3.3 Aggregator

Pregel's aggregator provides a mechanism for global communication:

  • Each point in the superstep S can be provided with a value.
  • The system uses the reduce operator to statute all values as a global value such as max, min, sum.
  • This global value is visible to all points in the superstep S+1.
  • Aggregators can provide aggregation capabilities across supersteps.

Usage Scenarios:

  • Statistical feature: summing the out-degrees of points allows you to compute the number of edges of the graph.
  • Global coordination: waiting for all points in the overstep to satisfy certain conditions before continuing the computation; election of a point in the algorithm as a special role.
  • Cross-hyperstep aggregation: automatic maintenance of the global number of edges based on the addition/removal of pairs of edges in a hyperstep; Δ-stepping shortest path algorithm.

3.4 Modifying the topology

Modify the conflict resolution strategy:

  1. Deletion edges take precedence over deletion points.
  2. Delete operations take precedence over add operations.
  3. New points are prioritized over new edges.
  4. User-defined conflict policies.
  5. Finally the compute function is executed.

3.5 Inputs and outputs

  • Composition is separated from graph computation.
  • Customize Reader/Writer.

4. Realization

4.1 Basic Architecture

Pregel program execution flow:

  • The user program is copied to the master and worker nodes. master is used to coordinate the worker nodes and the worker nodes register information with the master through the name service.
  • The master determines the partition of the graph, default hash(point ID)%number of partitions. worker is responsible for maintaining the state of the graph partition, executing the compute function, and sending and receiving messages from other workers.
  • The master allocates user inputs to the worker, and the inputs are partitioned independently of the graph slices. If the inputs and graph slices happen to be on a worker then the corresponding data structure is updated immediately, otherwise it is shuffled to another worker. after the inputs are loaded, the point is initialized to active state.
  • The master directs the worker to execute the overstep, and the worker starts a thread for each partition that executes the compute function at the point of the active state to receive the messages delivered by the previous overstep. after the worker finishes execution, it reports back to the master the number of points that will be active for the next overstep.
  • The overstep is executed continuously until there are no active points as well as messages. After the computation is finished, the master notifies the worker to save the computation result on the graph slice.

4.2 Error tolerance

Fault tolerance is implemented by means of checkpoint:

  • Before the hyperstep begins, the master notifies the woker to save the graph state to persistent storage.
  • The graph state contains: point values, edge values, input messages, and the value of the aggregator on the master.
  • The master detects the status of the worker through ping messages, and once the out-of-connection worker computation is terminated, the master marks the worker in the FAILED state.
  • The master assigns the corresponding partition on the failed worker to the other surviving workers, and the other workers load the graph state from checkpoint.
  • The checkpoint may be more than one overstep ahead of the last overstep at the time of the error (not necessarily checkpointing every overstep).
  • Confined Recovery persists outgoing messages to save computational resources during recovery, but requires that the computation be deterministic.

4.3 Worker Implementation

  • The worker maintains the state of each point on the graph slice, the state contains: current point value, list of outgoing edges (edge value + target point), queue of incoming messages, active marker.
  • Considering performance, point-active markers and input message queues are stored independently.
  • There is only one copy of the point/edge value, and two copies of the point-active marker and input message queue (current overstep and next overstep).
  • Messages sent to other worker points are buffered before being sent asynchronously, and messages sent to local points are stored directly in the target point's input message queue.
  • The combiner function is executed when a message is added to the output queue or arrives at the input queue. The latter case does not save network overhead, but does save the space used for message storage (compute implies combine semantics).

4.4 Master Implementation

  • Coordinates the worker, assigns ids as the worker registers to the master. saves the worker's id, survival status, address information, partition information, and so on.
  • Master operations include input, output, calculation, and saving/restoring checkpoints.
  • Maintains statistics and graph state data during computation, such as graph size, out-degree distribution, number of active points, elapsed time and message transfers for oversteps, and aggregator values.

4.5 Aggregators

  • WORKER performs partial aggregation of slices first.
  • Global aggregation uses the tree approach to statute, rather than the pipeline approach, to improve CPU parallelism efficiency.
  • The global aggregation value is sent to all workers in the next overstep.

5. Applications

5.1 PageRank

class PageRankVertex : public Vertex<double, void, double> {
  public:
    virtual void Compute(MessageIterator* msgs) {
      if (superstep() >= 1) {
        double sum = 0;
        for (; !msgs->Done(); msgs->Next())
        sum += msgs->Value();
        *MutableValue() = 0.15 / NumVertices() + 0.85 * sum;
    }
    if (superstep() < 30) {
      const int64 n = GetOutEdgeIterator().size();
      SendMessageToAllNeighbors(GetValue() / n);
    } else {
      VoteToHalt();
    }
  }
};

5.2 Shortest path

class ShortestPathVertex: public Vertex<int, int, int> {
  void Compute(MessageIterator* msgs) {
    int mindist = IsSource(vertex_id()) ? 0 : INF;
    for (; !msgs->Done(); msgs->Next())
      mindist = min(mindist, msgs->Value());
      if (mindist < GetValue()) {
        *MutableValue() = mindist;
        OutEdgeIterator iter = GetOutEdgeIterator();
        for (; !(); ())
          SendMessageTo((), mindist + ());
    }
    VoteToHalt();
  }
};

class MinIntCombiner : public Combiner<int> {
  virtual void Combine(MessageIterator* msgs) {
    int mindist = INF;
    for (; !msgs->Done(); msgs->Next())
      mindist = min(mindist, msgs->Value());
    Output("combined_source", mindist);
  }
};

5.3 Bipartite graph matching

Calculation process:

  • Phase 0: Those vertices in the left set that have not yet been matched will send a message to each of its neighbors requesting a match, and will then unconditionally VoteToHalt. if it doesn't send a message (either because it has already found a match, or there are no outgoing edges), or if all message recipients have already been matched, the vertex will no longer become active.
  • Phase 1: Those vertices in the right set that have not been matched yet randomly select one of the messages it receives and send a message indicating acceptance of the request, and then send a rejection message to the other requesters. It then also unconditionally VoteToHalt.
  • Phase 2: Those vertices in the left collection that have not yet been matched select one of the acceptance requests it received from the right collection and send an acknowledgement message. Those vertices in the left set that have already been matched will never perform this phase because they will not send a message at phase 0.
  • Phase 3: The vertices in the right-hand set that have not yet been matched receive at most one confirmation message. It will notify the matching vertex and then unconditionally VoteToHalt that its job is done.
  • Repeat the process until all nodes are matched.

5.4 Semi-clustering


[Algorithm implementation requires additional information]

6. Experiments

Tested using the shortest path algorithm:

  • Point/edge size 1 billion: worker count 50-800, compute time 174s-17.3s, 16x worker acceleration 10x.
  • WORKER NUMBER 800: point/edge size 1 billion-50 billion, computation time 17.3s-702s, computation time grows linearly.

summarize

  • Pregel uses a "think like a vertex" programming API inspired by the BSP computational model.
  • Pregel meets the performance, scalability, and fault tolerance of graph computation at the billion scale.
  • Pregel is designed for computation on sparse graphs, where communication occurs mainly on edges and hot spots in dense graphs cause performance problems.