2022-08-13更新
在实盘程序运行中发现 shmkv 的原始实现代码有 bug,现已修复该 bug,并重构为新的 store 版本。
~PyShmKV 通过修改pupa内存数据库实现python多进程共享内存数据库,可以在python内将任意数据类型的数据通过kv方式跨进程交互。~
内存内数据体结构
pupa 原作者设计是通过双buffer实现的在一写多读情况下无锁的KV数据库,代码由纯C编写,笔者在使用的时候发现核心部分有bug,并且被设计为单独运行的数据库。这里修改为C++并添加封装变成进程内组件,添加类封装,方便创建多个实例。
如下图,是pupa的内存结构设计
其中Header部分由三个结构体组成即item、key和value三部分在内存中的首位置和字节长度。
sec1和sec2分别表示双buffer的内存首位置
size表示预设大小,used表示实际使用量
id=1 or 2 表示当前使用的buffer是哪个
这个内存数据库的设计是通过 item 的双 buffer 实现索引一写多读的 lock-free。因为要适配共享内存,需要内存连续且长度有限(不能动态创建内存空间),所以对 key 和 value 也构建了双 buffer 的结构,这样可以通过 make_snapshot 的方式实现内存释放功能。对于 item 部分,每次 set 都会创建一个新的 snapshot,在其上完成操作后再切换;对于 key 和 value 两个部分,只有内存空间写满之后才进行 make_snapshot,通过重新拷贝、去掉没有 item 指向的内存空间来释放内存。
/*
 * Bookkeeping for one double-buffered region (item, key or value) of the
 * shared-memory store.
 *
 * The unit of `size` and `used` depends on the region: the item region counts
 * items, the key and value regions count bytes.
 */
typedef struct {
    unsigned int id;     /* which of the two buffers is currently active */
    unsigned int size;   /* capacity of one buffer */
    unsigned int used;   /* amount of the active buffer in use, same unit as size */
    size_t sec1_offset;  /* byte offset of buffer one from the start of the store header */
    size_t sec2_offset;  /* byte offset of buffer two from the start of the store header */
} skv_store_section;
在这个内存结构设计中,item
是真正的索引,item
的结构体内部存储key和value在内存中偏移量。使用二分法寻找对应的比对key,从而确定相应的item获取value。
/*
 * One index entry. It does not hold the key or value bytes itself, only where
 * they live inside the shared-memory region.
 *
 * Both lengths include the trailing '\0' that the store appends on write.
 */
typedef struct {
    size_t key_offset;       /* byte offset of the key from the start of the store header */
    size_t value_offset;     /* byte offset of the value from the start of the store header */
    unsigned int key_len;    /* key length in bytes, terminator included */
    unsigned int value_len;  /* value length in bytes, terminator included */
} skv_store_item;
Key
和Value
为双buffer的存储区域。
Init
size 表示该区域单个 buffer 的容量上限,写入前据此判断是否会超出范围
used 表示已使用量(item 区域为已存储的 key 数量,key/value 区域为已写入的字节数)
id 表示当前正在使用的是双 buffer 中的哪一块
offset 表示该区域内存首位置相对于 header 起始地址的偏移量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 inline size_t skv_store_init ( skv_store_hdr *store_hdr, unsigned int key_count, unsigned int value_size) { size_t offset = sizeof (skv_store_hdr); store_hdr->item_section.size = key_count; store_hdr->item_section.used = 0 ; store_hdr->item_section.id = SKV_STORE_SECTION_ONE; store_hdr->item_section.sec1_offset = offset; offset += key_count * (long ) sizeof (skv_store_item); store_hdr->item_section.sec2_offset = offset; offset += key_count * (long ) sizeof (skv_store_item); store_hdr->key_section.size = SKV_KEY_AVERAGE_LEN * key_count; store_hdr->key_section.used = 0 ; store_hdr->key_section.id = SKV_STORE_SECTION_ONE; store_hdr->key_section.sec1_offset = offset; offset += key_count * SKV_KEY_AVERAGE_LEN; store_hdr->key_section.sec2_offset = offset; offset += key_count * SKV_KEY_AVERAGE_LEN; store_hdr->value_section.size = value_size * key_count; store_hdr->value_section.used = 0 ; store_hdr->value_section.id = SKV_STORE_SECTION_ONE; store_hdr->value_section.sec1_offset = offset; offset += key_count * value_size; store_hdr->value_section.sec2_offset = offset; offset += key_count * value_size; return offset; }
Store Get
Struct
这部分代码里面store_item_wrapper
是非常令人费解的设计,这个数据结构是计算某一个item数据结构相对于共享内存的偏移量。
但是无论是set还是get都是增量信息,没有必要搞这样的操作。
1 2 3 4 5 6 7 8 typedef struct { skv_store_item store_item; skv_ctx *ctx; size_t key_offset; } skv_store_item_wrapper;
/* Fill in the search probe: the caller's key is addressed relative to the
 * store header, and its length counts the terminating '\0'. */
