0%

Shm IPC Lock-Free KV Store

2022-08-13更新

在实盘程序运行中发现shmkv的原始实现代码有bug,修复bug重构版本store

~PyShmKV通过修改pupa内存数据库实现python多进程共享内存数据库,可以在python内将任意数据类型的数据通过kv方式跨进程交互。~

内存内数据体结构

pupa原作者设计是通过双buffer实现的在一写多读情况下无锁的KV数据库,代码由纯C编写,笔者在使用的时候发现核心部分有bug,并且被设计为单独运行的数据库。这里修改为C++并添加封装变成进程内组件,添加类封装,方便创建多个实例。 如下图,是pupa的内存结构设计

mem_layout

其中Header部分由三个结构体组成即item、key和value三部分在内存中的首位置和字节长度。

  • sec1和sec2分别表示双buffer的内存首位置
  • size表示预设大小,used表示实际使用量
  • id=1 or 2 表示当前使用的buffer是哪个

这个内存数据库的设计是通过双bufferitem实现索引一写多读的lock-free。因为要适配共享内存,需要内存连续且有限长度(不能动态创建内存空间),所以对key和value也构建的双buffer的结构,这样可以通过make_snapshot的方式实现内存释放功能。对于item部分,每次set都创建一个新的snapshot操作然后切换,对于 keyvalue两个部分,只有内存空间写满了之后才进行make_snapshot,通过重新去掉无item指向的内存空间释放内存。

1
2
3
4
5
6
7
8
9
10
// sec1和sec2分别表示双buffer的内存首位置
// size表示预设大小,used表示实际使用量, char长度
// id=1/2 表示当前使用的buffer是哪个
typedef struct {
unsigned int id;
unsigned int size;
unsigned int used;
size_t sec1_offset;
size_t sec2_offset;
} skv_store_section;

在这个内存结构设计中,item是真正的索引,item的结构体内部存储key和value在内存中偏移量。使用二分法寻找对应的比对key,从而确定相应的item获取value。

1
2
3
4
5
6
7
// item 数据存储,是数据库的索引
typedef struct {
size_t key_offset;
size_t value_offset;
unsigned int key_len;
unsigned int value_len;
} skv_store_item;
KeyValue为双buffer的存储区域。

Init

  • size表示空间大小,是否使用超出范围
  • used表示已经存储的key数量
  • id表示当前使用过的内存空间
  • offset这个区域内存首位置偏移量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 初始化内存空间
inline size_t skv_store_init(
skv_store_hdr *store_hdr,
unsigned int key_count,
unsigned int value_size) {
size_t offset = sizeof(skv_store_hdr);

// header memory management 初始化
// initialize cache item section
store_hdr->item_section.size = key_count;
store_hdr->item_section.used = 0;
store_hdr->item_section.id = SKV_STORE_SECTION_ONE;
store_hdr->item_section.sec1_offset = offset;
offset += key_count * (long) sizeof(skv_store_item);
store_hdr->item_section.sec2_offset = offset;
offset += key_count * (long) sizeof(skv_store_item);

// initialize cache key section
store_hdr->key_section.size = SKV_KEY_AVERAGE_LEN * key_count;
store_hdr->key_section.used = 0;
store_hdr->key_section.id = SKV_STORE_SECTION_ONE;
store_hdr->key_section.sec1_offset = offset;
offset += key_count * SKV_KEY_AVERAGE_LEN;
store_hdr->key_section.sec2_offset = offset;
offset += key_count * SKV_KEY_AVERAGE_LEN;

// initialize cache value section
store_hdr->value_section.size = value_size * key_count;
store_hdr->value_section.used = 0;
store_hdr->value_section.id = SKV_STORE_SECTION_ONE;
store_hdr->value_section.sec1_offset = offset;
offset += key_count * value_size;
store_hdr->value_section.sec2_offset = offset;
offset += key_count * value_size;

return offset;
}

Store Get

Struct

