Apache Flink Notes

Posted by 杨朝坤

Apache Flink - Stateful Computations over Data Streams

Document

Apache Flink Documentation

Dataflow Model

The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing

State

Paper

  • Chandy-Lamport algorithm: Distributed Snapshots: Determining Global States of Distributed Systems
  • Lightweight Asynchronous Snapshots for Distributed Dataflows
    • an asynchronous snapshotting algorithm that achieves minimal snapshots on acyclic execution graphs
    • a generalisation of our algorithm that works on cyclic execution graphs
  • State Management in Apache Flink
    • a complete end-to-end design for continuous stateful processing, from the conceptual view of state in the programming model to its physical counterpart implemented in various backends.
    • how to naturally pipeline consistent snapshots in weakly connected dataflow graphs and capture minimal state, skipping in-transit records when possible, without impacting general system progress.
    • how snapshots can be utilized for a large variety of operational needs beyond failure recovery such as software patches, testing, system upgrades and exactly-once delivery.
    • encapsulate different processing guarantees and isolation levels for externally accessing partitioned operator state and output, using snapshots.
    • large-scale pipeline deployments that operate 24/7 in production and rely heavily on stateful processing coupled with runtime metrics and performance insights.

Article

Note

  1. Key-Groups
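    • Key-Groups are the atomic unit in which Flink redistributes state.
    • Consistent hashing is used to reduce seeks and increase sequential disk reads.
    • Implementation: hash the key, then take the result modulo the maximum parallelism. The hash ring is divided evenly by the task parallelism: task instance i handles the keys in [ i * max_parallelism/parallelism, (i + 1) * max_parallelism/parallelism) and manages their keyed state. If the parallelism changes, each task loads the state for its new range, and incoming records are hashed to the task instance that holds the state for their key.
    • Why consistent hashing: without it (for example, taking the hash modulo the current parallelism), a change in parallelism leaves two options. Either every task scans the whole snapshot to collect the state of all keys assigned to it, which causes a large amount of unnecessary I/O; or the snapshot keeps a reference to each key's state (an index) so that each task can selectively read the state assigned to it, which adds indexing cost (proportional to the number of keys, which can be very large) and communication load. With consistent hashing, each task instance reads only the data it needs, and Key-Groups are usually large enough to allow coarse-grained sequential disk reads.
    • Note that the hash ring, i.e. the maximum parallelism, should not be set too large. By the definition above, the number of Key-Groups equals the maximum parallelism; if the maximum parallelism is too large, each Key-Group may hold the state of only a few keys, which rules out coarse-grained disk reads and writes and degrades performance.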
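
The Key-Groups assignment in item 1 can be sketched in a few lines of Python. This is a minimal illustration of the formula given in the note, not Flink's actual code: the function names are hypothetical, and `zlib.crc32` is only an assumed stand-in for a stable hash function.

```python
import zlib


def key_group_for(key: str, max_parallelism: int) -> int:
    """Hash the key, then take the result modulo the maximum parallelism."""
    # zlib.crc32 is an assumption of this sketch: any stable hash would do.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def key_group_range(task_index: int, parallelism: int, max_parallelism: int) -> range:
    """Key-Groups owned by one task: [i * max/p, (i + 1) * max/p), using integer division."""
    start = task_index * max_parallelism // parallelism
    end = (task_index + 1) * max_parallelism // parallelism
    return range(start, end)


def task_for_key(key: str, parallelism: int, max_parallelism: int) -> int:
    """Route a record to the task instance that owns the key's Key-Group."""
    group = key_group_for(key, max_parallelism)
    for task_index in range(parallelism):
        if group in key_group_range(task_index, parallelism, max_parallelism):
            return task_index
    raise ValueError("key group outside the hash ring")


if __name__ == "__main__":
    max_parallelism = 128
    for parallelism in (4, 8):
        ranges = [key_group_range(i, parallelism, max_parallelism) for i in range(parallelism)]
        print(parallelism, [(r.start, r.stop) for r in ranges])
        print("user-42 ->", task_for_key("user-42", parallelism, max_parallelism))
```

The Key-Group of a key depends only on the maximum parallelism, so rescaling from 4 to 8 tasks changes which contiguous range of groups each task loads, but never splits a group.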
  2. Rollback
    • During a full restart or rescaling, all tasks are being redeployed, while after a failure only the tasks belonging to the affected connected component (of the execution graph) are reconfigured.
    • In essence, known incremental recovery techniques from micro-batch processing are orthogonal to this approach and can also be employed. A snapshot epoch acts as synchronization point, similarly to a micro-batch or an input-split. On recovery, new task instances are being scheduled and, upon initialization, retrieve their allocated shard of state. In the case of IterationHead recovery, all records logged during the snapshot are recovered and flushed to output channels prior to their regular record forwarding logic.
  3. Asynchronous and Incremental Snapshots
    • the pipelined snapshotting protocol only governs “when” but not “how” snapshots are internally executed.
    • The out-of-core state backend based on RocksDB [13] exploits the LSM (log-structured merge) tree, the internal representation of data in RocksDB. Updates are not made in-place, but are appended and compacted asynchronously. Upon taking a snapshot, the synchronous triggerSnapshot() call simply marks the current version, which prevents all state as of that version from being overwritten during compactions. The operator can then continue processing and make modifications to the state. An asynchronous thread iterates over the marked version, materializes it to the snapshot store, and finally releases the snapshot so that future compactions can overwrite that state. Furthermore, the LSM-based data structure also lends itself to incremental snapshots, which write to the snapshot store only the parts that changed since the previous snapshot.
    • Flink’s in-memory local state backend implementation is based on hash tables that employ chain hashing. During a snapshot, it copies the current table array synchronously and then starts the external materialization of the snapshot, in a background thread. The operator’s regular stream processing thread lazily copies the state entries and overflow chains upon modification, if the materialization thread still holds onto the snapshot. Incremental snapshots for the in-memory local backend are possible and conceptually trivial (using delta maps), yet not implemented at the current point.
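
The copy-on-write idea behind the in-memory backend can be sketched as follows. This is a simplified illustration assuming a plain dictionary in place of Flink's chained hash table; the class and function names are hypothetical, and releasing a snapshot is left out.

```python
import threading


class CopyOnWriteState:
    """Hypothetical in-memory keyed state with copy-on-write snapshots."""

    def __init__(self):
        self._table = {}   # key -> [version, value] entry
        self._version = 0  # bumped each time a snapshot is taken

    def put(self, key, value):
        entry = self._table.get(key)
        if entry is not None and entry[0] == self._version:
            # Entry was created after the last snapshot: safe to update in place.
            entry[1] = value
        else:
            # Entry is new or may be shared with a snapshot: install a fresh copy.
            self._table[key] = [self._version, value]

    def get(self, key):
        return self._table[key][1]

    def snapshot(self):
        """Synchronous part: copy the table (references only) and mark a new version."""
        frozen = dict(self._table)
        self._version += 1
        return frozen


def materialize(frozen, store):
    """Asynchronous part: write the frozen view to a snapshot store."""
    for key, entry in frozen.items():
        store[key] = entry[1]


if __name__ == "__main__":
    state = CopyOnWriteState()
    state.put("a", 1)
    frozen = state.snapshot()
    store = {}
    worker = threading.Thread(target=materialize, args=(frozen, store))
    worker.start()
    state.put("a", 2)  # processing continues while the snapshot is written
    worker.join()
    print(store, state.get("a"))
```

Only the table of references is copied synchronously. An entry is copied lazily, the first time it is modified after a snapshot, so the background thread always sees the values as of the snapshot.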
  4. Exactly-Once Delivery Sinks
    • Idempotent Sinks
    • Transactional Sinks
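
The note only names the two sink styles, so the following is a rough sketch of the general idea behind each, not Flink's sink API. All class and method names are hypothetical. An idempotent sink tolerates replayed records because writes are keyed upserts; a transactional sink holds back output until the snapshot epoch that produced it is complete.

```python
class IdempotentSink:
    """Keyed upserts: replaying a record after a rollback leaves the result unchanged."""

    def __init__(self):
        self.table = {}

    def write(self, key, value):
        self.table[key] = value


class TransactionalSink:
    """Buffers output per snapshot epoch and publishes it only on commit."""

    def __init__(self):
        self.committed = []
        self._pending = []

    def write(self, record):
        self._pending.append(record)

    def commit(self):
        # Assumed to be called once the snapshot for the current epoch is complete.
        self.committed.extend(self._pending)
        self._pending = []

    def abort(self):
        # Assumed to be called on failure: uncommitted output is dropped and
        # will be produced again when the epoch is replayed.
        self._pending = []


if __name__ == "__main__":
    upserts = IdempotentSink()
    for key, value in [("k1", 1), ("k1", 1)]:  # the second write is a replay
        upserts.write(key, value)
    print(upserts.table)

    txn = TransactionalSink()
    txn.write("r1")
    txn.abort()   # failure before the snapshot completed
    txn.write("r1")
    txn.commit()  # snapshot completed, output becomes visible
    print(txn.committed)
```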