store_item_wrapper.ctx = ctx;
store_item_wrapper.store_item.key_len = key->len + 1;
store_item_wrapper.key_offset = (key->data - (char *) ctx->store_hdr);
bsearch
Get
的查找功能使用C语言基础的bsearch
函数实现
1 2 3 4 5 void *bsearch (const void *key, const void *base, size_t nitems, size_t size, int (*compar)(const void *, const void *)) ;
* key
-- 指向要查找的元素的指针,类型转换为
void *
。
* base
-- 指向进行查找的数组的第一个对象的指针,类型转换为
void *
。
* nitems
-- base 所指向的数组中元素的个数。
* size
-- 数组中每个元素的大小,以字节为单位。
* compar
-- 用来比较两个元素的函数。
memcmp
memcmp
把存储区 str1 和存储区 str2 的前 n 个字节进行比较
1 int memcmp (const void *str1, const void *str2, size_t n) ;
offsetof
offsetof(type, member-designator)
会生成一个类型为
size_t
的整型常量,它是一个结构成员相对于结构开头的字节偏移量。成员是由
member-designator
给定的,结构的名称是在 type
中给定的。
get
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 inline int skv_store_item_compare_ ( const void *p1, const void *p2, void *arg) { int ret; char *store_hdr = nullptr ; skv_store_item_wrapper *store_item_wrapper = nullptr ; ret = (int ) ((skv_store_item *) p1)->key_len - (int ) ((skv_store_item *) p2)->key_len; if (ret != 0 ) { return ret; } store_item_wrapper = (skv_store_item_wrapper *) arg; store_hdr = (char *) store_item_wrapper->ctx->store_hdr; if (store_item_wrapper->key_offset == 0 ) { ret = memcmp (store_hdr + ((skv_store_item *) p1)->key_offset, store_hdr + ((skv_store_item *) p2)->key_offset, ((skv_store_item *) p1)->key_len); } else { ret = memcmp (store_hdr + store_item_wrapper->key_offset, store_hdr + ((skv_store_item *) p2)->key_offset, ((skv_store_item *) p1)->key_len); } return ret; } inline int skv_store_item_key_compare (const void *p1, const void *p2) { skv_store_item_wrapper *p_store_item_wrapper = nullptr ; p_store_item_wrapper = (skv_store_item_wrapper *) ((char *) p1 - offsetof (skv_store_item_wrapper, store_item)); return skv_store_item_compare_ ((void *) p1, p2, p_store_item_wrapper); } inline int skv_store_get (skv_ctx *ctx, skv_str *key, skv_str *value) { skv_store_item *p_cache_item = nullptr ; skv_store_item *p_store_item = nullptr ; skv_store_section *p_item_section = nullptr ; skv_store_item_wrapper store_item_wrapper; p_store_item = (skv_store_item *) SKV_STORE_GET_ADDRESS ( ctx->store_hdr, ctx->store_hdr->item_section); store_item_wrapper.ctx = ctx; store_item_wrapper.key_offset = (key->data - (char *) ctx->store_hdr); store_item_wrapper.store_item.key_len = key->len + 1 ; p_item_section = &ctx->store_hdr->item_section; p_cache_item = static_cast <skv_store_item *>(bsearch (&store_item_wrapper.store_item, p_store_item, p_item_section->used, sizeof (skv_store_item), 
skv_store_item_key_compare)); if (p_cache_item == nullptr ) { DEBUG_LOG ("Not found key: %.*s" , key->len, key->data); return SKV_NOT_FOUND; } if (p_cache_item->key_len == 0 ) { DEBUG_LOG ("The length of the key (%.*s) is invalid." , key->len, key->data); return SKV_ERROR; } value->data = (char *) ctx->store_hdr + p_cache_item->value_offset; value->len = p_cache_item->value_len; return SKV_OK; }
Store Set
make_snapshot
每次set操作都是在操作item的snapshot快照。set操作里面,双buffer的切换操作每次切换的是item的双buffer,
key和value的双buffer是在写满数据之后才进行切换,不跟随set操作切换buffer。
make item snapshot
bsearch, add or replace
change item section id(原子操作,lock-free read)
创建snapshot,每次set操作都是在snapshot中操作,然后再切换id指向。
* todo:
这里直接使用下一个函数的方式创建snapshot的方式直接管理内存应该更好?for循环memcpy和直接大内存memcpy性能差别有多少?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static void pupa_store_item_make_snapshot (skv_ctx *ctx) { skv_store_item *p_cache_items = NULL ; skv_store_item *p_store_items = NULL ; p_cache_items = (skv_store_item *) SKV_STORE_GET_SNAPSHOT_ADDRESS ( ctx->store_hdr, ctx->store_hdr->item_section); p_store_items = (skv_store_item *) SKV_STORE_GET_ADDRESS ( ctx->store_hdr, ctx->store_hdr->item_section); if (ctx->store_hdr->item_section.used) { memcpy (p_cache_items, p_store_items, sizeof (skv_store_item) * ctx->store_hdr->item_section.used); } ctx->store_items_snapshot = p_cache_items; }
free memory and change
section id
在set操作中,add or
replace都是直接在内存地址的末尾copy新的内容,这样会造成前面的内存中的内容并没有删除,只是修改了指针导致内存空间没有回收利用。
当空间填满的时候需要重新排序copy内容,释放没有item指向的内存空间。
*
这个库这里设计会产生一个类似于gc的操作,导致set写入时间不稳定。另外set和get使用bsearch操作也会导致操作时间不稳定,随着size变化。
/*
 * Compact the key section.
 *
 * Every key still referenced by an item of the item snapshot is copied, back
 * to back, into the inactive key buffer, and the snapshot items are re-pointed
 * at the copies. If the new key then fits, the key section is switched to the
 * compacted buffer.
 *
 * @param ctx      store context; ctx->store_items_snapshot must be valid
 * @param key      key about to be appended (only its length is used here)
 * @param address  out: where the caller must write the new key, i.e. the
 *                 first free byte after the compacted keys
 * @return SKV_OK, or SKV_OVERFLOW if the new key does not fit even after
 *         compaction
 *
 * Fix: *address used to be the start of the buffer, while the caller records
 * the new key at offset + key_section.used. The new key therefore overwrote
 * the first compacted key. It now matches the value compaction and points
 * just past the compacted data.
 *
 * NOTE(review): on SKV_OVERFLOW the snapshot items have already been
 * re-pointed at the inactive buffer; the caller must not publish that item
 * snapshot.
 */