这部分代码里面store_item_wrapper是非常令人费解的设计,这个数据结构是计算某一个item数据结构相对于共享内存的偏移量。 但是无论是set还是get都是增量信息,没有必要搞这样的操作。

1
2
3
4
5
6
7
8
//  这里的设计逻辑是在共享内存外存在一个数据,
// 使用数据结构skv_store_item表示,
// 并且添加这个数据结构相对于共享内存的偏移量
typedef struct {
skv_store_item store_item;
skv_ctx *ctx;
size_t key_offset;
} skv_store_item_wrapper;

1
2
3
4
//  todo:操作共享内存外的数据,以共享内存为首位置坐标计算偏移量?
store_item_wrapper.ctx = ctx;
store_item_wrapper.key_offset = (key->data - (char *) ctx->store_hdr);
store_item_wrapper.store_item.key_len = key->len + 1;

bsearch

Get的查找功能使用C语言基础的bsearch函数实现

1
2
3
4
5
void *bsearch(const void *key,
const void *base,
size_t nitems,
size_t size,
int (*compar)(const void *, const void *));
* key -- 指向要查找的元素的指针,类型转换为 void *
* base -- 指向进行查找的数组的第一个对象的指针,类型转换为 void *
* nitems -- base 所指向的数组中元素的个数。
* size -- 数组中每个元素的大小,以字节为单位。
* compar -- 用来比较两个元素的函数。

memcmp

memcmp把存储区 str1 和存储区 str2 的前 n 个字节进行比较

1
int memcmp(const void *str1, const void *str2, size_t n);

offsetof

offsetof(type, member-designator) 会生成一个类型为 size_t 的整型常量,它是一个结构成员相对于结构开头的字节偏移量。成员是由 member-designator给定的,结构的名称是在 type 中给定的。

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
inline int skv_store_item_compare_(
const void *p1,
const void *p2,
void *arg) {
int ret;
char *store_hdr = nullptr;
skv_store_item_wrapper *store_item_wrapper = nullptr;

ret = (int) ((skv_store_item *) p1)->key_len - (int) ((skv_store_item *) p2)->key_len;
if (ret != 0) {
return ret;
}

store_item_wrapper = (skv_store_item_wrapper *) arg;
store_hdr = (char *) store_item_wrapper->ctx->store_hdr;
if (store_item_wrapper->key_offset == 0) {
ret = memcmp(store_hdr + ((skv_store_item *) p1)->key_offset,
store_hdr + ((skv_store_item *) p2)->key_offset,
((skv_store_item *) p1)->key_len);
} else {
ret = memcmp(store_hdr + store_item_wrapper->key_offset,
store_hdr + ((skv_store_item *) p2)->key_offset,
((skv_store_item *) p1)->key_len);
}
return ret;
}

inline int skv_store_item_key_compare(const void *p1,
const void *p2) {
skv_store_item_wrapper *p_store_item_wrapper = nullptr;
// todo: 这里通过偏移量的方式,通过p1获取结构体指针skv_store_item_wrapper的首位置
// p1 is the target value, P2 is each value in the array.
p_store_item_wrapper = (skv_store_item_wrapper *) ((char *) p1 - offsetof(skv_store_item_wrapper, store_item));

return skv_store_item_compare_((void *) p1, p2, p_store_item_wrapper);
}

inline int skv_store_get(skv_ctx *ctx,
skv_str *key,
skv_str *value) {
skv_store_item *p_cache_item = nullptr;
skv_store_item *p_store_item = nullptr;
skv_store_section *p_item_section = nullptr;
skv_store_item_wrapper store_item_wrapper;

// 获取当前读取操作正在使用的buffer(buffer-1或buffer-2)的内存首位置
p_store_item = (skv_store_item *) SKV_STORE_GET_ADDRESS(
ctx->store_hdr, ctx->store_hdr->item_section);

// todo:操作共享内存外的数据,以共享内存为首位置坐标计算偏移量?
store_item_wrapper.ctx = ctx;
store_item_wrapper.key_offset = (key->data - (char *) ctx->store_hdr);
store_item_wrapper.store_item.key_len = key->len + 1;

p_item_section = &ctx->store_hdr->item_section;
p_cache_item = static_cast<skv_store_item *>(bsearch(&store_item_wrapper.store_item,
p_store_item,
p_item_section->used,
sizeof(skv_store_item),
skv_store_item_key_compare));

if (p_cache_item == nullptr) {
DEBUG_LOG("Not found key: %.*s", key->len, key->data);
return SKV_NOT_FOUND;
}

if (p_cache_item->key_len == 0) {
DEBUG_LOG("The length of the key (%.*s) is invalid.", key->len, key->data);
return SKV_ERROR;
}

value->data = (char *) ctx->store_hdr + p_cache_item->value_offset;
value->len = p_cache_item->value_len;
return SKV_OK;
}

