0%

使用OpenMP的Reduction的User Defined Reduction Facility实现Matrix按索引累加

OpenMP基本概念

OpenMP是一种用于共享内存并行系统的多线程程序设计方案,支持的编程语言包括C、C++和Fortran。OpenMP提供了对并行算法的高层抽象描述,特别适合在多核CPU机器上的并行程序设计。编译器根据程序中添加的pragma指令,自动将程序并行处理,使用OpenMP降低了并行编程的难度和复杂度。当编译器不支持OpenMP时,程序会退化成普通(串行)程序。程序中已有的OpenMP指令不会影响程序的正常编译运行。

1
2
3
4
5
private:指定一个或多个变量在每个线程中都有它自己的私有副本;
reduction:用来指定一个或多个变量是私有的,并且在并行处理结束后这些变量要执行指定的归约运算,并将结果返回给主线程同名变量;
num_threads:指定并行域内的线程的数目;
shared:指定一个或多个变量为多个线程间的共享变量;
default:用来指定并行域内的变量的使用方式,缺省是shared。

Reduction子句

OpenMP中的归约是parallel并行指令的reduction子句,在子句中指定归约操作符和归约变量。

归约操作符是序列中的两两元素做的运算,一定是一个二元运算符。归约变量则保存归约操作的中间结果。OpenMP用归约变量为每个线程创建一个私有的变量,用来存储自己归约的结果,所以归约的代码不需要critical保护也不会发生冲突:

1
2
3
4
5
6
int sum = 0; // 共享变量
//归约子句(归约操作符:归约变量)
# pragma omp parallel num_threads(thrdCnt) reduction(+:sum)
{
//循环结构,并行加速部分
}

OpenMP允许使用的归约操作符只有+,*,-,&,|,^,&&,||八种,对于单纯的汇总计算可以直接使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

py::array_t<double> calc_greeks_monitor(
const py::array_t<bool> &is_call,
const py::array_t<double> &S,
const py::array_t<double> &X,
const py::array_t<double> &T,
const py::array_t<double> &b,
const py::array_t<double> &r,
const py::array_t<double> &sigma,
const py::array_t<int> &unit,
const py::array_t<int> &pos
) {
py::buffer_info buf_is_call = is_call.request();
py::buffer_info buf_spot = S.request();
py::buffer_info buf_strike = X.request();
py::buffer_info buf_time = T.request();
py::buffer_info buf_carry_cost = b.request();
py::buffer_info buf_risk_free = r.request();
py::buffer_info buf_sigma = sigma.request();
py::buffer_info buf_unit = unit.request();
py::buffer_info buf_pos = pos.request();

bool *ptr_is_call = static_cast<bool *>(buf_is_call.ptr);
auto *ptr_spot = static_cast<double *>(buf_spot.ptr);
auto *ptr_strike = static_cast<double *>(buf_strike.ptr);
auto *ptr_time = static_cast<double *>(buf_time.ptr);
auto *ptr_carry_cost = static_cast<double *>(buf_carry_cost.ptr);
auto *ptr_risk_free = static_cast<double *>(buf_risk_free.ptr);
auto *ptr_sigma = static_cast<double *>(buf_sigma.ptr);
auto *ptr_unit = static_cast<int *>(buf_unit.ptr);
auto *ptr_pos = static_cast<int *>(buf_pos.ptr);

double delta = 0.0;
double cash_delta = 0.0;
double gamma = 0.0;
double vega = 0.0;
double theta = 0.0;

{
#pragma omp parallel for reduction(+:cash_delta) reduction(+:delta) reduction(+:gamma) reduction(+:theta) reduction(+:vega)
for (int idx = 0; idx < buf_spot.shape[0]; idx++) {
const int unit_ = ptr_unit[idx];
const int pos_ = ptr_pos[idx];

const double d = flow::model::bjerk02::calc_delta(
ptr_is_call[idx],
ptr_spot[idx],
ptr_strike[idx],
ptr_time[idx],
ptr_carry_cost[idx],
ptr_risk_free[idx],
ptr_sigma[idx]);
const double g = flow::model::bjerk02::calc_gamma(
ptr_is_call[idx],
ptr_spot[idx],
ptr_strike[idx],
ptr_time[idx],
ptr_carry_cost[idx],
ptr_risk_free[idx],
ptr_sigma[idx]);
const double t = flow::model::bjerk02::calc_theta(
ptr_is_call[idx],
ptr_spot[idx],
ptr_strike[idx],
ptr_time[idx],
ptr_carry_cost[idx],
ptr_risk_free[idx],
ptr_sigma[idx]);
const double v = flow::model::bjerk02::calc_vega(
ptr_is_call[idx],
ptr_spot[idx],
ptr_strike[idx],
ptr_time[idx],
ptr_carry_cost[idx],
ptr_risk_free[idx],
ptr_sigma[idx]);

cash_delta += nan_to_num(ptr_spot[idx] * d * unit_ * pos_);
delta += nan_to_num(d * unit_ * pos_);
gamma += nan_to_num(g * unit_ * pos_);
theta += nan_to_num(t * unit_ * pos_);
vega += nan_to_num(v * unit_ * pos_);
}
}

// cash-delta, delta, gamma-percent,theta day-decay, vega-percent
auto result = py::array_t<double>(5);
py::buffer_info buffer_result = result.request();
auto *ptr_result = static_cast<double *>(buffer_result.ptr);
ptr_result[0] = cash_delta;
ptr_result[1] = delta;
ptr_result[2] = gamma;
ptr_result[3] = theta;
ptr_result[4] = vega;
return result;
}

