Proposal: Fair Scheduler for Concurrent-Apply

Problem

We have talk a lot about the Batch-System for TiKV. The problem is that a message may arrive early but the target region it belongs is at the back of the fsm queue. This message will wait a long time to be processed.

We hope we can process messages in the order in which they are generated. If we consume message m1 in thread-1, while a new message m2 arrives and is consumed in thread-2, the message m2 may be written into RocksDB before m1 and it will cause an inconsistent result. So we need TiKV can keep correctness for concurrent-apply.

Design

RocksDB uses write-group to decide the log sequence number of every keys. Every write batch would be assigned with a WriteThread::Writer and it will be added to an atomic-linked-list. I will divide this WriteThread::Write from write-operation of RocksDB, so that I can make sure that the next entry will not write into RocksDB before the previous entry.

It means that we can decide the write order before we really write the keys. As soon as we ensure that the entry e2 is behind e1 in write-queue, we can apply them in different thread.

let mut wb = db.new_write_batch();
let mut starting = vec![];
while let Some(m) = receiver.try_recv() {
      let region = globalRegionMap.get(m.get_region_id());
      if region.last_applying_index.load(Ordering::Acquire) + 1 == m.get_first_log_index() {
           starting.push((region, m.get_last_log_index()));
           for (k,v) in m.kvs() {
                wb.put(m);
           }
      } else {
           region.restore_entries(m);
      }
      if wb.count() > MAX_WRITE_BATCH_SIZE {
           break;
      }
}
db.prepare(&wb);
for (region,v) in starting {
     region.last_applying_index.store(v, Ordering::release);
     region.reschedule_entries(v);
}
db.submit(&wb);

Q & A

  • Would this proposal break the constraints of Raft algorithm? How is the different between this proposal and the parallel-raft algorithm proposed by PolarFS?

    • We still require all entries must be appended to log in order. And we do not change the calculation rules of committed-index. We just hope to apply the data to memtable as soon as possible. But we do not speed up the commit of log in this proposal.
  • What if an entry send to one thread but it finds that there have been still some entries not processed?

    • It will be restored in the cache of the region it belongs to. And when the previous entry have been starting writing to RocksDB, it will notify the entires behind itself.
  • How do we batch the message for better performance and CPU-cost effective?

    • We do not collect multiple regions as the current batch-system. We only decide when to flush write batch by the count of write batch. It means that the current configure apply-max-batch-size does not work again.
  • How shall we deal with the split-request and merge-request?

    • If one message contains an admin-request, it must wait all of entries before it are consumed.
  • When would we delete these entries which have been applied to RocksDB?

    • This operation is also an admin-request which is called as CompactLog. See the previous question.
1 Like

There is part of my work for the API of RocksDB:

https://github.com/tikv/rocksdb/pull/260
Rust API for RocksDB

Suppose we have raft logA for region A, and another raft logB for region B. Each raft log contains a queue of requsts. Take a request Ma from logA, and take another request Mb from logB.
With this proposal, can we know which request comes earlier before we apply the two requests? If the answer is true, the universal queue of requests is available now for tikv. It is wonderful. @Little-Wallace

If this two message come from different regions, we can apply them in any order.

I talked with @BusyJay about this proposal. And we agree to decide the order in raftstore and we may prepare writer in raftstore thread and then we will execute the real writing operation in apply-thread.

We have raft logA for region A, and another raft logB for region B. Each raft log contains a queue of requsts. Take a request Ma from logA, and take another request Mb from logB.
Suppose:
Ma is A.prepare_merging, and
Mb is B.commit_merge.
The client generates Ma before Mb.
Discuss:
If a server thread can receive the two requests, and applies Ma before Mb, region A will be merged into region B correctly.
If all the nodes of raftstore apply Ma before Mb, logics of prepare_merging and commit_merge will be simple, and no need to deal with abnormal cases created by asynchronous conditions, including the points Jay listed. Discuss: Region Merge Mto1Solution @BusyJay
Does this proposal help? Or what does this proposal do?

I don’t see what’s your point. What current TiKV does is exact making all nodes “apply Ma before Mb”. And the guarantee only solves the problem 1.


@Little-Wallace says applying Ma and Mb in any order.


@BusyJay says applying Ma before Mb.

Which one is valid for Tikv 5.0?
Does this propsal do something helpful for the example of Ma and Mb?

Both are true. Actually, because raft merge algorithm guarantees CommitMerge will be applied after PrepareMerge, which means Ma and Mb can’t be CommitMerge and PrepareMerge respetively, so Ma and Mb can be written to rocksdb in any order.

Fine. I see you Jay. I should say “Which one is valid for tikv logics excluding current merge implement?” Anyway, we do not discuss region merge at this topic.