Store Set

make_snapshot

每次set操作都是在操作item的snapshot快照。set操作里面,双buffer的切换操作每次切换的是item的双buffer, key和value的双buffer是在写满数据之后才进行切换,不跟随set操作切换buffer。

  • make item snapshot
  • bsearch, add or replace
  • change item section id(原子操作,lock-free read)

创建snapshot,每次set操作都是在snapshot中操作,然后再切换id指向。
* todo: 这里直接使用下一个函数的方式创建snapshot的方式直接管理内存应该更好?for循环memcpy和直接大内存memcpy性能差别有多少?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void pupa_store_item_make_snapshot(skv_ctx *ctx) {
skv_store_item *p_cache_items = NULL;
skv_store_item *p_store_items = NULL;

p_cache_items = (skv_store_item *) SKV_STORE_GET_SNAPSHOT_ADDRESS(
ctx->store_hdr, ctx->store_hdr->item_section);

p_store_items = (skv_store_item *) SKV_STORE_GET_ADDRESS(
ctx->store_hdr, ctx->store_hdr->item_section);

if (ctx->store_hdr->item_section.used) {
memcpy(p_cache_items, p_store_items,
sizeof(skv_store_item) * ctx->store_hdr->item_section.used);
}
ctx->store_items_snapshot = p_cache_items;
}

free memory and change section id

在set操作中,add or replace都是直接在内存地址的末尾copy新的内容,这样会造成前面的内存中的内容并没有删除,只是修改了指针导致内存空间没有回收利用。 当空间填满的时候需要重新排序copy内容,释放没有item指向的内存空间。
* 这个库这里设计会产生一个类似于gc的操作,导致set写入时间不稳定。另外set和get使用bsearch操作也会导致操作时间不稳定,随着size变化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 注意这里重新构建snapshot了,从store重新构建snapshot
// 构建完成之后,切换snapshot和store,原子操作
static int skv_store_key_compaction(skv_ctx *ctx,
skv_string *key,
char **address) {
int i;
char *p = NULL;
size_t used_size;
size_t offset;
skv_store_item *p_cache_item = NULL;

p = SKV_STORE_GET_SNAPSHOT_ADDRESS(ctx->store_hdr, ctx->store_hdr->key_section);
offset = SKV_STORE_GET_SNAPSHOT_OFFSET(ctx->store_hdr->key_section);

used_size = 0;
for (i = 0; i < ctx->store_hdr->item_section.used; i++) {
p_cache_item = &ctx->store_items_snapshot[i];
memcpy(p + used_size,
(char *) ctx->store_hdr + p_cache_item->key_offset,
p_cache_item->key_len);
p_cache_item->key_offset = offset + used_size;
used_size += p_cache_item->key_len;
}

if ((key->len + 1 + used_size) > ctx->store_hdr->key_section.size) {
return SKV_OVERFLOW;
}

*address = p;
ctx->store_hdr->key_section.used = used_size;
ctx->store_hdr->key_section.id = SKV_STORE_GET_SEC_SNAPSHOT_ID(
ctx->store_hdr->key_section);

return SKV_OK;
}