使用User Defined Reduction Facility功能实现Matrix按索引累加

如果想按照月份进行汇总计算并输出表格,则需要自定义特殊结构体的重载操作符

  • 声明结构体
  • 定义结构体的操作符
  • 使用pragma omp declare reduction(…omp_out=…) initializer(…)语句添加功能
  • 使用pragma omp parallel for reduction(自定义操作符: M)并行加速

例如对一个二维数组重载+运算符的操作过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct matrix {
double value[12][5];
};

struct matrix add_matrix(struct matrix X, struct matrix Y) {
struct matrix M = {{0.}};
for (int i = 0; i < 12; i++) {
for (int j = 0; j < 5; j++) {
M.value[i][j] = X.value[i][j] + Y.value[i][j];
}
}
return M;
}

#pragma omp declare reduction(p_add_matrix: struct matrix:omp_out=add_matrix(omp_out, omp_in)) initializer( omp_priv={{ 0.}} )
{
#pragma omp parallel for reduction(p_add_matrix: M)
for(...){
....
}
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132

struct matrix {
double value[12][5];
};


struct matrix add_matrix(struct matrix X, struct matrix Y) {
struct matrix M = {{0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.}};
for (int i = 0; i < 12; i++) {
for (int j = 0; j < 5; j++) {
M.value[i][j] = X.value[i][j] + Y.value[i][j];
}
}
return M;
}


py::array_t<double> calc_greeks_monitor_detail(
const py::array_t<bool> &is_call,
const py::array_t<double> &S,
const py::array_t<double> &X,
const py::array_t<double> &T,
const py::array_t<double> &b,
const py::array_t<double> &r,
const py::array_t<double> &sigma,
const py::array_t<int> &unit,
const py::array_t<int> &pos,
const py::array_t<int> &month
) {
py::buffer_info buf_is_call = is_call.request();
py::buffer_info buf_spot = S.request();
py::buffer_info buf_strike = X.request();
py::buffer_info buf_time = T.request();
py::buffer_info buf_carry_cost = b.request();
py::buffer_info buf_risk_free = r.request();
py::buffer_info buf_sigma = sigma.request();
py::buffer_info buf_unit = unit.request();
py::buffer_info buf_pos = pos.request();
py::buffer_info buf_month = month.request();

bool *ptr_is_call = static_cast<bool *>(buf_is_call.ptr);
auto *ptr_spot = static_cast<double *>(buf_spot.ptr);
auto *ptr_strike = static_cast<double *>(buf_strike.ptr);
auto *ptr_time = static_cast<double *>(buf_time.ptr);
auto *ptr_carry_cost = static_cast<double *>(buf_carry_cost.ptr);
auto *ptr_risk_free = static_cast<double *>(buf_risk_free.ptr);
auto *ptr_sigma = static_cast<double *>(buf_sigma.ptr);
auto *ptr_unit = static_cast<int *>(buf_unit.ptr);
auto *ptr_pos = static_cast<int *>(buf_pos.ptr);
auto *ptr_month = static_cast<int *>(buf_month.ptr);

// 12个月份+汇总 13行, cash-delta, delta, gamma-percent, theta day-decay, vega-percent
std::vector<ssize_t> shape = {13, 5};
auto result = py::array_t<double>(65);
result.resize({shape[0], shape[1]});
py::buffer_info buffer_result = result.request();
auto *ptr_result = static_cast<double *>(buffer_result.ptr);


#pragma omp declare reduction(p_add_matrix: struct matrix: \
omp_out=add_matrix(omp_out, omp_in)) initializer( \
omp_priv={{ 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0. }} )
struct matrix M = {{0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.}};
{
#pragma omp parallel for reduction(p_add_matrix: M)
for (int idx = 0; idx < buf_spot.shape[0]; idx++) {
const int month_num = ptr_month[idx] - 1;
const int unit_ = ptr_unit[idx];
const int pos_ = ptr_pos[idx];

const double d = flow::model::bjerk02::calc_delta(
ptr_is_call[idx],
ptr_spot[idx],
ptr_strike[idx],
ptr_time[idx],
ptr_carry_cost[idx],
ptr_risk_free[idx],
ptr_sigma[idx]);
const double g = flow::model::bjerk02::calc_gamma(
ptr_is_call[idx],
ptr_spot[idx],
ptr_strike[idx],
ptr_time[idx],
ptr_carry_cost[idx],
ptr_risk_free[idx],
ptr_sigma[idx]);
const double t = flow::model::bjerk02::calc_theta(
ptr_is_call[idx],
ptr_spot[idx],
ptr_strike[idx],
ptr_time[idx],
ptr_carry_cost[idx],
ptr_risk_free[idx],
ptr_sigma[idx]);
const double v = flow::model::bjerk02::calc_vega(
ptr_is_call[idx],
ptr_spot[idx],
ptr_strike[idx],
ptr_time[idx],
ptr_carry_cost[idx],
ptr_risk_free[idx],
ptr_sigma[idx]);

// 月份
M.value[month_num][0] += nan_to_num(ptr_spot[idx] * d * unit_ * pos_);
M.value[month_num][1] += nan_to_num(d * unit_ * pos_);
M.value[month_num][2] += nan_to_num(g * unit_ * pos_);
M.value[month_num][3] += nan_to_num(t * unit_ * pos_);
M.value[month_num][4] += nan_to_num(v * unit_ * pos_);
}
}

ptr_result[12 * shape[1] + 0] = 0.;
ptr_result[12 * shape[1] + 1] = 0.;
ptr_result[12 * shape[1] + 2] = 0.;
ptr_result[12 * shape[1] + 3] = 0.;
ptr_result[12 * shape[1] + 4] = 0.;
for (int idx = 0; idx < 12; idx++) {
ptr_result[idx * shape[1] + 0] = M.value[idx][0];
ptr_result[idx * shape[1] + 1] = M.value[idx][1];
ptr_result[idx * shape[1] + 2] = M.value[idx][2];
ptr_result[idx * shape[1] + 3] = M.value[idx][3];
ptr_result[idx * shape[1] + 4] = M.value[idx][4];

ptr_result[12 * shape[1] + 0] += nan_to_num(M.value[idx][0]);
ptr_result[12 * shape[1] + 1] += nan_to_num(M.value[idx][1]);
ptr_result[12 * shape[1] + 2] += nan_to_num(M.value[idx][2]);
ptr_result[12 * shape[1] + 3] += nan_to_num(M.value[idx][3]);
ptr_result[12 * shape[1] + 4] += nan_to_num(M.value[idx][4]);
}
return result;
}

Pybind11的封装部分

1
2
3
4
5
6
7
8
9
10
11
PYBIND11_MODULE(pybjerk, m) {
m.doc() = " Bjerksund and Stensland (2002) Pricing Formula"; // optional module docstring

m.def("calc_greeks_monitor", &calc_greeks_monitor, "calculate greeks monitor", py::arg("is_call"), py::arg("S"),
py::arg("X"), py::arg("T"), py::arg("b"), py::arg("r"), py::arg("sigma"), py::arg("unit"), py::arg("pos"));

m.def("calc_greeks_monitor_detail", &calc_greeks_monitor_detail, "calculate greeks detail", py::arg("is_call"),
py::arg("S"),
py::arg("X"), py::arg("T"), py::arg("b"), py::arg("r"), py::arg("sigma"), py::arg("unit"), py::arg("pos"),
py::arg("month"));
}