0%

MyOTP-5 IPC进程通信

应用场景

如下图所示,MyOTP中将一个传统的单进程多线程交易系统切换为多进程的交易系统。进程间通信采用基于共享内存的queue、disruptor、key-store和req-rep等组件实现。 fig1

Disruptor

Disruptor是英国外汇交易公司LMAX开发的一个线程间通信的框架,即在多线程间共享数据。它是由LMAX公司开发的可信消息传递架构的一部分,以便用非常快速的方法来在多组件之间传递数据。这里借鉴这个框架的实现方式通过共享内存实现跨进程通信使用。 如下图,Disruptor不是传统的生产者-消费者模式,Disruptor的是广播形式的,通过修改映射共享内存实现一个行情源广播多个策略进程的模式。

fig3

Ring Buffer

Disruptor的底层数据结构是一个RingBuffer,并且只维护一个用于指向下一个可用的位置的游标。 当需要写入时,Disruptor会分配一个空间,并将指针更新为该空间的最大下标处,这块空间就是申请写入的线程所独享的。当空间分配成功后,写线程可以慢慢写入。所以当有多线程同时写入时,需要同步的地方只是更新游标,而更新游标只是做加操作,利用原子变量的加操作就可以避免使用锁,从而提高效率。当需要读取时,Disruptor 会等待游标更新到想要读取的下标处,然后执行读操作,这个过程同样不需要加锁。

1
2
3
4
5
6
7
8
9
typedef struct
{
size_t buffer_size;
size_t total_mem_size;
std::atomic<size_t> registered_consumer_count;
std::atomic<int64_t> cursor;
std::atomic<int64_t> next;
std::atomic<int64_t> array_of_consumer_indexes[MAX_CONSUMER];
} ShmStoreHeader;

RingBuffer使用cursornext管理写入线程位置,next表示生产者为数据存储保留的位置,cursor是生产者保存完数据的位置,也就是消费者可以读取的位置。对SPMC和SPSC的模式,可以取消原子操作进一步提高效率。

现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。

Disruptor论文中讲述了一个实验:

  • 这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
  • 机器环境:2.4G 6核
  • 运算: 64位的计数器累加5亿次

|Method | Time (ms) | |— | —| |Single thread | 300| |Single thread with CAS | 5,700| |Single thread with lock | 10,000| |Single thread with volatile write | 4,700| |Two threads with CAS | 30,000| |Two threads with lock | 224,000|

CAS操作比单线程无锁慢了1个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。

单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能。

在多线程情况下,为了保证线程安全,必须使用CAS或锁,这种情况下,CAS的性能超过锁的性能,前者大约是后者的8倍。

综上可知,加锁的性能是最差的。

fig4

RingBuffer中数据的位置使用nextcursor两个单调递增的变量来表示。例如假设一个总大小为8的环形缓冲区。随着数据的存储,我们将使用顺序递增的索引来标记其位置(0,1,2,3,4,5,6.. 类似这样)。但是,如果指数超过7,则指数在Disruptor中并没有再次回到0,而是像8、9、10一样继续增加。

Producer

Disruptor的实现思路就是每一个数据都会有一个唯一自增的序号,用一个环形队列来存储数据,写入数据的时候先分配出可写的空间序号,然后再慢慢写数据,等到数据写完后再更新一下可读区域。这样因为每个线程独占一块空间写入数据,就不会有线程同步问题,唯一需要同步的地方是分配写入空间和更新可读区域。但是这两个操作都是非常简单的加操作,加锁太浪费,所以Disruptor直接使用原子变量加自旋等待来同步,获取极高的性能。

next写入线程下个月写入位置,cursor读取线程可以读取的位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
bool set_data(T *data) {
// get next
const auto idx = ClaimIndex();
// write data
const bool ret = SetData(idx, data);
// update cursor
if (ret) {
return Commit(idx);
}
return ret;
}

// 一个进程写入,单进程内多线程因为原子操作无需区分写入线程
template<typename T>
inline int64_t Disruptor<T>::ClaimIndex() {
// fetch_add, Adds val to the contained value and returns
// the value it had immediately before the operation.
auto nNextSeqForClaim = shm_hdr_->next.fetch_add(1) + 1;

int64_t wrapPoint = nNextSeqForClaim - buffer_size_;
do {
int64_t gatingSequence = GetMinIndexOfConsumers();
if (wrapPoint >= gatingSequence) {
// std::this_thread::yield tells the implementation to
// reschedule the execution of threads, that should be
// used in a case where you are in a busy waiting state,
// like in a thread pool:
std::this_thread::yield();
continue;
} else {
break;
}
} while (true);
return nNextSeqForClaim;
}