static int skv_store_value_compaction(skv_ctx *ctx,
skv_string *value,
char **address) {
int i;
char *p = NULL;
size_t used_size;
size_t offset;
skv_store_item *p_cache_item;

p = SKV_STORE_GET_SNAPSHOT_ADDRESS(ctx->store_hdr,
ctx->store_hdr->value_section);

offset = SKV_STORE_GET_SNAPSHOT_OFFSET(ctx->store_hdr->value_section);

used_size = 0;
for (i = 0; i < ctx->store_hdr->item_section.used; i++) {
p_cache_item = &ctx->store_items_snapshot[i];
memcpy(p + used_size,
(char *) ctx->store_hdr + p_cache_item->value_offset,
p_cache_item->value_len);
p_cache_item->value_offset = offset + used_size;
used_size += p_cache_item->value_len;
}

if ((value->len + 1 + used_size) > ctx->store_hdr->value_section.size) {
return SKV_OVERFLOW;
}

*address = p + used_size;
ctx->store_hdr->value_section.used = used_size;
ctx->store_hdr->value_section.id =
SKV_STORE_GET_SEC_SNAPSHOT_ID(ctx->store_hdr->value_section);

return SKV_OK;
}

add kv

item add key value,新增数据在store末尾新增
* 如果发现key data section空间内还有足够的剩余空间,直接在store最后位置添加新增的key数据,char格式copy进去。 * 如果发现key data section空间内不够,重新创建一个snapshot(减少失去item指向的无效空间),然后将snapshot与store互换之后重复上一步操作。
* ~因为这个时候没有增加key-used数据,所以在get操作里面是看不到这个在末尾新增的数据块的~,这步骤想错了,注意set是创建item的双buffer每次set切换,但是key和value的双buffer只有在写满数据的时候才切换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
inline int skv_store_item_add(skv_ctx *ctx,
skv_store_item *store_item,
skv_str *key,
skv_str *value) {
int ret;
size_t offset;
char *p = nullptr;
skv_store_hdr *store_hdr = ctx->store_hdr;

if (store_hdr->item_section.used == store_hdr->item_section.size) {
return SKV_OVERFLOW;
}

//* 如果发现key data section空间内还有足够的剩余空间,直接在store最后位置添加新增的key数据,char格式copy进去。
// 因为这个时候没有增加item里面used数据,所以在get操作里面是看不到这个在末尾新增的数据块的。
//* 如果发现key data section空间内不够,重新创建一个snapshot(减少失去item指向的无效空间),然后将snapshot与store互换之后重复上一步操作。
if ((store_hdr->key_section.used + (key->len + 1)) >
store_hdr->key_section.size) {
DEBUG_LOG("Start to compact the key section.");

offset = SKV_STORE_GET_SNAPSHOT_OFFSET(store_hdr->key_section);
ret = skv_store_key_compaction(ctx, key, &p);
if (ret != SKV_OK) {
return ret;
}
} else {
p = SKV_STORE_GET_FREE_ADDRESS(store_hdr, store_hdr->key_section);
offset = SKV_STORE_GET_OFFSET(store_hdr->key_section);
}

// copy the data of the key to the cache
store_item->key_len = key->len + 1;
store_item->key_offset = offset + store_hdr->key_section.used;

// 字符串末尾添加 \0结尾
memcpy(p, key->data, key->len);
p += key->len;
*p = '\0';
store_hdr->key_section.used += store_item->key_len;

// copy the data of the value to the cache
p = nullptr;

if ((store_hdr->value_section.used + (value->len + 1)) >
store_hdr->value_section.size) {
DEBUG_LOG("Start to compact the value section.");

offset = SKV_STORE_GET_SNAPSHOT_OFFSET(store_hdr->value_section);

ret = skv_store_value_compaction(ctx, value, &p);
if (ret != SKV_OK) {
return ret;
}
} else {
p = SKV_STORE_GET_FREE_ADDRESS(store_hdr, store_hdr->value_section);
offset = SKV_STORE_GET_OFFSET(store_hdr->value_section);
}

store_item->value_offset = offset + store_hdr->value_section.used;
store_item->value_len = value->len + 1;

memcpy(p, value->data, value->len);
p += value->len;
*p = '\0';

store_hdr->value_section.used += store_item->value_len;
return SKV_OK;
}

