Naiad Paper: "Naiad: A Timely Dataflow System"
In a previous article, "The Paper Graph: Awesome-Graphs Takes a Sample with 200 Graph System Papers", we introduced the paper-graphing project Awesome-Graphs and shared Google's Pregel, OSDI'12's PowerGraph, and SOSP'13's X-Stream. This time we share another paper on stream processing systems: Naiad, published by Microsoft at SOSP'13. TimelyDataflow is its open-source implementation. The paper shaped the design of later streaming dataflow systems, and the influence of its scheduling framework can be seen in the TuGraph Analytics scheduler.
Readers interested in graph computing are encouraged to learn more, and are very welcome to follow and contribute to the paper-graphing open source projects:
- Awesome-Graphs:/TuGraph-family/Awesome-Graphs
- OSGraph:/TuGraph-family/OSGraph
Thanks in advance to everyone who gives the projects a Star. Now let's get into the paper itself!
Abstract
Naiad is a distributed data-parallel system that executes cyclic dataflows, providing high-throughput batch processing, low-latency stream processing, and the ability to perform iterative and incremental computation.
1. Introduction
Supported features:
- Loop structures in the dataflow, with support for back edges (feedback).
- Stateful dataflow vertices that can produce and consume records without global coordination.
- A notification mechanism that fires once a vertex has received all of its inputs for a given round or iteration.
2. Timely dataflow
Dataflow graphs may contain nested loops, and timestamps are used to distinguish which round or iteration each piece of data was generated in.
2.1 Graph structure
A timely dataflow graph has input and output vertices: input vertices receive sequences of messages from external producers, and output vertices send sequences of messages to external consumers.
An external producer labels each message with a round (epoch) and notifies the input vertex when it will send no more messages within a given epoch.
A producer can also close an input vertex, indicating that the vertex will receive no further messages at all.
Messages leaving output vertices are likewise labeled with their epoch, and the external consumer is similarly notified when no more messages will be produced for a given epoch.
Timely dataflow graphs can contain nested loop contexts:
- Ingress vertex: every edge entering a loop context must pass through an ingress vertex, e.g., I.
- Egress vertex: every edge leaving a loop context must pass through an egress vertex, e.g., E.
- Feedback vertex: every loop context must contain a feedback vertex, e.g., F.
The computational semantics expressed by the figure above are interpreted as follows:
Key concept: logical timestamp:
- e: the epoch of the message.
- k: the depth of loop nesting.
- c: a vector of k loop counters, one per level of nesting.
A logical timestamp therefore has the form (e, ⟨c1, ..., ck⟩).
Logical timestamp transformation rules:
- On passing an ingress vertex: c gains a new innermost dimension initialized to 0, marking entry into the loop.
- On passing a feedback vertex: the last dimension of c is incremented by 1, accumulating the iteration count.
- On passing an egress vertex: the last dimension of c is popped, restoring the timestamp to its form before the ingress vertex.
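To make the rules concrete, here is a minimal sketch in C# (the language Naiad itself is written in). The Timestamp type and the Ingress/Feedback/Egress method names are illustrative assumptions, not Naiad's actual API.

using System.Collections.Generic;

// Illustrative sketch only: a logical timestamp and the transformations
// applied when a message passes ingress, feedback, and egress vertices.
readonly struct Timestamp
{
    public readonly int Epoch;                    // e: the input epoch
    public readonly IReadOnlyList<int> Counters;  // c: one counter per nested loop

    public Timestamp(int epoch, IReadOnlyList<int> counters)
    {
        Epoch = epoch;
        Counters = counters;
    }

    // Ingress: push a new innermost loop counter initialized to 0.
    public Timestamp Ingress()
    {
        var c = new List<int>(Counters) { 0 };
        return new Timestamp(Epoch, c);
    }

    // Feedback: increment the innermost loop counter.
    public Timestamp Feedback()
    {
        var c = new List<int>(Counters);
        c[c.Count - 1]++;
        return new Timestamp(Epoch, c);
    }

    // Egress: pop the innermost loop counter, restoring the pre-loop form.
    public Timestamp Egress()
    {
        var c = new List<int>(Counters);
        c.RemoveAt(c.Count - 1);
        return new Timestamp(Epoch, c);
    }
}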
Comparison of logical timestamps, for t1 = (e1, ⟨a1, ..., am⟩) and t2 = (e2, ⟨b1, ..., bn⟩), t1 precedes t2 when both conditions hold:
- Condition 1: integer comparison of epochs, e1 ≤ e2.
- Condition 2: lexicographic comparison of the loop-counter vectors, ⟨a1, ..., am⟩ ≤ ⟨b1, ..., bn⟩.
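Continuing the illustrative Timestamp type from the sketch above, the comparison can be written as follows, assuming the conjunctive order just described (integer comparison of epochs plus lexicographic comparison of loop counters); this is a sketch, not Naiad's implementation.

static class TimestampOrder
{
    // t1 <= t2 iff e1 <= e2 AND t1's counters are lexicographically <= t2's.
    public static bool LessOrEqual(Timestamp t1, Timestamp t2)
    {
        if (t1.Epoch > t2.Epoch) return false;

        // Lexicographic comparison of the loop-counter vectors.
        int shared = System.Math.Min(t1.Counters.Count, t2.Counters.Count);
        for (int i = 0; i < shared; i++)
        {
            if (t1.Counters[i] < t2.Counters[i]) return true;
            if (t1.Counters[i] > t2.Counters[i]) return false;
        }
        // All shared positions are equal: the shorter (or equal-length) vector is <=.
        return t1.Counters.Count <= t2.Counters.Count;
    }
}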
2.2 Vertex computation
Vertices in the dataflow can receive and send logically timestamped messages and notifications.
Each vertex v implements two callbacks:
- v.OnRecv(Edge e, Message m, Timestamp t): receive a message.
- v.OnNotify(Timestamp t): receive a notification.
and can call two functions provided by the system:
- this.SendBy(Edge e, Message m, Timestamp t): send a message.
- this.NotifyAt(Timestamp t): request a notification.
For a dataflow edge e = (u, v), u.SendBy(e, m, t) will eventually trigger v.OnRecv(e, m, t), and v.NotifyAt(t) will eventually trigger v.OnNotify(t).
The system guarantees that v.OnNotify(t) is invoked only after all calls v.OnRecv(e, m, t') with t' ≤ t, i.e., a notification is delivered only once every message at or before time t has been processed, giving the vertex a chance to clean up its state for the work up to t.
This mechanism ensures that message processing never moves backwards in time.
The following sample code shows a two-output dataflow vertex implementing distinct and count: the first output emits each distinct record when it is first seen, and the second output emits the per-record counts once the round completes.
class DistinctCount<S,T> : Vertex<T>
{
    Dictionary<T, Dictionary<S,int>> counts = new Dictionary<T, Dictionary<S,int>>();

    void OnRecv(Edge e, S msg, T time)
    {
        // First record seen for this time: allocate state and request a notification.
        if (!counts.ContainsKey(time)) {
            counts[time] = new Dictionary<S,int>();
            this.NotifyAt(time);
        }
        // First occurrence of msg at this time: emit it on the "distinct" output.
        if (!counts[time].ContainsKey(msg)) {
            counts[time][msg] = 0;
            this.SendBy(output1, msg, time);
        }
        counts[time][msg]++;
    }

    void OnNotify(T time)
    {
        // All messages for this time have arrived: emit counts and release state.
        foreach (var pair in counts[time])
            this.SendBy(output2, pair, time);
        counts.Remove(time);
    }
}
2.3 Achieving timely dataflow
Progress in a timely dataflow is constrained by the unprocessed events (messages and notifications) and by the structure of the dataflow graph.
Key concept: pointstamp:
- SendBy(e, m, t) generates the pointstamp (t, e).
- NotifyAt(t) at vertex v generates the pointstamp (t, v).
Single-threaded scheduler implementation:
- Maintains a set of active pointstamps: those with at least one unprocessed event. For each active pointstamp there are two counters:
  - OC (occurrence count): the number of unprocessed events bearing this pointstamp.
  - PC (precursor count): the number of active pointstamps upstream that could result in this one.
- At initialization, an active pointstamp is created at each input vertex for the first epoch e, with OC = 1 and PC = 0. When epoch e is marked complete, a pointstamp for epoch e+1 is created and the one for e is removed.
- When a pointstamp p becomes active, its PC is initialized to the number of active pointstamps upstream of it, and the PC of every active pointstamp downstream of p is incremented.
- When OC[p] = 0, p is removed from the active set, and the PC of every active pointstamp downstream of p is decremented.
- When PC[p] = 0, no active pointstamp upstream can affect p; p is then said to be in the frontier, and the scheduler may deliver any notification in the frontier.
The rules for updating OC are: OC[(t, e)] is incremented when a vertex calls SendBy(e, m, t) and decremented when OnRecv(e, m, t) is invoked; OC[(t, v)] is incremented when v calls NotifyAt(t) and decremented when v.OnNotify(t) is invoked.
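A minimal sketch of this OC/PC bookkeeping, where the could-result-in relation between pointstamps is abstracted as a couldResultIn predicate; all names and types here are illustrative assumptions rather than Naiad's actual scheduler code.

using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative only: TPointstamp stands for a (timestamp, location) pair.
class ProgressTracker<TPointstamp>
{
    // could-result-in: can an event at p1 lead to a future event at p2?
    readonly Func<TPointstamp, TPointstamp, bool> couldResultIn;

    readonly Dictionary<TPointstamp, int> oc = new Dictionary<TPointstamp, int>();
    readonly Dictionary<TPointstamp, int> pc = new Dictionary<TPointstamp, int>();

    public ProgressTracker(Func<TPointstamp, TPointstamp, bool> couldResultIn)
    {
        this.couldResultIn = couldResultIn;
    }

    // A new event (message or notification request) bearing pointstamp p.
    public void OnEventQueued(TPointstamp p)
    {
        if (!oc.ContainsKey(p))
        {
            // p becomes active: PC = number of active pointstamps upstream of p,
            // and every active pointstamp downstream of p gains a precursor.
            oc[p] = 0;
            pc[p] = oc.Keys.Count(q => !q.Equals(p) && couldResultIn(q, p));
            foreach (var q in oc.Keys.Where(q => !q.Equals(p) && couldResultIn(p, q)).ToList())
                pc[q]++;
        }
        oc[p]++;
    }

    // An event bearing pointstamp p has been processed (OnRecv/OnNotify ran).
    public void OnEventDelivered(TPointstamp p)
    {
        if (--oc[p] == 0)
        {
            // p leaves the active set: downstream pointstamps lose a precursor.
            oc.Remove(p);
            pc.Remove(p);
            foreach (var q in oc.Keys.Where(q => couldResultIn(p, q)).ToList())
                pc[q]--;
        }
    }

    // The frontier: active pointstamps with no active precursors;
    // notifications in the frontier are safe to deliver.
    public IEnumerable<TPointstamp> Frontier() =>
        pc.Where(kv => kv.Value == 0).Select(kv => kv.Key);
}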
3. Distributed implementation
- A Naiad cluster consists of multiple processes, each hosting several workers, and each worker manages a partition of the dataflow vertices.
- Messages are exchanged between workers over local shared memory or remote TCP connections.
- Processes follow a distributed progress tracking protocol to coordinate the delivery of notifications.
3.1 Data parallelism
- The logical dataflow graph consists of stages connected by connectors.
- Each connector carries an optional partitioning function.
- At runtime the logical dataflow graph is expanded into a physical dataflow graph: each stage is replaced by a set of vertices, and each connector by a set of edges.
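As an illustration of what a connector's partitioning function does (an assumption for exposition, not Naiad's API), it maps each record to one vertex instance of the downstream stage, for example by hashing a key:

using System;

// Illustrative sketch: routing a record to one of the downstream stage's
// vertex instances via a connector's partitioning function.
static class ConnectorRouting
{
    // partition: record -> integer key used for routing (assumed for illustration).
    public static int TargetVertex<TRecord>(
        TRecord record, Func<TRecord, int> partition, int downstreamParallelism)
    {
        // Records with the same key always reach the same vertex instance,
        // which is what keeps per-key vertex state consistent.
        int key = partition(record);
        return ((key % downstreamParallelism) + downstreamParallelism) % downstreamParallelism;
    }
}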
3.2 Workers
- Delivering messages takes priority over delivering notifications.
- Various delivery policies are possible, e.g., delivering events with the earliest pointstamp first to reduce end-to-end latency.
- Workers communicate with each other through shared queues.
- If the destination vertex is managed by the same worker, SendBy can call OnRecv on the destination vertex directly.
- If the graph contains a cycle, the call must instead be queued, or the recursion depth must be bounded, to avoid overwhelming the system.
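A rough sketch of that dispatch decision; the WorkerDispatch type, the maxReentrancy bound, and the pending queue are illustrative assumptions, not Naiad's worker implementation.

using System;
using System.Collections.Generic;

// Illustrative sketch of a worker choosing between a direct call and queueing.
class WorkerDispatch<TMessage, TTime>
{
    readonly Queue<Action> pending = new Queue<Action>();
    readonly int maxReentrancy;   // bound on recursive SendBy -> OnRecv depth
    int depth;

    public WorkerDispatch(int maxReentrancy) { this.maxReentrancy = maxReentrancy; }

    public void Deliver(Action<TMessage, TTime> onRecv, TMessage msg, TTime time, bool sameWorker)
    {
        // Same worker and still within the re-entrancy budget: call OnRecv directly.
        if (sameWorker && depth < maxReentrancy)
        {
            depth++;
            try { onRecv(msg, time); }
            finally { depth--; }
        }
        else
        {
            // Otherwise queue the delivery (remote target, or depth limit reached,
            // e.g., when a cycle would otherwise cause unbounded recursion).
            pending.Enqueue(() => onRecv(msg, time));
        }
    }

    // Drain queued deliveries from the worker's main loop.
    public void RunPending()
    {
        while (pending.Count > 0)
            pending.Dequeue()();
    }
}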
3.3 Distributed Progress Tracking
- Each worker maintains its own view of the progress state, kept consistent by broadcasting OC updates.
- Optimization techniques:
  - Track progress on pointstamps projected onto the logical graph, reducing contention and the size of updates.
  - Aggregate updates locally before broadcasting them.
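A small sketch of local aggregation before broadcast; ProgressBuffer and broadcastUpdate are illustrative assumptions, not part of Naiad's protocol code.

using System;
using System.Collections.Generic;

// Illustrative sketch: buffer occurrence-count deltas locally and broadcast
// them as one aggregated update, rather than one message per event.
class ProgressBuffer<TPointstamp>
{
    readonly Dictionary<TPointstamp, int> deltas = new Dictionary<TPointstamp, int>();
    readonly Action<IReadOnlyDictionary<TPointstamp, int>> broadcastUpdate;

    public ProgressBuffer(Action<IReadOnlyDictionary<TPointstamp, int>> broadcastUpdate)
    {
        this.broadcastUpdate = broadcastUpdate;
    }

    // Record a local +1/-1 change in the occurrence count of pointstamp p.
    public void Accumulate(TPointstamp p, int delta)
    {
        deltas.TryGetValue(p, out int current);
        current += delta;
        if (current == 0) deltas.Remove(p);   // +1 and -1 cancel out locally
        else deltas[p] = current;
    }

    // Periodically flush the aggregated deltas to all other workers.
    public void Flush()
    {
        if (deltas.Count == 0) return;
        broadcastUpdate(new Dictionary<TPointstamp, int>(deltas));
        deltas.Clear();
    }
}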
3.4 Fault tolerance and availability
- Vertices expose Checkpoint and Restore interfaces for fault tolerance.
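A sketch of what such an interface might look like on a vertex; the paper only says that vertices support checkpoint and restore, so these signatures are assumptions.

using System.IO;

// Illustrative sketch: a vertex that can persist and recover its state.
interface ICheckpointable
{
    // Serialize all vertex state (e.g., per-time dictionaries) to the stream.
    void Checkpoint(Stream target);

    // Rebuild vertex state from a previously written checkpoint.
    void Restore(Stream source);
}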
3.5 Prevention of jitter
- Networking.
- Data structure contention.
- Garbage collection.
4. Writing programs using Naiad
5. Performance evaluation
6. Real-world applications
- Batch Iterative Graph Computation
- Batch Iterative Machine Learning
- Streaming Acyclic Computation
- Streaming Iterative Graph Analysis
7. Summary
Naiad supports a hybrid of synchronous and asynchronous computation by allowing programs to coordinate only when needed.