TiFlink's Follow-up

Hi everyone, it has been 5 months since the Hackathon, and here I present a milestone of TiFlink.
TiFlink is a project intended to provide better stream-processing integration between Flink and TiKV. It provides some features that we think will be useful for users:

  1. Ships with a pure-Java implementation of a CDC client, which frees users from running and maintaining TiCDC and MQs like Kafka.
  2. Stronger consistency guarantees. Working with a Snapshot Coordinator, the distributed Flink tasks can now provide transactional reading and writing.
  3. Unified stream and batch processing, which is especially suitable for materialized views. The Flink job can first fully scan a snapshot of the source tables, then seamlessly switch to stream processing (incremental view maintenance).

Below is an overview of the whole system design.

The key idea here is to run a gRPC service as a coordinator that each subtask of the Flink job talks to in order to coordinate transactions. Within each transaction, all workers share the same view of the start timestamp, commit timestamp, and primary key; thus, the workers can cooperate to read from and commit changes to TiKV with MVCC and the Percolator model. A sketch of such a coordinator contract follows.
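To make that concrete, a coordinator contract could look roughly like the sketch below. All names here are illustrative assumptions, not TiFlink's actual API:

```java
// Hypothetical sketch of the coordinator contract; TiFlink's actual
// interface may differ. All names are illustrative assumptions.
public interface SnapshotCoordinator {

    /** Immutable per-transaction view shared by every worker. */
    final class TxnInfo {
        public final long startTs;       // snapshot timestamp all readers use
        public final byte[] primaryKey;  // Percolator primary shared by all writers

        public TxnInfo(long startTs, byte[] primaryKey) {
            this.startTs = startTs;
            this.primaryKey = primaryKey;
        }
    }

    /** Called by each subtask; returns the same TxnInfo for the same epoch. */
    TxnInfo openTransaction(long epoch);

    /** The agreed commit timestamp for the transaction of the given epoch. */
    long getCommitTs(long epoch);
}
```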

The transactions are bound to checkpoints, which means that at each checkpoint the old transaction is committed and a new one is opened. This guarantees ACID semantics and avoids many anomalies such as partial reads. For a better understanding of consistency in stream processing systems, please refer to this article. A sketch of how a sink could bind transactions to checkpoints is shown below.
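As an illustration of this binding on the sink side, here is a minimal sketch following Flink's usual pattern of prewriting at snapshot time and committing once the checkpoint completes. The Txn handle and factory are hypothetical stand-ins, not TiFlink's actual classes:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Minimal sketch: one transaction per checkpoint interval.
 * Txn and TxnFactory are hypothetical stand-ins for TiFlink's internals.
 */
public class CheckpointBoundSink extends RichSinkFunction<byte[][]>
        implements CheckpointedFunction, CheckpointListener {

    public interface Txn {
        void bufferWrite(byte[] key, byte[] value);
        void prewrite();   // Percolator phase 1: lock under the shared primary
        void commit();     // Percolator phase 2: publish at the commit ts
    }

    public interface TxnFactory extends Serializable {
        Txn open();        // e.g. backed by the coordinator
    }

    private final TxnFactory factory;
    private transient Txn current;
    private transient Map<Long, Txn> pending;

    public CheckpointBoundSink(TxnFactory factory) {
        this.factory = factory;
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) {
        pending = new HashMap<>();
        current = factory.open();                     // first transaction
    }

    @Override
    public void invoke(byte[][] kv, Context ctx) {
        current.bufferWrite(kv[0], kv[1]);            // writes stay inside the open txn
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) {
        current.prewrite();                           // phase 1 at checkpoint time
        pending.put(ctx.getCheckpointId(), current);  // commit once the checkpoint succeeds
        current = factory.open();                     // fresh txn for the next interval
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        Txn txn = pending.remove(checkpointId);
        if (txn != null) {
            txn.commit();                             // changes become visible atomically
        }
    }
}
```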

Below is a picture depicting the transaction/checkpoint management:

The latest progress of this project is:

  1. The CDC client implementation has become stable at TiFlink/client-java. (Ready for an MR?)
  2. The Flink library now works fine with tables that have an integer primary key.

Limitations:

  1. Only supports integer PKs (for now)
  2. Can only recover from a checkpoint within a limited time (before the change log is GC'ed on TiKV's side)
  3. Can't start working with a non-empty target table
  4. The initial scan of the source tables can't take too long (the same change-log GC problem)

TODO:

  1. Merge the CDC client into client-java
  2. Adjust the interface of the Flink library and make it ready for beta testing
  3. Support general and compound primary keys
  4. Externalize the row serializer and deserializer to make them customizable
  5. Integrate with TiDB to provide a better materialized view implementation
  6. Improve flexibility

A PR was opened to contribute CDCClient to client-java: https://github.com/tikv/client-java/pull/174


Thanks for your updates @shanzi!

That sounds exciting! I can see people from Pravega looking for such a client.

I'll bring them here to see if we can align requirements and build a pragmatic implementation.

cc @amyangfei @kennytm

It is desirable since TiDB itself doesn't support materialized views. I think teams at Zhihu are also working on a similar feature. Maybe @sunxiaoguang can give more insights.

I will take a closer look at the design and implementation part in the next reply.

Before diving into technical details, I’d like to ask two architecture questions.

First, where is the transaction coordinator running? From the picture I can see it running inside the Flink JM, which would require hacking the Flink source code. But from the source code,

  1. server initialization
  2. caller stack

I think it is running on the client side. Then it varies by deployment mode.

  • Session mode & per-job mode: on the client side. In addition, the client won't exit when detached, because the server is long-running.
  • Application mode: on the cluster side. It may run, as the picture shows, in the same container as the Flink JM.

I'd like to see more details about where the coordinator runs, and to confirm that it runs per job.

Second, the CDC client is implemented inside client-java, which seems weird based on my understanding. Could you draw a picture of the path from TiDB/TiKV storage to the client to the downstream? It isn't clear to me whether you are going to add a component to pingcap/ticdc or rewrite the whole project in Java.

@tison The design of the CDC client is not to replace TiCDC, which is a whole distributed system for pulling and processing change logs. Instead, it's something that users can run in a single JVM process to pull a key range. Thus, it is just a "client" that users can use similarly to the KV client.

Users may use the CDC client in several JVM processes, as TiFlink does: each worker (TaskManager) runs one CDC client, and they run cooperatively to achieve high throughput.
Based on this reasoning, I'd rather regard the CDC client as just another kind of Java client. It is quite simple and is not a replacement for TiCDC, as it requires users to manually deploy and coordinate the clients across processes. The Java CDC client also does not provide failure recovery or checkpointing, which users have to handle themselves.
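For reference, single-process usage could look roughly like the sketch below. The CDCClient constructor and methods are assumptions based on the PR linked above and may not match the final API; the key-range construction is elided because it depends on the table codec:

```java
import org.tikv.cdc.CDCClient;                        // from tikv/client-java#174
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Coprocessor.KeyRange;

public class CdcPullExample {
    public static void main(String[] args) throws Exception {
        TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2379");
        TiSession session = TiSession.create(conf);
        // A real key range would cover one table (or one split of it).
        KeyRange keyRange = KeyRange.getDefaultInstance();
        CDCClient client = new CDCClient(session, keyRange); // assumed constructor
        client.start(session.getTimestamp().getVersion());   // assumed: start from a fresh ts
        try {
            while (true) {
                Cdcpb.Event.Row row = client.get();          // assumed: polls the next change event
                if (row != null) {
                    System.out.printf("key=%s commitTs=%d%n",
                            row.getKey().toStringUtf8(), row.getCommitTs());
                }
            }
        } finally {
            client.close();
            session.close();
        }
    }
}
```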

For now, the coordinator is a single gRPC server which should be started alongside Flink's JM. The server can also run anywhere publicly reachable, as long as the clients in the workers can find it.

The client conforms to the coordinator interface, which is used internally by the consumer and producer. Thus, it's possible to provide clients with different implementations, e.g. using transactions/locks on some other storage (etcd, TiKV, TiDB, an RDBMS, etc.) to cooperatively reach consensus about TiFlink transactions and their timestamps. As that is harder to implement, it is left as future work.
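As one example of what such an alternative implementation could build on, the sketch below agrees on a transaction's info for an epoch via a compare-and-put on etcd (using jetcd), so every worker asking about the same epoch observes the same winning value. The key layout and the encoding of the transaction info are assumptions:

```java
import java.nio.charset.StandardCharsets;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.PutOption;

/** Sketch: an epoch -> transaction-info registry backed by etcd. */
public class EtcdTxnRegistry {
    private final KV kv;

    public EtcdTxnRegistry(String endpoint) {
        this.kv = Client.builder().endpoints(endpoint).build().getKVClient();
    }

    /** First proposer wins; every caller reads back the winning value. */
    public String openTransaction(long epoch, String proposedInfo) throws Exception {
        ByteSequence key = bytes("/tiflink/txn/" + epoch);   // assumed key layout
        // Put only if the key has never been created (version == 0).
        kv.txn()
          .If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.version(0)))
          .Then(Op.put(key, bytes(proposedInfo), PutOption.DEFAULT))
          .commit()
          .get();
        // Read back whichever proposal won the race.
        return kv.get(key).get().getKvs().get(0).getValue()
                 .toString(StandardCharsets.UTF_8);
    }

    private static ByteSequence bytes(String s) {
        return ByteSequence.from(s, StandardCharsets.UTF_8);
    }
}
```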

It's great to see the achievements of the TiFlink project, and I'm looking forward to seeing it become a mature, production-ready solution in the foreseeable future.

As @tison mentioned, we are working on a project to implement a unified TiDB connector which doesn't guarantee global consistency but can be good enough for our use cases. The implementation is pretty simple and fairly complete at this time, and we recently started building real applications with this connector. After Hybrid Source (FLIP-150) is ready, this connector will be able to scale to multiple parallel workers with a small amount of work.

In case you would like to know more details about the aforementioned connector, here are the PRs related to this work. Comments are welcome :)