replace kv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
inline int skv_store_item_replace(skv_ctx *ctx,
skv_store_item *store_item,
skv_str *value) {
char *p = nullptr;
skv_store_hdr *store_hdr = ctx->store_hdr;
size_t offset;
int ret;

// 使用snapshot重建的store内包含一组重复,重复最新输入的这组key-value
if ((store_hdr->value_section.used + (value->len + 1)) >
store_hdr->value_section.size) {
DEBUG_LOG("Start to compact the value section.");

offset = SKV_STORE_GET_SNAPSHOT_OFFSET(store_hdr->value_section);

ret = skv_store_value_compaction(ctx, value, &p);
if (ret != SKV_OK) {
return ret;
}
} else {
p = SKV_STORE_GET_FREE_ADDRESS(store_hdr, store_hdr->value_section);
offset = SKV_STORE_GET_OFFSET(store_hdr->value_section);
}

// copy the data of the value to the cache
store_item->value_offset = offset + store_hdr->value_section.used;
store_item->value_len = value->len + 1;

// 字符串末尾添加 \0结尾
memcpy(p, value->data, value->len);
p += value->len;
*p = '\0';

store_hdr->value_section.used += store_item->value_len;

return SKV_OK;
}

item sort

因为二分法的原因,需要整个item序列是有序的(根据key的内容)。
* key的长度
* key的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// 递归算法实现二分法排序
// 递归算法实现二分法排序
inline int skv_item_qsort_for_b_search(void *elt,
void *arg,
void *base,
size_t size,
int low,
int high,
int (*cmp)(const void *ptr_a, const void *ptr_b, void *arg)) {
int ret, mid;

if (high <= low) {
ret = cmp(elt, (char *) base + low * size, arg);
if (ret > 0) {
return low + 1;
}
return low;
}

mid = (low + high) / 2;

ret = cmp(elt, (char *) base + mid * size, arg);
if (ret == 0) {
return mid + 1;
} else if (ret > 0) {
return skv_item_qsort_for_b_search(elt, arg, base, size, mid + 1, high, cmp);
} else {
return skv_item_qsort_for_b_search(elt, arg, base, size, low, mid - 1, cmp);
}
}

// bsearch二分法索引配套的算法
inline void skv_item_sort(void *base,
size_t n,
size_t size,
void *arg,
int (*cmp)(const void *ptr_a, const void *ptr_b, void *arg)) {
char *p;
int pos, count;

count = (int) (n) -1;

p = (char *) malloc(size);
if (p == nullptr) {
return;
}

// take the last element to compare to the previous one
memcpy(p, (char *) base + count * size, size);

pos = skv_item_qsort_for_b_search(p, arg, base, size, 0, count, cmp);
if (pos < count) {
memmove((char *) base + (pos + 1) * size, (char *) base + pos * size, (count - pos) * size);
memcpy((char *) base + pos * size, p, size);
}

free(p);
}

// item 本身进行排序,根据key的值
inline int store_item_qsort_compare(const void *p1,
const void *p2,
void *arg) {
int ret;
char *store_hdr;
skv_store_item_wrapper *store_item_wrapper;

ret = (int) ((skv_store_item *) p1)->key_len - (int) ((skv_store_item *) p2)->key_len;
if (ret != 0) {
return ret;
}

store_item_wrapper = (skv_store_item_wrapper *) arg;
store_hdr = (char *) store_item_wrapper->ctx->store_hdr;
if (store_item_wrapper->key_offset == 0) {
ret = memcmp(store_hdr + ((skv_store_item *) p1)->key_offset,
store_hdr + ((skv_store_item *) p2)->key_offset,
((skv_store_item *) p1)->key_len);
} else {
ret = memcmp(store_hdr + store_item_wrapper->key_offset,
store_hdr + ((skv_store_item *) p2)->key_offset,
((skv_store_item *) p1)->key_len);
}
return ret;
}

