
ClickHouse data synchronization

Reference: "Notes on a ClickHouse data migration", https://zhuanlan.zhihu.com/p/220172155
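The script below copies tick-data tables from a source ClickHouse server to a target server. It is driven entirely from the target side: for each table it issues INSERT ... SELECT statements that read the source through ClickHouse's built-in remote() table function. The core idea fits in a few lines; here is a minimal sketch using clickhouse_driver, where the hosts, credentials, and the db.events table name are placeholders rather than values from the script:

from clickhouse_driver import Client

# Placeholder connection details; substitute your own servers.
target = Client(host='target-host', port=9000, user='default', password='')

# remote() lets the target server read a table that lives on the source
# server, so one INSERT ... SELECT copies the whole table across.
target.execute(
    "INSERT INTO db.events "
    "SELECT * FROM remote('source-host', db.events, 'default', '')"
)

The full script adds table discovery from system.tables, per-partition row-count checks so that only mismatched partitions are re-copied, and a retry wrapper around the whole run.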

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Project : tset
# @File : task.py
# @Author : Gavin
# @Time : 2022/9/23 8:15
# @Desc : ClickHouse data synchronization script
# @Url : https://zhuanlan.zhihu.com/p/220172155

import collections
import datetime
import functools
import logging
import time
from typing import List

from clickhouse_driver import Client

from logger import init_logger  # local helper that configures logging

# TODO: fill in the source and target server addresses.
SOURCE_HOST = ''
SOURCE_USER = 'default'
SOURCE_PASSWORD = 'default'
TARGET_HOST = ''

# Keep the source credentials in module-level constants: the remote()
# queries below need the raw host/user/password of the source server.
source_conn = Client(host=SOURCE_HOST, port=9000, user=SOURCE_USER, password=SOURCE_PASSWORD)
target_conn = Client(host=TARGET_HOST, port=9000, user='default', password='default')


def format_partition_expr(p):
    # Integer partition values can be used verbatim; string values (dates,
    # month strings, ...) must be quoted in the ALTER/WHERE clauses.
    if isinstance(p, int):
        return p
    return f"'{p}'"


def execute_queries(conn, queries):
    # Accept either a list of statements or one ';'-separated string;
    # skip empty fragments so a trailing semicolon is harmless.
    if isinstance(queries, str):
        queries = queries.split(';')
    for q in queries:
        q = q.strip()
        if q:
            conn.execute(q)


class Table(object):
    def __init__(self, database, name, ddl, partition_key, is_view):
        self.database = database
        self.name = name
        self.ddl = ddl.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
        self.partition_key = partition_key
        self.is_view = is_view

    def exists(self, conn):
        q = f"SELECT name FROM system.tables WHERE database = '{self.database}' AND name = '{self.name}'"
        return len(conn.execute(q)) > 0

    def get_partitions(self, conn):
        # Map each partition value to its row count, ordered by partition.
        q = f'SELECT {self.partition_key}, count() FROM {self.identity} ' \
            f'GROUP BY {self.partition_key} ORDER BY {self.partition_key}'
        return collections.OrderedDict(conn.execute(q))

    def get_total_count(self, conn):
        q = f'SELECT count() FROM {self.identity}'
        return conn.execute(q)[0][0]

    def check_consistency(self):
        # Returns (is_identical, mismatched_partitions); the partition list
        # is None when the target table is missing or has no partition key.
        if not self.exists(target_conn):
            return False, None

        source_total_count = self.get_total_count(source_conn)
        target_total_count = self.get_total_count(target_conn)
        if source_total_count == target_total_count:
            return True, None

        if not self.partition_key:
            return False, None

        source_partitions = self.get_partitions(source_conn)
        target_partitions = self.get_partitions(target_conn)
        bug_partitions = []
        for p, c in source_partitions.items():
            if p not in target_partitions or c != target_partitions[p]:
                bug_partitions.append(p)
        return False, bug_partitions

    def create(self, replace=False):
        target_conn.execute(f'CREATE DATABASE IF NOT EXISTS {self.database}')
        # Views are cheap to rebuild, so always recreate them.
        if self.is_view:
            replace = True
        if replace:
            target_conn.execute(f'DROP TABLE IF EXISTS {self.identity}')
        target_conn.execute(self.ddl)

    def copy_data_from_remote(self, by_partition=True):
        self.create()
        if self.is_view:
            # A view holds no data of its own; creating it is enough.
            logging.info('ignore view %s', self.identity)
            return

        is_identical, bug_partitions = self.check_consistency()
        if is_identical:
            logging.info('table %s has the same number of rows, skip', self.identity)
            return

        if self.partition_key and by_partition and bug_partitions:
            # Only re-copy the partitions whose row counts disagree.
            for p in bug_partitions:
                logging.info('copy partition %s=%s', self.partition_key, p)
                self._copy_partition_from_remote(p)
        else:
            self._copy_table_from_remote()

    def _copy_table_from_remote(self):
        # Rebuild the table and pull every row from the source server via
        # the remote() table function, executed on the target side.
        queries = f'''
        DROP TABLE {self.identity};
        {self.ddl};
        INSERT INTO {self.identity}
        SELECT * FROM remote('{SOURCE_HOST}', {self.identity}, '{SOURCE_USER}', '{SOURCE_PASSWORD}')
        '''
        execute_queries(target_conn, queries)

    def _copy_partition_from_remote(self, partition):
        # Drop the stale partition on the target, then re-copy just that
        # partition from the source.
        partition = format_partition_expr(partition)
        queries = f'''
        ALTER TABLE {self.identity} DROP PARTITION {partition};
        INSERT INTO {self.identity}
        SELECT * FROM remote('{SOURCE_HOST}', {self.identity}, '{SOURCE_USER}', '{SOURCE_PASSWORD}')
        WHERE {self.partition_key} = {partition}
        '''
        execute_queries(target_conn, queries)

    @property
    def identity(self):
        return f'{self.database}.{self.name}'

    def __str__(self):
        return self.identity

    __repr__ = __str__


def get_all_tables() -> List[Table]:
    # List every user database and table, views included. Views depend on
    # other tables, so order them last.
    q = '''
    SELECT database, name, create_table_query, partition_key, engine = 'View' AS is_view
    FROM system.tables
    WHERE database NOT IN ('system')
    ORDER BY if(engine = 'View', 999, 0), database, name
    '''
    rows = source_conn.execute(q)
    return [Table(*values) for values in rows]


def copy_remote_tables(tables):
    for idx, t in enumerate(tables):
        start_time = datetime.datetime.now()
        logging.info('>>>> start to migrate table %s, progress %s/%s', t.identity, idx + 1, len(tables))
        t.copy_data_from_remote()
        logging.info('<<<< migrated table %s in %s', t.identity, datetime.datetime.now() - start_time)


def with_retry(max_attempts=5, backoff=120):
    # Retry the wrapped function up to max_attempts times, sleeping
    # `backoff` seconds between attempts; re-raise the final failure.
    def decorator(f):
        @functools.wraps(f)
        def inner(*args, **kwargs):
            attempts = 0
            while True:
                attempts += 1
                logging.info('start attempt #%s', attempts)
                try:
                    f(*args, **kwargs)
                except Exception:
                    if attempts >= max_attempts:
                        raise
                    logging.exception('caught exception')
                    time.sleep(backoff)
                else:
                    break

        return inner

    return decorator


@with_retry(max_attempts=10, backoff=60)
def copy_tick_data():
    tables = get_all_tables()

    # Commodity option tick data; skip the in-memory Buffer tables.
    ctp_tick_commodity_option_list = [i for i in tables if 'ctp_tick_commodity_option_' in i.name]
    ctp_tick_commodity_option_list = [i for i in ctp_tick_commodity_option_list
                                      if 'buffer_ctp_tick_commodity_option_' not in i.name]

    # Commodity future tick data; skip the in-memory Buffer tables.
    ctp_tick_commodity_future_list = [i for i in tables if 'ctp_tick_commodity_future_' in i.name]
    ctp_tick_commodity_future_list = [i for i in ctp_tick_commodity_future_list
                                      if 'buffer_ctp_tick_commodity_future_' not in i.name]

    # ctp_tick_etf_option_* holds the earliest tick data, originally recorded in MongoDB.
    ctp_tick_etf_option_list = [i for i in tables if 'ctp_tick_etf_option_' in i.name]
    ctp_tick_etf_option_list = [i for i in ctp_tick_etf_option_list if 'buffer' not in i.name]

    # ctp_tick_stock_etf_option_*: stock ETF option tick data from the Shanghai
    # and Shenzhen stock exchanges.
    ctp_tick_stock_etf_option_list = [i for i in tables if 'ctp_tick_stock_etf_option_' in i.name]
    ctp_tick_stock_etf_option_list = [i for i in ctp_tick_stock_etf_option_list if 'buffer' not in i.name]

    # CFFEX tick data; CFFEX index-future data is recorded separately and not
    # merged with commodity products from SHFE and the other commodity exchanges.
    ctp_tick_cffex_list = [i for i in tables if 'ctp_tick_cffex_day_' in i.name]
    ctp_tick_cffex_list = [i for i in ctp_tick_cffex_list if 'buffer' not in i.name]

    # Level-1 data for stock options on the Shanghai and Shenzhen exchanges.
    ctp_stock_list = [i for i in tables if 'tick_stock_' in i.name]
    ctp_stock_list = [i for i in ctp_stock_list if 'buffer' not in i.name]

    tick_list = (ctp_tick_commodity_option_list + ctp_tick_commodity_future_list
                 + ctp_tick_etf_option_list + ctp_tick_stock_etf_option_list
                 + ctp_tick_cffex_list + ctp_stock_list)
    # Deduplicate, then sort by the qualified table name: Table instances
    # define no ordering, so a bare sort() would raise TypeError.
    tick_list = sorted(set(tick_list), key=lambda t: t.identity)
    del tables

    logging.info('got %d tables: %s', len(tick_list), tick_list)

    copy_remote_tables(tick_list)


if __name__ == '__main__':
    init_logger('clickhouse_copy.log')
    logging.info('run copy task')
    copy_tick_data()
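
The script relies on init_logger from a local logger module that the article does not include. A hypothetical stand-in, assuming it only needs to send INFO-level output to both the console and the named file, could look like this:

# logger.py: hypothetical stand-in for the module imported by the script.
import logging


def init_logger(filename):
    # Assumed behavior: write INFO and above to stderr and to `filename`.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(message)s',
        handlers=[logging.StreamHandler(), logging.FileHandler(filename)],
    )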