1. Problem background
In this Python project, the time-series data lives in a TDengine (TD) database and is written at one-second granularity. A query covering one week returns more than 500k rows, and pulling the full result set into the Python program in one go is slow: the end-to-end flow takes over 10 seconds. The goal is to speed this up.
2. Investigation
First, step-by-step profiling showed that the fetch from TD takes more than 7 seconds and that formatting the data inside the Python program takes more than 3 seconds.
Second, within the business-logic step, most of the time is spent converting datetime objects into strings.
Third, within the TD fetch step, most of the time is spent converting timestamps into datetime objects after the data arrives.
Finally, the plan: have TD return the ts field as a raw bigint and let the business logic convert it from bigint straight into a string (see the sketch below).
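To make the intended fast path concrete, here is a minimal sketch, assuming millisecond precision and a fixed UTC+8 offset; the helper name ms_to_str is hypothetical and not part of the project code. It formats an epoch-millisecond bigint into a display string without building a timezone-aware datetime per row:

import time

def ms_to_str(ms, tz_offset_hours=8):
    # Assumption: millisecond epoch values and a fixed local offset (UTC+8),
    # so no per-row tzinfo lookup or datetime object is needed.
    secs, msec = divmod(ms, 1000)
    t = time.gmtime(secs + tz_offset_hours * 3600)  # shift to local time by hand
    return '{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}.{:03d}'.format(
        t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec, msec)

# e.g. ms_to_str(1700000000123) -> '2023-11-15 06:13:20.123'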
3. Reading the source code and attempting a change
The ts field fetched from TD is stored as a bigint (type C_TIMESTAMP). Reading the source shows that the TD client package converts every C_TIMESTAMP field into a datetime object before returning it, and the conversion routine _convert_millisecond_to_datetime is the root of the slowness. Digging further for a way to return the value as a bigint leads to the CONVERT_FUNC_BLOCK dispatch table, keyed by each field's type fields[i]["type"]; that field type comes from taos_fetch_fields.
# Entry point: TDHelper().db_query(sql)
class TDHelper:  # custom TD adapter logic
    def db_query(self, sql, return_timestamp=False):
        with self.cursor() as c:
            return self._query_handler(c, sql, return_timestamp)

    def _query_handler(self, cursor, sql, return_timestamp=False):
        try:
            cursor.execute(sql)  # execute() populates _fields, which decides how each field is deserialized later
            # cursor._fields[0]._type = 5  # modification, for testing
            result = cursor.fetchall()
            if not return_timestamp:
                return result
            else:
                ret_result = []
                for one in result:
                    ret_result.append((one[0].timestamp(), *one[1:]))
                return ret_result
        except ProgrammingError as e:
            if e.msg == 'Fail to get table info, error: Table does not exist':
                # only log the sql; the exception details are not needed
                logger.warning('Table does not exist, sql [{}]'.format(sql))
                return []
            raise e


# Everything below is source code from the TD client package
def fetchall(self):  # method of cursor
    if self._result is None:
        raise OperationalError("Invalid use of fetchall")
    fields = self._fields if self._fields is not None else taos_fetch_fields(self._result)
    buffer = [[] for i in range(len(fields))]
    self._rowcount = 0
    while True:
        block, num_of_rows = taos_fetch_block(self._result, self._fields)  # key logic
        errno = taos_errno(self._result)
        if errno != 0:
            raise ProgrammingError(taos_errstr(self._result), errno)
        if num_of_rows == 0:
            break
        self._rowcount += num_of_rows
        for i in range(len(self._fields)):
            buffer[i].extend(block[i])
    return list(map(tuple, zip(*buffer)))


def taos_fetch_block(result, fields=None, field_count=None):
    if fields is None:
        fields = taos_fetch_fields(result)
    if field_count is None:
        field_count = taos_field_count(result)
    pblock = ctypes.c_void_p(0)
    num_of_rows = _libtaos.taos_fetch_block(result, ctypes.byref(pblock))
    if num_of_rows == 0:
        return None, 0
    precision = taos_result_precision(result)
    blocks = [None] * field_count
    for i in range(len(fields)):
        data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
        if fields[i]["type"] not in CONVERT_FUNC_BLOCK_v3 and fields[i]["type"] not in CONVERT_FUNC_BLOCK:
            raise DatabaseError("Invalid data type returned from database")
        offsets = []
        is_null = []
        if fields[i]["type"] in (FieldType.C_VARCHAR, FieldType.C_NCHAR, FieldType.C_JSON):
            offsets = taos_get_column_data_offset(result, i, num_of_rows)
            blocks[i] = CONVERT_FUNC_BLOCK_v3[fields[i]["type"]](data, is_null, num_of_rows, offsets, precision)
        else:
            is_null = [taos_is_null(result, j, i) for j in range(num_of_rows)]
            # key logic
            blocks[i] = CONVERT_FUNC_BLOCK[fields[i]["type"]](data, is_null, num_of_rows, offsets, precision)
    return blocks, abs(num_of_rows)


CONVERT_FUNC_BLOCK = {
    FieldType.C_BOOL: _crow_bool_to_python,
    FieldType.C_TINYINT: _crow_tinyint_to_python,
    FieldType.C_SMALLINT: _crow_smallint_to_python,
    FieldType.C_INT: _crow_int_to_python,
    FieldType.C_BIGINT: _crow_bigint_to_python,
    FieldType.C_FLOAT: _crow_float_to_python,
    FieldType.C_DOUBLE: _crow_double_to_python,
    FieldType.C_BINARY: _crow_binary_to_python_block,
    FieldType.C_TIMESTAMP: _crow_timestamp_to_python,  # key logic
    FieldType.C_NCHAR: _crow_nchar_to_python_block,
    FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python,
    FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
    FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
    FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
    FieldType.C_JSON: _crow_nchar_to_python_block,
}


def _crow_timestamp_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C bool row to python row."""
    _timestamp_converter = _convert_millisecond_to_datetime  # key logic
    if precision == FieldType.C_TIMESTAMP_MILLI:
        _timestamp_converter = _convert_millisecond_to_datetime
    elif precision == FieldType.C_TIMESTAMP_MICRO:
        _timestamp_converter = _convert_microsecond_to_datetime
    elif precision == FieldType.C_TIMESTAMP_NANO:
        _timestamp_converter = _convert_nanosecond_to_datetime
    else:
        raise DatabaseError("Unknown precision returned from database")
    return [
        None if is_null[i] else _timestamp_converter(ele)
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)])
    ]


def _convert_millisecond_to_datetime(milli):
    try:
        if _priv_tz is None:
            return _datetime_epoch + timedelta(seconds=milli / 1000.0)
        return (_utc_datetime_epoch + timedelta(seconds=milli / 1000.0)).astimezone(_priv_tz)  # the root of all evil
    except OverflowError:
        # catch OverflowError and pass
        print("WARN: datetime overflow!")
        pass
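To confirm that this per-row conversion dominates, a rough micro-benchmark along the following lines can be run. It is only a sketch: it imitates _convert_millisecond_to_datetime with plain standard-library code instead of calling the TD package, and the 500k rows match the query size described above.

import timeit
from datetime import datetime, timedelta, timezone

_utc_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
_tz = timezone(timedelta(hours=8))  # stand-in for _priv_tz

# 500k millisecond timestamps, one second apart, like a week-long secondly series
millis = range(1_700_000_000_000, 1_700_000_000_000 + 500_000 * 1000, 1000)

def as_datetime():
    # imitates _convert_millisecond_to_datetime: one datetime plus astimezone per row
    return [(_utc_epoch + timedelta(seconds=m / 1000.0)).astimezone(_tz) for m in millis]

def as_bigint():
    # the proposed fast path: keep the raw integer
    return list(millis)

print('datetime path:', timeit.timeit(as_datetime, number=1))
print('bigint path  :', timeit.timeit(as_bigint, number=1))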
4. Final change and result
Change the type of the ts field from C_TIMESTAMP to C_BIGINT; the relevant logic is shown below.
Parameter notes: cursor is obtained from TDHelper().cursor(), and sql = 'select ts,val from table_1 ORDER BY ts desc limit 1'.
End result: the 500k-row query went from 10 s down to 3 s.
def test(cursor, sql):
    try:
        cursor.execute(sql)
        # change the ts field's declared type to bigint
        if cursor._fields[0]._type == FieldType.C_TIMESTAMP:
            cursor._fields[0]._type = FieldType.C_BIGINT
        return cursor.fetchall()
    except ProgrammingError as e:
        if e.msg == 'Fail to get table info, error: Table does not exist':
            # only log the sql; the exception details are not needed
            logger.warning('Table does not exist, sql [{}]'.format(sql))
            return []
        raise e
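With the type override in place, fetchall() dispatches the ts column through _crow_bigint_to_python (per the CONVERT_FUNC_BLOCK table above), so each row comes back with ts as a plain int in milliseconds. A usage sketch, assuming the cursor is obtained from TDHelper().cursor() as described above and reusing the hypothetical ms_to_str helper from the earlier sketch:

helper = TDHelper()
with helper.cursor() as c:
    rows = test(c, 'select ts,val from table_1 ORDER BY ts desc limit 1')

for ts_ms, val in rows:
    # ts_ms is now a plain int (epoch milliseconds); no datetime object is built
    print(ms_to_str(ts_ms), val)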