set

注意 set操作里面,双buffer的切换操作每次切换的是item的双buffer, key和value的双buffer是在写满数据之后才进行切换,不跟随set操作切换buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 注意 set操作里面,双buffer的切换操作每次切换的是item的双buffer, key和value的双buffer是在写满数据之后才进行切换,不跟随set操作切换buffer
inline int skv_store_set(skv_ctx *ctx,
skv_str *key,
skv_str *value) {
int ret;
skv_store_item *p_cache_item = nullptr;
skv_store_section *p_item_section;
skv_store_item_wrapper store_item_wrapper;

// generate the snapshot of the cache items
skv_store_item_make_snapshot(ctx);
p_item_section = &ctx->store_hdr->item_section;

store_item_wrapper.ctx = ctx;
store_item_wrapper.key_offset =
(int64_t) (key->data - (char *) ctx->store_hdr);
store_item_wrapper.store_item.key_len = key->len + 1;

// search snapshot
p_cache_item = static_cast<skv_store_item *>(bsearch(&store_item_wrapper.store_item,
ctx->store_items_snapshot,
p_item_section->used,
sizeof(skv_store_item),
skv_store_item_key_compare));

if (p_cache_item == nullptr) {
DEBUG_LOG("Add key : %.*s - %.*s", key->len, key->data, value->len, value->data);
ret = skv_store_item_add(ctx, ctx->store_items_snapshot + p_item_section->used, key, value);
} else {
DEBUG_LOG("Replace key(%.*s) with (%.*s)", key->len, key->data,
value->len, value->data);
ret = skv_store_item_replace(ctx, p_cache_item, value);
}

if (ret != SKV_OK) {
return ret;
}

// Switch the cache item section if this key doesn't exist
if (p_cache_item == nullptr) {
p_item_section->used++;
store_item_wrapper.key_offset = 0;
skv_item_sort(ctx->store_items_snapshot,
ctx->store_hdr->item_section.used,
sizeof(skv_store_item),
&store_item_wrapper,
store_item_qsort_compare);
}

// item的buffer切换 实现lock-free
p_item_section->id = SKV_STORE_GET_SEC_SNAPSHOT_ID(ctx->store_hdr->item_section);
return SKV_OK;
}

Store Del

  • 删除的是最后一个item,直接item.used--
  • 删除的不是最后一个item,直接将下一个item至最后全部进一位
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    inline int skv_store_del(skv_ctx *ctx,
    skv_str *key) {
    skv_store_item *p_cache_item = nullptr;
    skv_store_section *p_item_section = nullptr;
    skv_store_item_wrapper store_item_wrapper;

    // generate the snapshot of the cache items
    skv_store_item_make_snapshot(ctx);

    p_item_section = &ctx->store_hdr->item_section;

    store_item_wrapper.ctx = ctx;
    store_item_wrapper.key_offset =
    (int64_t) (key->data - (char *) ctx->store_hdr);
    store_item_wrapper.store_item.key_len = key->len + 1;

    p_cache_item = static_cast<skv_store_item *>(bsearch(&store_item_wrapper.store_item,
    ctx->store_items_snapshot,
    p_item_section->used,
    sizeof(skv_store_item),
    skv_store_item_key_compare));
    if (p_cache_item == nullptr) {
    DEBUG_LOG("Not found key: %.*s", key->len, key->data);
    return SKV_NOT_FOUND;
    }

    if ((p_cache_item - ctx->store_items_snapshot + 1) < p_item_section->used) {
    memcpy(p_cache_item, p_cache_item + 1,
    sizeof(skv_store_item) * (p_item_section->used -
    (p_cache_item - ctx->store_items_snapshot) + 1));
    }

    p_item_section->id = SKV_STORE_GET_SEC_SNAPSHOT_ID(ctx->store_hdr->item_section);
    p_item_section->used--;

    return SKV_OK;
    }

Interface

