Problem
We have talked a lot about the batch system of TiKV. The problem is that a message may arrive early while the FSM of the region it belongs to is at the back of the queue, so the message waits a long time before being processed.
We hope to process messages in the order in which they are generated. If we consume message m1 in thread-1 while a new message m2 arrives and is consumed in thread-2, m2 may be written into RocksDB before m1, which would produce an inconsistent result. So TiKV must preserve correctness under concurrent apply.
Design
RocksDB uses a write group to decide the log sequence number of every key. Every write batch is assigned a WriteThread::Writer and added to an atomic linked list. I will separate this WriteThread::Writer step from the actual write operation of RocksDB, so that I can make sure the next entry will not be written into RocksDB before the previous one.
This means we can decide the write order before we actually write the keys. As soon as we ensure that entry e2 is behind e1 in the write queue, we can apply them in different threads.
let mut wb = db.new_write_batch();
let mut starting = vec![];
// Drain pending apply messages into one write batch.
while let Some(m) = receiver.try_recv() {
    let region = globalRegionMap.get(m.get_region_id());
    if region.last_applying_index.load(Ordering::Acquire) + 1 == m.get_first_log_index() {
        // This entry directly follows the last applying one, so its
        // key-value pairs can go into the current batch.
        starting.push((region, m.get_last_log_index()));
        for (k, v) in m.kvs() {
            wb.put(k, v);
        }
    } else {
        // Out-of-order entry: park it in the region's cache until its
        // predecessors start writing.
        region.restore_entries(m);
    }
    if wb.count() > MAX_WRITE_BATCH_SIZE {
        break;
    }
}
// Reserve sequence numbers first to fix the write order...
db.prepare(&wb);
for (region, last_index) in starting {
    region.last_applying_index.store(last_index, Ordering::Release);
    region.reschedule_entries(last_index);
}
// ...then perform the actual write, possibly concurrently with other threads.
db.submit(&wb);
Q & A
- Would this proposal break the constraints of the Raft algorithm? What is the difference between this proposal and the parallel-raft algorithm proposed by PolarFS?
- We still require that all entries be appended to the log in order, and we do not change the calculation rules of the committed index. We only hope to apply the data to the memtable as soon as possible; this proposal does not speed up the commit of the log.
- What if an entry is sent to one thread, but that thread finds some earlier entries have not been processed yet?
- It will be stored in the cache of the region it belongs to. Once the previous entry has started writing to RocksDB, that entry will notify the entries behind it.
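  The per-region cache can be sketched as below. The names (`RegionCache`, `restore`, `reschedule`) are illustrative, modeled on the `restore_entries`/`reschedule_entries` calls in the Design section, not TiKV's actual types: out-of-order entries are parked keyed by their first log index and drained in log order once the region catches up.

  ```rust
  use std::collections::BTreeMap;

  // A log entry batch covering indexes [first_index, last_index].
  struct Entry {
      first_index: u64,
      last_index: u64,
  }

  // Hypothetical per-region cache for entries that arrived early.
  struct RegionCache {
      last_applying_index: u64,
      pending: BTreeMap<u64, Entry>,
  }

  impl RegionCache {
      // Called when an entry arrives before its predecessors.
      fn restore(&mut self, e: Entry) {
          self.pending.insert(e.first_index, e);
      }

      // Called once the previous entry has started writing; returns
      // every parked entry that is now ready, in log order.
      fn reschedule(&mut self, applied_to: u64) -> Vec<Entry> {
          self.last_applying_index = applied_to;
          let mut ready = Vec::new();
          while let Some(e) = self.pending.remove(&(self.last_applying_index + 1)) {
              self.last_applying_index = e.last_index;
              ready.push(e);
          }
          ready
      }
  }

  fn main() {
      let mut cache = RegionCache {
          last_applying_index: 0,
          pending: BTreeMap::new(),
      };
      // Entries [4,5] and [2,3] arrive before [1,1] has been written.
      cache.restore(Entry { first_index: 4, last_index: 5 });
      cache.restore(Entry { first_index: 2, last_index: 3 });
      // Once [1,1] starts writing, both parked entries become ready.
      let ready = cache.reschedule(1);
      assert_eq!(ready.len(), 2);
      assert_eq!(cache.last_applying_index, 5);
      println!("drained {} entries, applied to {}", ready.len(), cache.last_applying_index);
  }
  ```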
- How do we batch messages for better performance and CPU efficiency?
- We do not collect multiple regions into one batch as the current batch system does; we only decide when to flush the write batch by its count. This means the current configuration apply-max-batch-size no longer takes effect.
- How shall we deal with split requests and merge requests?
- If a message contains an admin request, it must wait until all entries before it have been consumed.
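  The gate for admin requests reduces to the same index check used in the Design section's pseudocode; the sketch below uses an illustrative helper name, not TiKV's actual API. An admin entry may only be applied once the region's last applying index has reached the index directly before it.

  ```rust
  // Hypothetical helper: an admin entry at `first_index` (e.g. a split
  // or merge) may apply only when every earlier entry has been consumed,
  // i.e. the region has applied up to first_index - 1.
  fn can_apply_admin(last_applying_index: u64, first_index: u64) -> bool {
      last_applying_index + 1 == first_index
  }

  fn main() {
      // Region applied up to index 7; an admin entry at index 9 must wait.
      assert!(!can_apply_admin(7, 9));
      // Once index 8 is consumed, the admin entry at index 9 may proceed.
      assert!(can_apply_admin(8, 9));
      println!("ok");
  }
  ```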
- When would we delete the entries that have already been applied to RocksDB?
- This operation is also an admin request, called CompactLog. See the previous question.