static int skv_store_key_compaction(skv_ctx *ctx, skv_string *key, char **address)
{
    unsigned int i;
    size_t used_size = 0;
    skv_store_item *p_cache_item = NULL;
    char *p = SKV_STORE_GET_SNAPSHOT_ADDRESS(ctx->store_hdr, ctx->store_hdr->key_section);
    size_t offset = SKV_STORE_GET_SNAPSHOT_OFFSET(ctx->store_hdr->key_section);

    for (i = 0; i < ctx->store_hdr->item_section.used; i++) {
        p_cache_item = &ctx->store_items_snapshot[i];
        memcpy(p + used_size,
               (char *) ctx->store_hdr + p_cache_item->key_offset,
               p_cache_item->key_len);
        p_cache_item->key_offset = offset + used_size;
        used_size += p_cache_item->key_len;
    }

    /* +1 accounts for the '\0' the caller appends to the key. */
    if ((key->len + 1 + used_size) > ctx->store_hdr->key_section.size) {
        return SKV_OVERFLOW;
    }

    *address = p + used_size;
    ctx->store_hdr->key_section.used = used_size;
    ctx->store_hdr->key_section.id =
        SKV_STORE_GET_SEC_SNAPSHOT_ID(ctx->store_hdr->key_section);

    return SKV_OK;
}