在接口类添加了process_lockthread_lock的功能,实现低性能的锁安全的多写多读功能。 对于高性能的多写多读功能可以在set函数前置scmp的消息队列实现。 * MPSC_Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class ShmKV {
private:
skv_ctx ctx{};
CProcessMutex *ptr_process_mutex = nullptr;
std::mutex ptr_thread_lock;

private:
static void init_disk_file(const std::string &path,
const bool &write_mode) {
if (write_mode) {
if (remove(const_cast<char *>(path.c_str())) == 0)
printf("Removed %s succeeded.\n", const_cast<char *>(path.c_str()));
else
DEBUG_LOG("Removed %s failed.\n", const_cast<char *>(path.c_str()));
}
}

int init_mmap(const std::string &path,
const unsigned int &key_count,
const unsigned int &value_size,
const bool &write_mode) {
int ret;
if (write_mode) {
ret = skv_init(&ctx, const_cast<char *>(path.c_str()), key_count, value_size, SKV_OP_TYPE_RW);
} else {
ret = skv_init(&ctx, const_cast<char *>(path.c_str()), key_count, value_size, SKV_OP_TYPE_RO);
}

if (ret != SKV_OK) {
DEBUG_LOG("Failed to initialize skv.\n");
return ret;
}
return 0;
}

public:
ShmKV(const std::string &path,
const unsigned int &count,
const unsigned int &value_size,
const bool &write_mode,
const bool &init_disk,
const bool &thread_lock = false,
const bool &process_lock = false,
const std::string &process_mutex = "process_mutex") {
init_disk_file(path, init_disk);
int ret = init_mmap(path, count, value_size, write_mode);
if (ret != 0) {
throw std::runtime_error("SKV初始化失败!");
}

if (process_lock & thread_lock) {
DEBUG_LOG("error config, process_lock true, thread_lock true!");
exit(SKV_ERROR);
}

if (process_lock) {
ptr_process_mutex = new CProcessMutex(process_mutex.c_str());
}
}

~ShmKV() {
skv_finish(&ctx);
delete ptr_process_mutex;
}

public:
bool set(const char *key,
const unsigned int &key_len,
const char *value,
const unsigned int &value_len) {
skv_str skv_key, skv_value;
skv_key.len = key_len;
skv_key.data = const_cast<char *>(key);
skv_value.len = value_len;
skv_value.data = const_cast<char *>(value);

int ret = skv_set(&ctx, &skv_key, &skv_value);
if (ret != SKV_OK) {
DEBUG_LOG("Failed to set %.*s.\n", skv_key.len, skv_key.data);
return true;
}
return false;
}

bool set_process_mutex(const char *key,
const unsigned int &key_len,
const char *value,
const unsigned int &value_len) {
bool lock = ptr_process_mutex->Lock();
if (lock) {
bool ret = set(key, key_len, value, value_len);
lock = ptr_process_mutex->UnLock();
return ret;
} else {
return false;
}
}

bool set_thread_mutex(const char *key,
const unsigned int &key_len,
const char *value,
const unsigned int &value_len) {
ptr_thread_lock.lock();
bool ret = set(key, key_len, value, value_len);
ptr_thread_lock.unlock();
return ret;
}

bool get(const char *key,
const unsigned int &key_len,
skv_str *skv_value) {
skv_str skv_key;
skv_key.len = key_len;
skv_key.data = const_cast<char *>(key);

int ret = skv_get(&ctx, &skv_key, skv_value);
if (ret != SKV_OK) {
DEBUG_LOG("Failed to get %.*s.\n", skv_key.len, skv_key.data);
return true;
}
return false;
}

bool del(const char *key,
const unsigned int &key_len) {
skv_str skv_key;
skv_key.len = key_len;
skv_key.data = const_cast<char *>(key);

int ret = skv_del(&ctx, &skv_key);
if (ret != SKV_OK) {
return true;
}
return false;
}
};

Pybind11封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

