Part 2: Hacking Flink to handle large scale continuous ETL

Oct 6, 2022

Introduction


This post is part 2 of a multi-part series about how we scaled Flink to handle a large scale continuous ETL pipeline. Check out part 1 of the series if you have not done so yet.


At Rubicon, we use a good deal of open source software. In particular, we rely on Apache Flink as the base stream processing engine for continuous SQL transformation. After deploying our system against a handful of real-world production workloads, we’ve learned that while Flink has a really strong foundation, out of the box it cannot handle many of the workloads that we encountered.




For example, the query described in Part 1 of this series produces a job graph with the following characteristics:

  • 19,648 sub-tasks

  • parallelism 64

  • TBs of ingest data to process


Even after extensive parameter tuning, the job did not perform well, even on a relatively large cluster. Our goal is to finish every backfill job within a 2-4 hour window at most, yet this job took several days to complete running on clusters as large as 16 nodes of r5d.16xlarge instances, a considerable amount of processing power!




Ultimately, we needed to make a number of substantial modifications to Flink in order to support the more challenging customer workloads at scale. The most notable of these include:

  • Batch backfill with streaming operators

  • Auto-rescale after batch backfill

  • Mini-batch optimizations for JOIN operators

  • Suppress processing of input rows that will not modify output state



Backfill performance challenges


The lifecycle of a continuous ETL job can be split into two distinct phases: (1) an “offline” initial backfill of data and (2) a “live” streaming mode. The backfill phase needs to process the entire dataset to build up both the internal job state and the materialized output state. In live streaming mode, a complete version of the state is maintained with some acceptable lag and is ready to serve production traffic.

Our general approach to handling backfill is to give the backfill phase a significant amount of resources (a large cluster with lots of disk, CPU, and memory) and run the job at high parallelism. Once the backfill phase completes, we automatically rescale the job to a lower parallelism and restart it with far fewer resources.

However, running the job at high parallelism with ample compute resources can quickly become cost prohibitive, because building the initial operator state in streaming mode creates an amplification effect as the job computes various intermediate states.

Yet these intermediate states are not needed — we only need the latest value at the end of backfill. Take the following simple transformation as an example:
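Here is a minimal sketch of the effect. The orders table, its customer/amount columns, and the data values are hypothetical stand-ins, not the actual customer query; any streaming GROUP BY aggregation behaves the same way.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractionAmplification {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Three rows for the same key, arriving one at a time (hypothetical data).
        Table orders = tEnv.fromValues(
                Row.of("alice", 10L),
                Row.of("alice", 20L),
                Row.of("alice", 30L))
            .as("customer", "amount");
        tEnv.createTemporaryView("orders", orders);

        // A running SUM per customer: a streaming aggregation that updates on every input row.
        Table totals = tEnv.sqlQuery(
            "SELECT customer, SUM(amount) AS order_total FROM orders GROUP BY customer");

        // Observing the result as a retract stream makes the intermediate states visible.
        // Conceptually the output is:
        //   +  (alice, 10)
        //   -  (alice, 10)   <- retract the previous value
        //   +  (alice, 30)   <- emit the updated value
        //   -  (alice, 30)
        //   +  (alice, 60)
        tEnv.toRetractStream(totals, Row.class).print();
        env.execute();
    }
}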




Note how for every input row, Flink retracts (deletes) the previous value and emits the latest value. As new rows stream in, we see every intermediate state computed for this aggregation. Now imagine a large number of such operators chained in sequence, and you can see how the number of rows processed can flood the system.

In a traditional batch-style system, operators do not output intermediate state, and for backfill jobs, batch mode is a much more efficient approach. So we want the best of both worlds: batch backfill and a streaming live phase! At first glance, it looks like we could leverage Flink’s BATCH execution mode. However, it ultimately doesn’t work, because batch mode would use a completely different set of (batch) operators that do not build the streaming operator state needed to resume the job in streaming mode 😢
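For reference, this is the one-line switch that makes BATCH mode so tempting. The sketch below assumes the standard DataStream API entry point.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeTemptation {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tempting, but a dead end for our use case: BATCH mode plans the job with
        // batch operators, so whatever state they build cannot be used to resume
        // the same job with streaming operators afterwards.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    }
}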



Batch backfill with streaming operators


To solve our backfill amplification problem, we added a new mode to Flink that combines the benefits of batch-style backfill with modified streaming operators, so that the state built during backfill can be used to restart the job in streaming mode!

At a high level, the new mode works as follows:

  • At the start of the job, we snapshot all of the current Kafka offsets and turn them into bounded reads (sketched in code after this list).

  • Each streaming operator builds its internal state without emitting any output until it receives an end-of-stream signal for all of its inputs. Only then does it emit.

  • After emitting, the finished operator enters a standby state and waits for a snapshot signal.

  • Once every operator finishes emitting, we have the following:

    • A correct point-in-time materialization of the job output as of the start of the job.

    • Complete streaming operator state ready for snapshot.
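
To make the first bullet concrete, here is an illustrative sketch (not our actual code) of turning the Kafka source into a bounded read that stops at the offsets present when the job starts. The broker address, topic, and group id are placeholders.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedBackfillSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")              // placeholder
            .setTopics("input-topic")                        // placeholder
            .setGroupId("backfill")                          // placeholder
            .setStartingOffsets(OffsetsInitializer.earliest())
            // Stop at the offsets that exist when the job starts, making the read bounded.
            .setBounded(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka-backfill")
            .print();
        env.execute();
    }
}

And for the second bullet, a hypothetical sketch of an operator that builds state silently and only emits once it has seen all of its input, using Flink's BoundedOneInput interface to receive the end-of-stream signal. This toy sum is illustrative only, not our actual modification.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.HashMap;
import java.util.Map;

// Accumulates a per-key sum without emitting anything downstream, then emits
// each final value exactly once when all input has been consumed. A real
// implementation would keep this in Flink keyed state so it can be snapshotted.
public class EmitOnEndOfInputSum
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>>,
                   BoundedOneInput {

    private final Map<String, Long> sums = new HashMap<>();

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> record) {
        // Build internal state only; no intermediate retract/insert pairs leave the operator.
        Tuple2<String, Long> in = record.getValue();
        sums.merge(in.f0, in.f1, Long::sum);
    }

    @Override
    public void endInput() {
        // End-of-stream for all inputs: emit the final aggregate per key, once.
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(e.getKey(), e.getValue())));
        }
    }
}

Because nothing is emitted until end of input, downstream operators never see the intermediate retract/insert churn that floods the system during a streaming-mode backfill.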



Once the batch backfill completes, we snapshot the job and restore it at a lower parallelism to enter streaming mode. This optimization has given us a 10-100x improvement on a number of real-world customer workloads!