/*
 * Compact the value section; the value-side twin of skv_store_key_compaction.
 *
 * @param ctx      store context; ctx->store_items_snapshot must be valid
 * @param value    value about to be appended (only its length is used here)
 * @param address  out: first free byte after the compacted values
 * @return SKV_OK, or SKV_OVERFLOW if the new value does not fit even after
 *         compaction
 *
 * NOTE(review): on SKV_OVERFLOW the snapshot items have already been
 * re-pointed at the inactive buffer; the caller must not publish that item
 * snapshot.
 */
static int skv_store_value_compaction(skv_ctx *ctx, skv_string *value, char **address)
{
    unsigned int i;
    size_t used_size = 0;
    skv_store_item *p_cache_item = NULL;
    char *p = SKV_STORE_GET_SNAPSHOT_ADDRESS(ctx->store_hdr, ctx->store_hdr->value_section);
    size_t offset = SKV_STORE_GET_SNAPSHOT_OFFSET(ctx->store_hdr->value_section);

    for (i = 0; i < ctx->store_hdr->item_section.used; i++) {
        p_cache_item = &ctx->store_items_snapshot[i];
        memcpy(p + used_size,
               (char *) ctx->store_hdr + p_cache_item->value_offset,
               p_cache_item->value_len);
        p_cache_item->value_offset = offset + used_size;
        used_size += p_cache_item->value_len;
    }

    /* +1 accounts for the '\0' the caller appends to the value. */
    if ((value->len + 1 + used_size) > ctx->store_hdr->value_section.size) {
        return SKV_OVERFLOW;
    }

    *address = p + used_size;
    ctx->store_hdr->value_section.used = used_size;
    ctx->store_hdr->value_section.id =
        SKV_STORE_GET_SEC_SNAPSHOT_ID(ctx->store_hdr->value_section);

    return SKV_OK;
}
add kv
item add key value,新增数据在store末尾新增
* 如果发现key data
section空间内还有足够的剩余空间,直接在store最后位置添加新增的key数据,char格式copy进去。
* 如果发现key data
section空间内不够,重新创建一个snapshot(减少失去item指向的无效空间),然后将snapshot与store互换之后重复上一步操作。
*
~因为这个时候没有增加key-used数据,所以在get操作里面是看不到这个在末尾新增的数据块的~,这步骤想错了,注意set是创建item的双buffer每次set切换,但是key和value的双buffer只有在写满数据的时候才切换。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 inline int skv_store_item_add (skv_ctx *ctx, skv_store_item *store_item, skv_str *key, skv_str *value) { int ret; size_t offset; char *p = nullptr ; skv_store_hdr *store_hdr = ctx->store_hdr; if (store_hdr->item_section.used == store_hdr->item_section.size) { return SKV_OVERFLOW; } if ((store_hdr->key_section.used + (key->len + 1 )) > store_hdr->key_section.size) { DEBUG_LOG ("Start to compact the key section." ); offset = SKV_STORE_GET_SNAPSHOT_OFFSET (store_hdr->key_section); ret = skv_store_key_compaction (ctx, key, &p); if (ret != SKV_OK) { return ret; } } else { p = SKV_STORE_GET_FREE_ADDRESS (store_hdr, store_hdr->key_section); offset = SKV_STORE_GET_OFFSET (store_hdr->key_section); } store_item->key_len = key->len + 1 ; store_item->key_offset = offset + store_hdr->key_section.used; memcpy (p, key->data, key->len); p += key->len; *p = '\0' ; store_hdr->key_section.used += store_item->key_len; p = nullptr ; if ((store_hdr->value_section.used + (value->len + 1 )) > store_hdr->value_section.size) { DEBUG_LOG ("Start to compact the value section." ); offset = SKV_STORE_GET_SNAPSHOT_OFFSET (store_hdr->value_section); ret = skv_store_value_compaction (ctx, value, &p); if (ret != SKV_OK) { return ret; } } else { p = SKV_STORE_GET_FREE_ADDRESS (store_hdr, store_hdr->value_section); offset = SKV_STORE_GET_OFFSET (store_hdr->value_section); } store_item->value_offset = offset + store_hdr->value_section.used; store_item->value_len = value->len + 1 ; memcpy (p, value->data, value->len); p += value->len; *p = '\0' ; store_hdr->value_section.used += store_item->value_len; return SKV_OK; }
replace kv
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 inline int skv_store_item_replace (skv_ctx *ctx, skv_store_item *store_item, skv_str *value) { char *p = nullptr ; skv_store_hdr *store_hdr = ctx->store_hdr; size_t offset; int ret; if ((store_hdr->value_section.used + (value->len + 1 )) > store_hdr->value_section.size) { DEBUG_LOG ("Start to compact the value section." ); offset = SKV_STORE_GET_SNAPSHOT_OFFSET (store_hdr->value_section); ret = skv_store_value_compaction (ctx, value, &p); if (ret != SKV_OK) { return ret; } } else { p = SKV_STORE_GET_FREE_ADDRESS (store_hdr, store_hdr->value_section); offset = SKV_STORE_GET_OFFSET (store_hdr->value_section); } store_item->value_offset = offset + store_hdr->value_section.used; store_item->value_len = value->len + 1 ; memcpy (p, value->data, value->len); p += value->len; *p = '\0' ; store_hdr->value_section.used += store_item->value_len; return SKV_OK; }
item sort
因为二分法的原因,需要整个item序列是有序的(根据key的内容)。
* key的长度
* key的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 inline int skv_item_qsort_for_b_search (void *elt, void *arg, void *base, size_t size, int low, int high, int (*cmp)(const void *ptr_a, const void *ptr_b, void *arg)) { int ret, mid; if (high <= low) { ret = cmp (elt, (char *) base + low * size, arg); if (ret > 0 ) { return low + 1 ; } return low; } mid = (low + high) / 2 ; ret = cmp (elt, (char *) base + mid * size, arg); if (ret == 0 ) { return mid + 1 ; } else if (ret > 0 ) { return skv_item_qsort_for_b_search (elt, arg, base, size, mid + 1 , high, cmp); } else { return skv_item_qsort_for_b_search (elt, arg, base, size, low, mid - 1 , cmp); } } inline void skv_item_sort (void *base, size_t n, size_t size, void *arg, int (*cmp)(const void *ptr_a, const void *ptr_b, void *arg)) { char *p; int pos, count; count = (int ) (n) -1 ; p = (char *) malloc (size); if (p == nullptr ) { return ; } memcpy (p, (char *) base + count * size, size); pos = skv_item_qsort_for_b_search (p, arg, base, size, 0 , count, cmp); if (pos < count) { memmove ((char *) base + (pos + 1 ) * size, (char *) base + pos * size, (count - pos) * size); memcpy ((char *) base + pos * size, p, size); } free (p); } inline int store_item_qsort_compare (const void *p1, const void *p2, void *arg) { int ret; char *store_hdr; skv_store_item_wrapper *store_item_wrapper; ret = (int ) ((skv_store_item *) p1)->key_len - (int ) ((skv_store_item *) p2)->key_len; if (ret != 0 ) { return ret; } store_item_wrapper = (skv_store_item_wrapper *) arg; store_hdr = (char *) store_item_wrapper->ctx->store_hdr; if (store_item_wrapper->key_offset == 0 ) { ret = memcmp (store_hdr + ((skv_store_item *) p1)->key_offset, store_hdr + ((skv_store_item *) p2)->key_offset, ((skv_store_item *) p1)->key_len); } else { ret = memcmp (store_hdr 
+ store_item_wrapper->key_offset, store_hdr + ((skv_store_item *) p2)->key_offset, ((skv_store_item *) p1)->key_len); } return ret; }
set
注意 set操作里面,双buffer的切换操作每次切换的是item的双buffer,
key和value的双buffer是在写满数据之后才进行切换,不跟随set操作切换buffer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 inline int skv_store_set (skv_ctx *ctx, skv_str *key, skv_str *value) { int ret; skv_store_item *p_cache_item = nullptr ; skv_store_section *p_item_section; skv_store_item_wrapper store_item_wrapper; skv_store_item_make_snapshot (ctx); p_item_section = &ctx->store_hdr->item_section; store_item_wrapper.ctx = ctx; store_item_wrapper.key_offset = (int64_t ) (key->data - (char *) ctx->store_hdr); store_item_wrapper.store_item.key_len = key->len + 1 ; p_cache_item = static_cast <skv_store_item *>(bsearch (&store_item_wrapper.store_item, ctx->store_items_snapshot, p_item_section->used, sizeof (skv_store_item), skv_store_item_key_compare)); if (p_cache_item == nullptr ) { DEBUG_LOG ("Add key : %.*s - %.*s" , key->len, key->data, value->len, value->data); ret = skv_store_item_add (ctx, ctx->store_items_snapshot + p_item_section->used, key, value); } else { DEBUG_LOG ("Replace key(%.*s) with (%.*s)" , key->len, key->data, value->len, value->data); ret = skv_store_item_replace (ctx, p_cache_item, value); } if (ret != SKV_OK) { return ret; } if (p_cache_item == nullptr ) { p_item_section->used++; store_item_wrapper.key_offset = 0 ; skv_item_sort (ctx->store_items_snapshot, ctx->store_hdr->item_section.used, sizeof (skv_store_item), &store_item_wrapper, store_item_qsort_compare); } p_item_section->id = SKV_STORE_GET_SEC_SNAPSHOT_ID (ctx->store_hdr->item_section); return SKV_OK; }
Store Del
删除的是最后一个item,直接item.used--
删除的不是最后一个item,直接将下一个item至最后一个item整体向前移动一位
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 inline int skv_store_del (skv_ctx *ctx, skv_str *key) { skv_store_item *p_cache_item = nullptr ; skv_store_section *p_item_section = nullptr ; skv_store_item_wrapper store_item_wrapper; skv_store_item_make_snapshot (ctx); p_item_section = &ctx->store_hdr->item_section; store_item_wrapper.ctx = ctx; store_item_wrapper.key_offset = (int64_t ) (key->data - (char *) ctx->store_hdr); store_item_wrapper.store_item.key_len = key->len + 1 ; p_cache_item = static_cast <skv_store_item *>(bsearch (&store_item_wrapper.store_item, ctx->store_items_snapshot, p_item_section->used, sizeof (skv_store_item), skv_store_item_key_compare)); if (p_cache_item == nullptr ) { DEBUG_LOG ("Not found key: %.*s" , key->len, key->data); return SKV_NOT_FOUND; } if ((p_cache_item - ctx->store_items_snapshot + 1 ) < p_item_section->used) { memcpy (p_cache_item, p_cache_item + 1 , sizeof (skv_store_item) * (p_item_section->used - (p_cache_item - ctx->store_items_snapshot) + 1 )); } p_item_section->id = SKV_STORE_GET_SEC_SNAPSHOT_ID (ctx->store_hdr->item_section); p_item_section->used--; return SKV_OK; }
Interface
在接口类添加了process_lock
和thread_lock
的功能,实现低性能的锁安全的多写多读功能。
对于高性能的多写多读功能,可以在 set 函数之前放置一个 MPSC(多生产者单消费者)消息队列来实现。
* MPSC_Queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 class ShmKV {private : skv_ctx ctx{}; CProcessMutex *ptr_process_mutex = nullptr ; std::mutex ptr_thread_lock; private : static void init_disk_file (const std::string &path, const bool &write_mode) { if (write_mode) { if (remove (const_cast <char *>(path.c_str ())) == 0 ) printf ("Removed %s succeeded.\n" , const_cast <char *>(path.c_str ())); else DEBUG_LOG ("Removed %s failed.\n" , const_cast <char *>(path.c_str ())); } } int init_mmap (const std::string &path, const unsigned int &key_count, const unsigned int &value_size, const bool &write_mode) { int ret; if (write_mode) { ret = skv_init (&ctx, const_cast <char *>(path.c_str ()), key_count, value_size, SKV_OP_TYPE_RW); } else { ret = skv_init (&ctx, const_cast <char *>(path.c_str ()), key_count, value_size, SKV_OP_TYPE_RO); } if (ret != SKV_OK) { DEBUG_LOG ("Failed to initialize skv.\n" ); return ret; } return 0 ; } public : ShmKV (const std::string &path, const unsigned int &count, const unsigned int &value_size, const bool &write_mode, const bool &init_disk, const bool &thread_lock = false , const bool &process_lock = false , const std::string &process_mutex = "process_mutex" ) { init_disk_file (path, init_disk); int ret = init_mmap (path, count, value_size, write_mode); if (ret != 0 ) { throw std::runtime_error ("SKV初始化失败!" ); } if (process_lock & thread_lock) { DEBUG_LOG ("error config, process_lock true, thread_lock true!" 
); exit (SKV_ERROR); } if (process_lock) { ptr_process_mutex = new CProcessMutex (process_mutex.c_str ()); } } ~ShmKV () { skv_finish (&ctx); delete ptr_process_mutex; } public : bool set (const char *key, const unsigned int &key_len, const char *value, const unsigned int &value_len) { skv_str skv_key, skv_value; skv_key.len = key_len; skv_key.data = const_cast <char *>(key); skv_value.len = value_len; skv_value.data = const_cast <char *>(value); int ret = skv_set (&ctx, &skv_key, &skv_value); if (ret != SKV_OK) { DEBUG_LOG ("Failed to set %.*s.\n" , skv_key.len, skv_key.data); return true ; } return false ; } bool set_process_mutex (const char *key, const unsigned int &key_len, const char *value, const unsigned int &value_len) { bool lock = ptr_process_mutex->Lock (); if (lock) { bool ret = set (key, key_len, value, value_len); lock = ptr_process_mutex->UnLock (); return ret; } else { return false ; } } bool set_thread_mutex (const char *key, const unsigned int &key_len, const char *value, const unsigned int &value_len) { ptr_thread_lock.lock (); bool ret = set (key, key_len, value, value_len); ptr_thread_lock.unlock (); return ret; } bool get (const char *key, const unsigned int &key_len, skv_str *skv_value) { skv_str skv_key; skv_key.len = key_len; skv_key.data = const_cast <char *>(key); int ret = skv_get (&ctx, &skv_key, skv_value); if (ret != SKV_OK) { DEBUG_LOG ("Failed to get %.*s.\n" , skv_key.len, skv_key.data); return true ; } return false ; } bool del (const char *key, const unsigned int &key_len) { skv_str skv_key; skv_key.len = key_len; skv_key.data = const_cast <char *>(key); int ret = skv_del (&ctx, &skv_key); if (ret != SKV_OK) { return true ; } return false ; } };
Pybind11封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 class PyShmKV {private : ots::shmkv::ShmKV *p; public : PyShmKV (const std::string &path, const unsigned int &count, const unsigned int &value_size, const bool &write_mode, const bool &init_disk, const bool &thread_lock = false , const bool &process_lock = false , const std::string &process_mutex = "process_mutex" ) { p = new ots::shmkv::ShmKV (path, count, value_size, write_mode, init_disk, thread_lock, process_lock, process_mutex); } ~PyShmKV () { delete p; } bool set (const std::string &k, const std::string &v, const unsigned int &value_len) { bool ret = p->set (const_cast <char *>(k.c_str ()), static_cast <int >(k.length ()), v.c_str (), value_len); return ret; }; bool set_process_mutex (const std::string &k, const std::string &v, const unsigned int &value_len) { bool ret = p->set_process_mutex (const_cast <char *>(k.c_str ()), static_cast <int >(k.length ()), v.c_str (), value_len); return ret; }; bool set_thread_mutex (const std::string &k, const std::string &v, const unsigned int &value_len) { bool ret = p->set_thread_mutex (const_cast <char *>(k.c_str ()), static_cast <int >(k.length ()), v.c_str (), value_len); return ret; }; py::tuple get (const std::string &k) { ots::shmkv::skv_str skv_value; bool ret = p->get (const_cast <char *>(k.c_str ()), static_cast <int >(k.length ()), &skv_value); if (ret) { return py::make_tuple (true , "null" ); } std::string str = skv_value.data; return py::make_tuple (false , str); } bool del (const std::string &k) { bool ret = p->del (const_cast <char *>(k.c_str ()), static_cast <int >(k.length ())); return ret; }; }; PYBIND11_MODULE (pyshmkv, m) { py::class_<PyShmKV>(m, "PyShmKV" ) .def (py::init<const std::string &, const unsigned int &, const unsigned int &, const bool &, 
const bool &, const bool &, const bool &, const std::string &>()) .def ("set" , &PyShmKV::set) .def ("set_process_mutex" , &PyShmKV::set_process_mutex) .def ("set_thread_mutex" , &PyShmKV::set_thread_mutex) .def ("get" , &PyShmKV::get) .def ("del_void" , &PyShmKV::del); }
Python封装
from pickle import dumps, loads
from typing import Any