class PyShmKV {
private:
ots::shmkv::ShmKV *p;

public:
PyShmKV(const std::string &path,
const unsigned int &count,
const unsigned int &value_size,
const bool &write_mode,
const bool &init_disk,
const bool &thread_lock = false,
const bool &process_lock = false,
const std::string &process_mutex = "process_mutex") {
p = new ots::shmkv::ShmKV(path, count, value_size, write_mode, init_disk, thread_lock, process_lock, process_mutex);
}

~PyShmKV() {
delete p;
}

bool set(const std::string &k,
const std::string &v,
const unsigned int &value_len) {

bool ret = p->set(const_cast<char *>(k.c_str()),
static_cast<int>(k.length()),
v.c_str(),
value_len);
return ret;
};

bool set_process_mutex(const std::string &k,
const std::string &v,
const unsigned int &value_len) {
bool ret = p->set_process_mutex(const_cast<char *>(k.c_str()),
static_cast<int>(k.length()),
v.c_str(),
value_len);
return ret;
};

bool set_thread_mutex(const std::string &k,
const std::string &v,
const unsigned int &value_len) {
bool ret = p->set_thread_mutex(const_cast<char *>(k.c_str()),
static_cast<int>(k.length()),
v.c_str(),
value_len);
return ret;
};

py::tuple get(const std::string &k) {
ots::shmkv::skv_str skv_value;
bool ret = p->get(const_cast<char *>(k.c_str()),
static_cast<int>(k.length()),
&skv_value);
if (ret) {
return py::make_tuple(true, "null");
}
std::string str = skv_value.data;
return py::make_tuple(false, str);
}

bool del(const std::string &k) {
bool ret = p->del(const_cast<char *>(k.c_str()),
static_cast<int>(k.length()));
return ret;
};
};

PYBIND11_MODULE(pyshmkv, m) {
py::class_<PyShmKV>(m, "PyShmKV")
.def(py::init<const std::string &,
const unsigned int &,
const unsigned int &,
const bool &,
const bool &,
const bool &,
const bool &,
const std::string &>())
.def("set", &PyShmKV::set)
.def("set_process_mutex", &PyShmKV::set_process_mutex)
.def("set_thread_mutex", &PyShmKV::set_thread_mutex)
.def("get", &PyShmKV::get)
.def("del_void", &PyShmKV::del);
}

Python封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from pickle import dumps, loads
from pyshmkv import PyShmKV as _PyShmKV


class PyShmKV(_PyShmKV):
def __init__(self,
path: str,
count: int,
value_size: int,
write_mode: bool,
init_disk: bool,
thread_lock: bool,
process_lock: bool,
process_mutex: str = "process_mutex"):
"""

:param path:
:param count:
:param value_size:
:param write_mode:
:param init_disk:
:param thread_lock:
:param process_lock:
:param process_mutex:
"""
super().__init__(path, count, value_size, write_mode, init_disk, thread_lock, process_lock, process_mutex)

def set_data(self, key: str, value:any):
"""
任意数据通过pickle转换为二进制数据
:param key:
:param value:
:return:
"""
bin_data = dumps(value, 0)
return self.set(key, bin_data, len(bin_data)+1)

def set_data_thread_mutex(self, key: str, value):
"""
任意数据通过pickle转换为二进制数据
:param key:
:param value:
:return:
"""
bin_data = dumps(value, 0)
return self.set_thread_mutex(key, bin_data, len(bin_data))

def set_data_process_mutex(self, key: str, value):
"""
任意数据通过pickle转换为二进制数据
:param key:
:param value:
:return:
"""
bin_data = dumps(value, 0)
return self.set_process_mutex(key, bin_data, len(bin_data))

def get_data(self, key: str):
ret, bin_data = self.get(key)
if ret:
return ret, None
else:
return ret, loads(bin_data.encode())

def del_data(self, key):
self.del_void(key)

任意数据写入测试

Python任意数据类型写入读取测试

1
2
3
4
5
6
7
from pandas import DataFrame
data = DataFrame(data=[1, 2, 3], columns=["test"])
bin_data = dumps(data, 0)
print(type(bin_data))

shm.set_data("test1", data)
print(shm.get_data("test1"))


参考