
02 | Time Series: Predicting Cryptocurrency Returns with LightGBM

In the previous article, we learned how to work with real minute-level market data for BTC and ETH: loading it, plotting candlestick charts, filling missing values, and running correlation analysis.

Picking up where we left off, in this article we use a LightGBM model to predict real market returns. Note that the code comes from the third-place solution of the Kaggle competition; I have annotated and explained parts of it.

Data Processing

First, the imports, file paths, and a few basic settings. Note that `lag` denotes a lookback window: for example, a lag of 60 means a moving average over the past 60 minutes. The windows used in this code are [60, 300, 900].

import os
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import lightgbm as lgb
# import gresearch_crypto
import time
import datetime
import pickle
import gc
from tqdm import tqdm


n_fold = 7
seed0 = 8586
use_supple_for_train = True

# If True, the period used to evaluate Public LB will not be used for training.
# Set to False on final submission.
not_use_overlap_to_train = False

TRAIN_CSV = '量化数据/train.csv'
SUPPLE_TRAIN_CSV = '量化数据/supplemental_train.csv'
ASSET_DETAILS_CSV = '量化数据/asset_details.csv'

pd.set_option('display.max_rows', 6)
pd.set_option('display.max_columns', 350)
lags = [60,300,900]

The LightGBM model parameters. For what each parameter means, refer to the material linked earlier.

params = {
    'early_stopping_rounds': 50,
    'objective': 'regression',
    'metric': 'rmse',
#     'metric': 'None',
    'boosting_type': 'gbdt',
    'max_depth': 5,
    'verbose': -1,
    'max_bin':600,
    'min_data_in_leaf':50,
    'learning_rate': 0.03,
    'subsample': 0.7,
    'subsample_freq': 1,
    'feature_fraction': 1,
    'lambda_l1': 0.5,
    'lambda_l2': 2,
    'seed':seed0,
    'feature_fraction_seed': seed0,
    'bagging_fraction_seed': seed0,
    'drop_seed': seed0,
    'data_random_seed': seed0,
    'extra_trees': True,
    'extra_seed': seed0,
    'zero_as_missing': True,
    "first_metric_only": True
         }

Next, a memory-optimization helper. This is a very handy general-purpose function: Kaggle competitions limit both memory and runtime, and running a DataFrame through it can cut memory usage substantially, so it is worth keeping around.

def reduce_mem_usage(df):
    """ iterate through all the columns of a dataframe and modify the data type
        to reduce memory usage.        
    """
    start_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))
    for col in df.columns:
        col_type = df[col].dtype
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        else:
            df[col] = df[col].astype('category')

    end_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
    return df

Next we use pandas to read three files: train.csv, supplemental_train.csv, and asset_details.csv. The first two share the same schema and differ only in the time period they cover, while asset_details.csv holds each asset's name and weight. The structure of these tables was covered in the previous article, and you can load and inspect them yourself.

About the Target

The feature columns should all look familiar by now, but how is the Target computed? R(t) is the 15-minute log return. As the previous article's analysis showed, the different assets are in fact correlated, and the term subtracted in Target(t) is there precisely to strip out that common market component. M(t) is the weighted average of the returns of the 14 assets, using each asset's weight, and the angle brackets ⟨·⟩ in the beta formula denote a rolling mean over a 3750-minute window. The Target column is already precomputed in the data, but it is still worth understanding how it is derived.
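For reference, here is a reconstruction of the target from the competition's description (the exact index offsets are my reading of it, so treat this as a sketch rather than the official definition):

R^{a}(t) = \log\frac{P^{a}(t+16)}{P^{a}(t+1)}, \qquad
M(t) = \frac{\sum_{a} w_{a}\, R^{a}(t)}{\sum_{a} w_{a}}, \qquad
\beta^{a}(t) = \frac{\langle M \cdot R^{a} \rangle}{\langle M^{2} \rangle}, \qquad
\mathrm{Target}^{a}(t) = R^{a}(t) - \beta^{a}(t)\, M(t)

where P^a is the close price of asset a, w_a its weight from asset_details.csv, and ⟨·⟩ a 3750-minute rolling mean.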

The code below reads the datasets, concatenates the train and supplemental files with pd.concat, and then runs the reduce_mem_usage() function defined above. The effect is clearly visible: memory usage drops by more than 50%.

df_train = pd.read_csv(TRAIN_CSV, usecols=['timestamp','Asset_ID', 'Close', 'Target'])
if use_supple_for_train:
    df_supple = pd.read_csv(SUPPLE_TRAIN_CSV, usecols=['timestamp','Asset_ID', 'Close', 'Target'])
#     display(df_supple)
    df_train = pd.concat([df_train, df_supple])
    del df_supple
df_train = reduce_mem_usage(df_train)
# df_train
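The asset list is also needed later (the training loop iterates over df_asset_details), but that read is not visible in the snippet above. A minimal reconstruction of what it presumably looks like:

# Hypothetical reconstruction: load the asset names and weights used later as df_asset_details
df_asset_details = pd.read_csv(ASSET_DETAILS_CSV)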

In the raw data the 14 assets are interleaved: one row may be BTC at time a and the next ETH at time b. We want to reshape this so that each row (one timestamp) carries the data of all 14 assets. The code below creates an empty train_merged and then, with merge(), folds in each asset's ['timestamp', 'Close', 'Target'] columns one at a time. The suffixes argument keeps the columns of different assets apart. If merge() is unfamiliar, see the pandas documentation.
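As a quick illustration of how suffixes behaves (a toy example, not from the original notebook):

import pandas as pd

a = pd.DataFrame({'timestamp': [1, 2], 'Close': [10.0, 11.0]})
b = pd.DataFrame({'timestamp': [1, 2], 'Close': [20.0, 21.0]})
# Overlapping column names get the suffixes: the left keeps '', the right gets '_1'
merged = a.merge(b, on='timestamp', how='outer', suffixes=['', '_1'])
print(merged.columns.tolist())  # ['timestamp', 'Close', 'Close_1']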

%%time
train_merged = pd.DataFrame()
train_merged[df_train.columns] = 0
for id in tqdm( range(14) ):
    train_merged = train_merged.merge(df_train.loc[df_train["Asset_ID"] == id, ['timestamp', 'Close','Target']].copy(), on="timestamp", how='outer',suffixes=['', "_"+str(id)])
        
train_merged = train_merged.drop(df_train.columns.drop("timestamp"), axis=1)
train_merged = train_merged.sort_values('timestamp', ascending=True)
display(train_merged.head())

After sorting by timestamp in ascending order, we get data in the following shape: each row (one time point) holds the close price and Target of all 14 assets.

As you can see, though, some assets are missing at certain timestamps. This is very common, and we already quantified it in the previous article. We fill these gaps with fillna(); note that the limit argument caps how many consecutive values get filled, so some NaNs will remain. In practice, filling too aggressively actually hurts prediction quality.
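A small illustration of what limit does (toy data, not from the notebook):

import pandas as pd

s = pd.Series([1.0, None, None, None, 5.0])
# Forward fill, but at most 2 consecutive gaps get filled; the third stays NaN
print(s.fillna(method='ffill', limit=2).tolist())  # [1.0, 1.0, 1.0, nan, 5.0]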

# forward fill
# Set an upper limit on the number of fills, since there may be long term gaps.
for id in range(14):
#     print(id, train_merged[f'Close_{id}'].isnull().sum())   # Number of missing before forward fill
    train_merged[f'Close_{id}'] = train_merged[f'Close_{id}'].fillna(method='ffill', limit=100)
#     print(id, train_merged[f'Close_{id}'].isnull().sum())   # Number of missing after forward fill

Feature Engineering

With deep learning, tuning usually revolves around improving the model architecture; with LightGBM, the heart of the work is feature engineering. Below is the core feature-engineering code of this solution.

def get_features(df, train=True):   
    if train == True:
        totimestamp = lambda s: np.int32(time.mktime(datetime.datetime.strptime(s, "%d/%m/%Y").timetuple()))
        valid_window = [totimestamp("12/03/2021")]
#         valid_window = [totimestamp("15/08/2021")]  # for validation
        df['train_flg'] = np.where(df['timestamp']>=valid_window[0], 0,1)

        supple_start_window = [totimestamp("22/09/2021")]
        if use_supple_for_train:
            df['train_flg'] = np.where(df['timestamp']>=supple_start_window[0], 1 ,df['train_flg']  )

   
    # F1: log of the close price relative to its rolling mean over the past `lag` minutes
    # F2: log-return over the past `lag` minutes
    for id in range(14):
        for lag in lags:
            df[f'log_close/mean_{lag}_id{id}'] = np.log( np.array(df[f'Close_{id}']) /  np.roll(np.append(np.convolve( np.array(df[f'Close_{id}']), np.ones(lag)/lag, mode="valid"), np.ones(lag-1)), lag-1)  )
            df[f'log_return_{lag}_id{id}']     = np.log( np.array(df[f'Close_{id}']) /  np.roll(np.array(df[f'Close_{id}']), lag)  )
    # Cross-asset mean of F1/F2 for each lag, and each asset's deviation (residual) from that mean
    for lag in lags:
        df[f'mean_close/mean_{lag}'] =  np.mean(df.iloc[:,df.columns.str.startswith(f'log_close/mean_{lag}_id')], axis=1)
        df[f'mean_log_returns_{lag}'] = np.mean(df.iloc[:,df.columns.str.startswith(f'log_return_{lag}_id')] ,    axis=1)
        for id in range(14):
            df[f'log_close/mean_{lag}-mean_close/mean_{lag}_id{id}'] = np.array( df[f'log_close/mean_{lag}_id{id}']) - np.array( df[f'mean_close/mean_{lag}']  )
            df[f'log_return_{lag}-mean_log_returns_{lag}_id{id}']    = np.array( df[f'log_return_{lag}_id{id}'])     - np.array( df[f'mean_log_returns_{lag}'] )

    if train == True:
        for id in range(14):
            df = df.drop([f'Close_{id}'], axis=1)
        oldest_use_window = [totimestamp("12/01/2019")]
        df = df[  df['timestamp'] >= oldest_use_window[0]   ]

    return df

The solution constructs the following features:

  • F1: the ratio of the close price to its average close over the past N minutes
  • F2: the ratio of the close price to the close N minutes earlier
  • F1_mean, F2_mean: the average of F1 and F2 across all assets (in the code this is a simple mean over the 14 assets)
  • F1_residual, F2_residual: F1 and F2 minus F1_mean and F2_mean, respectively
np.roll(np.append(np.convolve( np.array(df[f'Close_{id}']), np.ones(lag)/lag, mode="valid"), np.ones(lag-1)), lag-1)

This expression is a bit convoluted, so let's unpack it. np.convolve() with mode="valid" computes the lag-period moving average; obviously the first lag-1 points do not have a full window, so np.append() pads lag-1 ones onto the end of the result, and np.roll() then shifts the whole array right by lag-1 positions so that the padding lands at the front. Dividing np.array(df[f'Close_{id}']) by this result gives the ratio of the current close to its average over the past lag minutes. Once this clicks, the rest of the features are easy to follow.
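A tiny worked example of the alignment trick (toy numbers, not from the notebook):

import numpy as np

close = np.array([1., 2., 3., 4., 5.])
lag = 3
# Rolling means of each full 3-point window: [2., 3., 4.]
ma = np.convolve(close, np.ones(lag) / lag, mode="valid")
# Pad with lag-1 ones, then roll right by lag-1 so the padding covers the first slots
ma_aligned = np.roll(np.append(ma, np.ones(lag - 1)), lag - 1)
print(ma_aligned)                  # [1. 1. 2. 3. 4.]
print(np.log(close / ma_aligned))  # the F1 feature for this toy series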

Generating the Features

We pass the merged train_merged through the get_features() function defined above and store the result as feat.

%%time
feat = get_features(train_merged)
feat

The features used for training are exactly the ones produced by get_features(); everything else (Target, timestamp, and so on) is excluded, which is what the snippet below marks.

One more point that really matters: we usually think of prediction as "use the previous time step to predict the current one", but in this solution the "previous step" is actually a set of trailing windows (60, 300, 900 minutes) of F1, F2, F1_mean, F2_mean, F1_residual, and F2_residual, and it spans multiple assets. In other words, the past features of many assets are used to predict the target of the one asset being modeled.

# define features for LGBM
not_use_features_train = ['timestamp', 'train_flg']
for id in range(14):
    not_use_features_train.append(f'Target_{id}')

features = feat.columns 
features = features.drop(not_use_features_train)
features = list(features)
# display(features)  
print(len(features))

del train_merged
del df_train
gc.collect()

Model Training

Below are the evaluation functions and a plotting helper for feature importance. Knowing how to call them is enough; the internals are not essential here.

# define the evaluation metric
def correlation(a, train_data):
    
    b = train_data.get_label()
    
    a = np.ravel(a)
    b = np.ravel(b)

    len_data = len(a)
    mean_a = np.sum(a) / len_data
    mean_b = np.sum(b) / len_data
    var_a = np.sum(np.square(a - mean_a)) / len_data
    var_b = np.sum(np.square(b - mean_b)) / len_data

    cov = np.sum((a * b))/len_data - mean_a*mean_b
    corr = cov / np.sqrt(var_a * var_b)

    return 'corr', corr, True

# For CV score calculation
def corr_score(pred, valid):
    len_data = len(pred)
    mean_pred = np.sum(pred) / len_data
    mean_valid = np.sum(valid) / len_data
    var_pred = np.sum(np.square(pred - mean_pred)) / len_data
    var_valid = np.sum(np.square(valid - mean_valid)) / len_data

    cov = np.sum((pred * valid))/len_data - mean_pred*mean_valid
    corr = cov / np.sqrt(var_pred * var_valid)

    return corr

# For CV score calculation
def wcorr_score(pred, valid, weight):
    len_data = len(pred)
    sum_w = np.sum(weight)
    mean_pred = np.sum(pred * weight) / sum_w
    mean_valid = np.sum(valid * weight) / sum_w
    var_pred = np.sum(weight * np.square(pred - mean_pred)) / sum_w
    var_valid = np.sum(weight * np.square(valid - mean_valid)) / sum_w

    cov = np.sum((pred * valid * weight)) / sum_w - mean_pred*mean_valid
    corr = cov / np.sqrt(var_pred * var_valid)

    return corr

# from: https://blog.amedama.jp/entry/lightgbm-cv-feature-importance
# (used in nyanp's Optiver solution)
def plot_importance(importances, features_names = features, PLOT_TOP_N = 20, figsize=(10, 10)):
    importance_df = pd.DataFrame(data=importances, columns=features)
    sorted_indices = importance_df.median(axis=0).sort_values(ascending=False).index
    sorted_importance_df = importance_df.loc[:, sorted_indices]
    plot_cols = sorted_importance_df.columns[:PLOT_TOP_N]
    _, ax = plt.subplots(figsize=figsize)
    ax.grid()
    ax.set_xscale('log')
    ax.set_ylabel('Feature')
    ax.set_xlabel('Importance')
    sns.boxplot(data=sorted_importance_df[plot_cols],
                orient='h',
                ax=ax)
    plt.show()
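As a sanity check, corr_score is simply the Pearson correlation between predictions and actual targets. A toy example (illustrative numbers only):

import numpy as np

pred  = np.array([0.10, 0.20, 0.30])
valid = np.array([0.10, 0.25, 0.28])
print(corr_score(pred, valid))  # about 0.93, i.e. strongly correlated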

Next is the data-splitting function for 7-fold cross-validation. Most of it is self-explanatory, but one thing worth highlighting is embargo = 3750: because the target computation uses a 3750-minute rolling average, the train and test splits must be separated by at least 3750 minutes to avoid leaking future information.

# from: https://www.kaggle.com/code/nrcjea001/lgbm-embargocv-weightedpearson-lagtarget/
def get_time_series_cross_val_splits(data, cv = n_fold, embargo = 3750):
    all_train_timestamps = data['timestamp'].unique()
    len_split = len(all_train_timestamps) // cv
    test_splits = [all_train_timestamps[i * len_split:(i + 1) * len_split] for i in range(cv)]
    # fix the last test split to have all the last timestamps, in case the number of timestamps wasn't divisible by cv
    rem = len(all_train_timestamps) - len_split*cv
    if rem>0:
        test_splits[-1] = np.append(test_splits[-1], all_train_timestamps[-rem:])

    train_splits = []
    for test_split in test_splits:
        test_split_max = int(np.max(test_split))
        test_split_min = int(np.min(test_split))
        # get all of the timestamps that aren't in the test split
        train_split_not_embargoed = [e for e in all_train_timestamps if not (test_split_min <= int(e) <= test_split_max)]
        # embargo the train split so we have no leakage. Note timestamps are expressed in seconds, so multiply by 60
        embargo_sec = 60*embargo
        train_split = [e for e in train_split_not_embargoed if
                       abs(int(e) - test_split_max) > embargo_sec and abs(int(e) - test_split_min) > embargo_sec]
        train_splits.append(train_split)

    # convenient way to iterate over train and test splits
    train_test_zip = zip(train_splits, test_splits)
    return train_test_zip
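A quick way to exercise the splitter on the prepared frame (illustrative only, not part of the original notebook):

# Each fold yields (train timestamps, test timestamps); the embargo keeps them 3750 minutes apart
splits = get_time_series_cross_val_splits(feat, cv=n_fold, embargo=3750)
for fold, (train_ts, test_ts) in enumerate(splits):
    print(f"fold {fold}: {len(train_ts)} train timestamps, {len(test_ts)} test timestamps")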

Hang in there, the training function is next. As you can see, once the features are defined, using LightGBM itself is simple: get_Xy_and_model_for_asset() ultimately returns the true targets and the corresponding out-of-fold predictions. One reminder: train_split and test_split hold timestamps, so combining df_proc.loc[] with the features list defined earlier is all it takes to slice the dataset.

def get_Xy_and_model_for_asset(df_proc, asset_id):
    # Keep only rows where this asset's Target is not NaN (a value equals itself only when it is not NaN)
    df_proc = df_proc.loc[  (df_proc[f'Target_{asset_id}'] == df_proc[f'Target_{asset_id}'])  ]
    if not_use_overlap_to_train:
        df_proc = df_proc.loc[  (df_proc['train_flg'] == 1)  ]
    
# EmbargoCV
    train_test_zip = get_time_series_cross_val_splits(df_proc, cv = n_fold, embargo = 3750)
    print("entering time series cross validation loop")
    importances = []
    oof_pred = []
    oof_valid = []
    
    for split, train_test_split in enumerate(train_test_zip):
        gc.collect()
        
        print(f"doing split {split+1} out of {n_fold}")
        train_split, test_split = train_test_split
        train_split_index = df_proc['timestamp'].isin(train_split)
        test_split_index = df_proc['timestamp'].isin(test_split)
    
        train_dataset = lgb.Dataset(df_proc.loc[train_split_index, features],
                                    df_proc.loc[train_split_index, f'Target_{asset_id}'].values, 
                                    feature_name = features, 
                                   )
        val_dataset = lgb.Dataset(df_proc.loc[test_split_index, features], 
                                  df_proc.loc[test_split_index, f'Target_{asset_id}'].values, 
                                  feature_name = features, 
                                 )

        print(f"number of train data: {len(df_proc.loc[train_split_index])}")
        print(f"number of val data:   {len(df_proc.loc[test_split_index])}")

        model = lgb.train(params = params,
                          train_set = train_dataset, 
                          valid_sets=[train_dataset, val_dataset],
                          valid_names=['tr', 'vl'],
                          num_boost_round = 5000,
                          verbose_eval = 100,     
                          feval = correlation,
                         )
        importances.append(model.feature_importance(importance_type='gain'))
        
        file = f'trained_model_id{asset_id}_fold{split}.pkl'
        pickle.dump(model, open(file, 'wb'))
        print(f"Trained model was saved to 'trained_model_id{asset_id}_fold{split}.pkl'")
        print("")
            
        oof_pred += list(  model.predict(df_proc.loc[test_split_index, features])        )
        oof_valid += list(   df_proc.loc[test_split_index, f'Target_{asset_id}'].values    )
    
    
    plot_importance(np.array(importances),features, PLOT_TOP_N = 20, figsize=(10, 5))

    return oof_pred, oof_valid

Congratulations: once you run the code below, training should be underway.

oof = [ [] for id in range(14)   ]

all_oof_pred = []
all_oof_valid = []
all_oof_weight = []

for asset_id, asset_name in zip(df_asset_details['Asset_ID'], df_asset_details['Asset_Name']):
    print(f"Training model for {asset_name:<16} (ID={asset_id:<2})")
    
    oof_pred, oof_valid = get_Xy_and_model_for_asset(feat, asset_id)
    
    weight_temp = float( df_asset_details.loc[  df_asset_details['Asset_ID'] == asset_id  , 'Weight'   ]  )
    
    all_oof_pred += oof_pred
    all_oof_valid += oof_valid
    all_oof_weight += [weight_temp] * len(oof_pred)
    
    oof[asset_id] = corr_score(     np.array(oof_pred)   ,    np.array(oof_valid)    )
    
    print(f'OOF corr score of {asset_name} (ID={asset_id}) is {oof[asset_id]:.5f}. (Weight: {float(weight_temp):.5f})')
    print('')
    print('')

You should see per-fold training logs and, for each asset, its out-of-fold correlation score.

After training you will have a collection of saved model files; load them with pickle.load() and call model.predict() to use your trained models. Go give it a try!
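A minimal sketch of reloading one fold's model and predicting (new_data is a hypothetical feature frame built the same way as feat, e.g. via get_features(..., train=False)):

import pickle

# Load the model trained for asset 1, fold 0 (file name pattern from the training loop above)
with open('trained_model_id1_fold0.pkl', 'rb') as f:
    model = pickle.load(f)

# new_data must contain the same `features` columns used at training time (hypothetical frame)
preds = model.predict(new_data[features])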