from pyshmkv import PyShmKV as _PyShmKV


class PyShmKV(_PyShmKV):
    """Shared-memory KV store that accepts arbitrary picklable Python objects.

    Every call that returns a flag follows the extension's convention:
    a truthy flag means the operation FAILED.
    """

    def __init__(self, path: str, count: int, value_size: int, write_mode: bool, init_disk: bool,
                 thread_lock: bool = False, process_lock: bool = False,
                 process_mutex: str = "process_mutex"):
        """
        :param path: backing file of the store
        :param count: maximum number of keys
        :param value_size: per-key byte budget for values
        :param write_mode: open read-write (True) or read-only (False)
        :param init_disk: remove an existing backing file first
        :param thread_lock: the caller intends to use set_data_thread_mutex
        :param process_lock: create the process mutex for set_data_process_mutex
        :param process_mutex: name of the process mutex
        """
        super().__init__(path, count, value_size, write_mode, init_disk,
                         thread_lock, process_lock, process_mutex)

    def set_data(self, key: str, value: Any):
        """
        Pickle ``value`` (protocol 0) and store it under ``key`` without locking.

        The length passed down is ``len(bin_data)``, the same as the locked
        variants; the store appends its own terminator.

        :param key: key to write
        :param value: any picklable object
        :return: truthy on failure
        """
        bin_data = dumps(value, 0)
        return self.set(key, bin_data, len(bin_data))

    def set_data_thread_mutex(self, key: str, value: Any):
        """
        Pickle ``value`` (protocol 0) and store it under the thread mutex.

        :param key: key to write
        :param value: any picklable object
        :return: truthy on failure
        """
        bin_data = dumps(value, 0)
        return self.set_thread_mutex(key, bin_data, len(bin_data))

    def set_data_process_mutex(self, key: str, value: Any):
        """
        Pickle ``value`` (protocol 0) and store it under the process mutex.

        :param key: key to write
        :param value: any picklable object
        :return: truthy on failure
        """
        bin_data = dumps(value, 0)
        return self.set_process_mutex(key, bin_data, len(bin_data))

    def get_data(self, key: str):
        """
        Read ``key`` and unpickle the stored value.

        :param key: key to read
        :return: ``(failed, value)``; ``value`` is None when ``failed`` is truthy
        """
        ret, bin_data = self.get(key)
        if ret:
            return ret, None
        # SECURITY: unpickling executes whatever the pickle stream describes.
        # Only use this on stores whose writers are trusted.
        return ret, loads(bin_data.encode())

    def del_data(self, key: str):
        """
        Delete ``key``.

        :param key: key to delete
        :return: truthy on failure
        """
        return self.del_void(key)
任意数据写入测试
# Write/read round trip with an arbitrary Python object (a pandas DataFrame).
# `shm` is an already constructed PyShmKV and `dumps` is pickle.dumps; both
# come from outside this snippet.
from pandas import DataFrame

data = DataFrame(data=[1, 2, 3], columns=["test"])

bin_data = dumps(data, 0)
print(type(bin_data))

shm.set_data("test1", data)
print(shm.get_data("test1"))
参考