Reader

读取的时候则是先声明想读取的序号。然后就一直等待直到写入数据后更新的可读序号赶上想读取的序号。等待可以是CPU自旋等待或者放弃CPU时间片或者睡眠或者使用条件变量唤醒。此时可读序号之前的数据都是可读的,而读操作也不需要加锁。又可以获得极高的性能。读取后更新一下已读取序号,这样写入着就可以继续重复利用这块空间了。

1
2
3
4
5
6
7
8
// last_idx, cursor last value
auto last_idx = shm.WaitFor(reader_idx);
// if reader_idx <= cursor
if(reader_idx <= last_idx){
ptr = shm.GetData(reader_idx);
}
// update user reader_idx
shm.CommitRead(user_id, reader_idx);

消费者和生产者一样,必须在对环形缓冲区索引的所有操作完成后保存它读取数据的点,以防止生产者通过引用消费者的位置来覆盖缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// consomer/reader 最新可读取的数据位置
template<typename T>
inline int64_t Disruptor<T>::GetMinIndexOfConsumers() {
int64_t min_index = INT64_MAX;
bool is_found = false;
for (size_t i = 0; i < shm_hdr_->registered_consumer_count; i++) {
int64_t index = shm_hdr_->array_of_consumer_indexes[i];
if (index < min_index) {
min_index = index;
is_found = true;
}
}
if (!is_found) {
return 0;
}
return min_index;
}

当调用生产者的Claim或消费者的WaitFor方法时,它不会立即返回,而是等待直到满足适当的条件。比如生产者调用Claim,下一个next位置违反了消费者位置,就需要等待。在Disruptor中,此时如何等待的方法是在Strategy模式中实现的。生产者和消费者的等待策略类型如下。

  • BusySpinStrategy:不放弃cpu死循环等待
  • YieldingStrategy:死循环一定次数后调用yield放弃cpu时间片。
  • SleepingStrategy:死循环一定次数后重复调用yield放弃cpu时间片。调用yield一定次数后重复调用sleep睡眠指定时间。
  • BlockingStrategy:阻塞等待,内部使用了条件变量,需要写入的时候唤醒。

Improve

如果内存足够大,写入数据量不是无限多的情况下,可以使用一个huge-page替换ring-buffer。操作在“无限长的内存页面”的上面,不需要考虑写入覆盖的问题,就可以取消ring-buffer里面对读写速度的控制部分进一步提高效率,同时也可以将所有的流程数据异步保存在本地,也就是把“事故现场”完整保留。
如下代码,vector通过管理内存地址的方式实现多个mmap创建内存空间连续使用,这样就可以直接使用在多线程和多进程中而不必区分版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 创建第一个Page,书签Bookmark也在这个页面上面
{
std::string file_path = folder_path + "page0.store";
auto page = Page(file_path, writer);
if (!page.GetShm()) {
return false;
}

// 书签
bookmark_ = (Bookmark *) page.GetShmDataAddress();
wait_ = new WaitStrategy(bookmark_);
if (init) {
bookmark_->item_num = item_num;
bookmark_->page_num = page_num;
bookmark_->cursor = -1;
bookmark_->next = -1;
}

// 只使用一个页面
if (page_num == 1) {
for (auto i = 0; i < item_num; i++) {
content_[i] = (T *) ((char *) page.GetShmDataAddress() + sizeof(Bookmark) + sizeof(T) * i);
}
return true;
}

//使用多个页面
for (auto i = 0; i < item_num_in_page - item_num_in_mark; i++) {
content_[idx++] = (T *) ((char *) page.GetShmDataAddress() + sizeof(Bookmark) + sizeof(T) * i);
}
}

//Pages
for (auto p = 1; p < page_num; p++) {
std::string file_path = folder_path + "page" + std::to_string(p) + ".store";
auto page = Page(file_path, writer);
if (!page.GetShm()) {
return false;
}

auto left_item_num =
total_item_num - (item_num_in_page - item_num_in_mark) - item_num_in_page * (p - 1);
if (left_item_num < item_num_in_page) {
for (auto i = 0; i < left_item_num; i++) {
content_[idx++] = (T *) ((char *) page.GetShmDataAddress() + sizeof(T) * i);
}
} else {
for (auto i = 0; i < item_num_in_page; i++) {
content_[idx++] = (T *) ((char *) page.GetShmDataAddress() + sizeof(T) * i);
}
}
}

ShmKV

链接