Open Source, Open Future!
  menu
118 文章
ღゝ◡╹)ノ❤️

第3篇:特征工程 - 从原始价格到39技术指标

上一篇: 第2篇:数据获取与清洗

一、写在前面

上一篇解决了数据获取的问题,拿到了沪深300 + 中证500共800只股票、10年历史的干净行情数据,共158万条记录,存在 Parquet 格式文件里。

数据有了,问题来了:机器学习模型不能直接从"1829元"这个价格数字学到规律。
10元的股票涨1元是10%收益,1000元的股票涨1元只有0.1%收益——绝对价格没意义。

所以这篇要做的事情:把原始价格转换成机器学习模型能理解的"特征"。

这篇是整个系列里我花时间最多的一篇。业界有句话说得好:"数据和特征决定了机器学习的上限,而模型和算法只是逼近这个上限而已"。

先交代下数据规模:

来源:     data/stock_data.parquet
股票数:   797只(沪深300 + 中证500)
总行数:   1,589,404 条
日期范围: 2015-01-05 ~ 2024-12-31

二、什么是特征工程

先说说什么是特征。

我们拿到的原始数据类似这样:

日期: 2022-01-05
开盘价: 1850.08元
收盘价: 1829.08元
最高价: 1870.08元
最低价: 1823.08元
成交量: 28396

但机器学习模型不能直接从"1829元"这个数字学到规律。

所以要把原始价格转换成有意义的"特征":

return_1d: -1.47%(今天比昨天跌了1.47%)
ma_20: 1800元(20日均线)
close_to_ma_20: 1.6%(当前价格比均线高1.6%)
rsi: 65(相对强弱指标,超过70就超买)
macd: 0.5(趋势指标,大于0当前处于多头趋势,看涨)

这些才是模型能理解的信息。

用人话解释

特征工程就是把原始数据"翻译"成模型能懂的语言。

类比一下:

  • 做菜时,不是直接把生肉扔锅里,而是要切片、腌制、调味
  • 写代码时,不是直接把需求扔给编译器,而是要拆解成函数、类、模块

特征工程就是这个"预处理"的过程。

三、7大类特征详解

我把特征分成了7大类,每类都有不同的作用。

3.1 收益率特征 - 最基础但最重要

这是最基础的特征,也是我最先实现的。

def add_returns(self, df, periods=[1, 5, 10, 20]):
    """添加收益率特征"""
    df = df.copy()

    for period in periods:
        df[f'return_{period}d'] = df.groupby('symbol')['close'].pct_change(period)
        self.feature_names.append(f'return_{period}d')

    return df

原理

pct_change(n) 计算的是:

return_n = (今天价格 / n天前价格) - 1

比如:

  • 今天100元,昨天95元
  • return_1d = 100/95 - 1 = 5.26%

为什么有用

绝对价格意义不大,涨跌幅才反映趋势。

举个例子:茅台1000元涨到1100元(涨10%)和五粮液100元涨到110元(涨10%),虽然绝对涨幅差了10倍,但相对涨幅是一样的。模型更容易从相对涨幅中学到规律。

为什么要多个周期

不同周期反映不同时间尺度的趋势:

  • return_1d:短期波动
  • return_5d:一周趋势
  • return_10d:两周趋势
  • return_20d:一个月趋势

有的股票短期涨长期跌,有的短期跌长期涨,需要综合判断。

关键点:groupby('symbol')

这行代码非常重要:

df.groupby('symbol')['close'].pct_change(period)

为什么要 groupby?

因为数据是多只股票混在一起的:

茅台 1月1日  1000元
茅台 1月2日  1050元
五粮液 1月1日  200元  <- 如果不groupby,这里会用茅台1月2日的数据计算!

加上groupby之后,每只股票单独计算,不会串。

这是我最开始踩的坑,后面"踩过的坑"章节会详细说。

3.2 移动平均特征 - 判断趋势的利器

均线是技术分析里最常用的指标。

def add_moving_average(self, df, windows=[5, 10, 20, 60]):
    """添加移动平均线特征"""
    df = df.copy()

    for window in windows:
        # 简单移动平均
        df[f'ma_{window}'] = df.groupby('symbol')['close'].transform(
            lambda x: x.rolling(window).mean()
        )

        # 当前价格相对MA的位置
        df[f'close_to_ma_{window}'] = df['close'] / df[f'ma_{window}'] - 1

        self.feature_names.extend([f'ma_{window}', f'close_to_ma_{window}'])

    return df

原理

移动平均就是过去N天价格的平均值:

MA_20 = (过去20天价格之和) / 20

为什么有用

均线能消除短期噪声,看清趋势:

原始价格: 100, 102, 98, 103, 97, 105, 99, ...  <- 波动很大
MA_5:     100, 100, 100.6, 100.8, 101, ...   <- 平滑很多

而且均线有个重要性质:

  • 当前价格 > 均线 → 上涨趋势
  • 当前价格 < 均线 → 下跌趋势

close_to_ma的作用

不仅要看均线本身,还要看当前价格离均线有多远:

close_to_ma_20 = df['close'] / df['ma_20'] - 1

比如:

  • 当前价100元,MA_20是95元
  • close_to_ma_20 = 100/95 - 1 = 5.26%

这个5.26%说明当前价格比均线高,而且高出5%。如果这个值太大(比如超过10%),可能是超买,要警惕回调。

为什么选这4个窗口

  • MA_5:一周均线,反映短期趋势
  • MA_10:两周均线
  • MA_20:一个月均线,最常用
  • MA_60:三个月均线,反映长期趋势

这4个窗口覆盖了短中长期,是技术分析的标配。

3.3 波动率特征 - 衡量风险

波动率衡量价格波动的剧烈程度,这是风险的直接体现。

def add_volatility(self, df, windows=[5, 20, 60]):
    """添加波动率特征"""
    df = df.copy()

    for window in windows:
        # 收益率波动率
        df[f'volatility_{window}d'] = df.groupby('symbol')['close'].transform(
            lambda x: x.pct_change().rolling(window).std()
        )

        # 价格范围波动率(ATR简化版)
        df['hl_range'] = (df['high'] - df['low']) / df['close']
        df[f'atr_{window}d'] = df.groupby('symbol')['hl_range'].transform(
            lambda x: x.rolling(window).mean()
        )

        self.feature_names.extend([f'volatility_{window}d', f'atr_{window}d'])

    df = df.drop('hl_range', axis=1)

    return df

原理

波动率就是收益率的标准差:

volatility_20 = std(过去20天的收益率)

标准差越大,价格波动越剧烈。

为什么有用

波动率反映风险:

  • 波动率高:风险大,但可能收益也大
  • 波动率低:稳定,但收益可能也小

而且波动率本身有周期性。高波动后往往接低波动,低波动后可能突然爆发高波动。这是可以预测的规律。

ATR是什么

ATR(Average True Range)是另一种衡量波动的方式:

hl_range = (high - low) / close
atr = mean(hl_range)

它看的是每天最高价和最低价的差距。这个比纯粹的收益率波动更能反映日内波动。

比如某只股票:

  • 开盘100元,收盘100元(收益率0%)
  • 但日内最高涨到110元,最低跌到90元

收益率波动看不出来,但ATR能捕捉到这种日内波动。

3.4 动量指标 - 判断超买超卖

动量指标用来判断趋势强弱和反转点。

RSI(相对强弱指标)

# RSI计算
df['price_change'] = df.groupby('symbol')['close'].diff()
df['gain'] = df['price_change'].apply(lambda x: x if x > 0 else 0)
df['loss'] = df['price_change'].apply(lambda x: abs(x) if x < 0 else 0)

df['avg_gain'] = df.groupby('symbol')['gain'].transform(
    lambda x: x.rolling(14).mean()
)
df['avg_loss'] = df.groupby('symbol')['loss'].transform(
    lambda x: x.rolling(14).mean()
)

df['rs'] = df['avg_gain'] / (df['avg_loss'] + 1e-10)
df['rsi'] = 100 - (100 / (1 + df['rs']))

RSI原理

RSI的计算逻辑:

1. 把过去14天分成"涨的天"和"跌的天"
2. 算平均涨幅和平均跌幅
3. RS = 平均涨幅 / 平均跌幅
4. RSI = 100 - (100 / (1 + RS))

RSI的含义

RSI取值范围是0-100:

  • RSI > 70:超买,可能要跌
  • RSI < 30:超卖,可能要涨
  • RSI = 50:涨跌平衡

比如RSI = 80,说明过去14天涨的力量远大于跌的力量,可能涨过头了,要警惕回调。

为什么加1e-10

df['rs'] = df['avg_gain'] / (df['avg_loss'] + 1e-10)

这是为了避免除以0。如果某只股票过去14天一直涨从来没跌,avg_loss就是0,直接除会报错。

加一个很小的数(1e-10)既避免了报错,又不影响计算结果。

MACD(指数平滑移动平均)

# MACD计算
df['ema_12'] = df.groupby('symbol')['close'].transform(
    lambda x: x.ewm(span=12, adjust=False).mean()
)
df['ema_26'] = df.groupby('symbol')['close'].transform(
    lambda x: x.ewm(span=26, adjust=False).mean()
)
df['macd'] = df['ema_12'] - df['ema_26']
df['macd_signal'] = df.groupby('symbol')['macd'].transform(
    lambda x: x.ewm(span=9, adjust=False).mean()
)
df['macd_hist'] = df['macd'] - df['macd_signal']

MACD原理

MACD比MA更复杂一点,用的是指数加权移动平均(EMA):

EMA_12 = 12日指数加权均线(近期权重更大)
EMA_26 = 26日指数加权均线
MACD = EMA_12 - EMA_26
Signal = MACD的9日均线
Hist = MACD - Signal

MACD的含义

  • MACD > 0:短期趋势强于长期,看涨
  • MACD < 0:短期趋势弱于长期,看跌
  • MACD上穿Signal(金叉):买入信号
  • MACD下穿Signal(死叉):卖出信号

为什么用EMA而不是MA

EMA给近期数据更大的权重,对价格变化更敏感。

比如:

  • MA_10:过去10天等权平均
  • EMA_10:最近的价格权重更大

这样能更快捕捉到趋势变化。

3.5 成交量特征 - 反映资金流向

成交量反映市场活跃度和资金流向。

def add_volume_features(self, df):
    """添加成交量特征"""
    df = df.copy()

    # 成交量移动平均
    for window in [5, 20]:
        df[f'volume_ma_{window}'] = df.groupby('symbol')['volume'].transform(
            lambda x: x.rolling(window).mean()
        )

        # 成交量相对均值
        df[f'volume_ratio_{window}'] = df['volume'] / (df[f'volume_ma_{window}'] + 1)

        self.feature_names.extend([f'volume_ma_{window}', f'volume_ratio_{window}'])

    # 成交额
    df['turnover'] = df['close'] * df['volume']
    df['turnover_ma_5'] = df.groupby('symbol')['turnover'].transform(
        lambda x: x.rolling(5).mean()
    )

    self.feature_names.extend(['turnover', 'turnover_ma_5'])

    return df

为什么要看成交量

价格涨跌要配合成交量看:

  • 放量上涨:多头力量强,趋势可靠
  • 缩量上涨:上涨乏力,可能是假突破
  • 放量下跌:空头力量强,要警惕
  • 缩量下跌:抛压不大,可能快止跌

volume_ratio的作用

volume_ratio_5 = volume / volume_ma_5

这个比值反映当前成交量相比平均水平:

  • volume_ratio > 2:放量(成交量是平均的2倍)
  • volume_ratio < 0.5:缩量

成交额 vs 成交量

成交量是股数,成交额是金额:

turnover = close * volume

为什么要看成交额?因为不同股票价格差异大:

  • 茅台1000元,成交10万手 = 10亿成交额
  • 五粮液100元,成交10万手 = 1亿成交额

成交额更能反映真实的资金量。

3.6 价格位置特征 - 判断高低位

价格位置特征判断当前是高位还是低位。

def add_price_position(self, df):
    """添加价格位置特征"""
    df = df.copy()

    # 20日最高/最低价
    df['high_20d'] = df.groupby('symbol')['high'].transform(
        lambda x: x.rolling(20).max()
    )
    df['low_20d'] = df.groupby('symbol')['low'].transform(
        lambda x: x.rolling(20).min()
    )

    # 当前价格在20日范围内的位置 (0-1)
    df['price_position'] = (df['close'] - df['low_20d']) / (
        df['high_20d'] - df['low_20d'] + 1e-10
    )

    self.feature_names.extend(['high_20d', 'low_20d', 'price_position'])

    return df

price_position的含义

price_position = (当前价 - 20日最低) / (20日最高 - 20日最低)

这个值在0-1之间:

  • price_position = 0:当前价格在20日最低点
  • price_position = 1:当前价格在20日最高点
  • price_position = 0.5:在中间位置

为什么有用

price_position能快速判断位置:

  • 0.8:高位,可能要回调

  • < 0.2:低位,可能要反弹
  • 0.4-0.6:中间位置,观望

3.7 滞后特征 - 显式传递历史信息

滞后特征让模型看到"历史"信息。

def add_lag_features(self, df, lags=[1, 2, 3, 5]):
    """添加滞后特征"""
    df = df.copy()

    for lag in lags:
        df[f'close_lag_{lag}'] = df.groupby('symbol')['close'].shift(lag)
        df[f'volume_lag_{lag}'] = df.groupby('symbol')['volume'].shift(lag)

        self.feature_names.extend([f'close_lag_{lag}', f'volume_lag_{lag}'])

    return df

shift的作用

shift(n)** 把数据向后移n位:**

原始:      [100, 105, 103, 108]
shift(1):  [NaN, 100, 105, 103]
shift(2):  [NaN, NaN, 100, 105]

这样模型在预测今天的时候,能看到昨天、前天的数据。

为什么不直接用日期

有人可能会问:模型不是自己能学到时间规律吗?为什么要手动加滞后特征?

原因是:时间序列数据有自相关性。今天的价格往往和昨天的价格强相关。如果不显式地把昨天的数据作为特征,模型很难学到这种规律。

四、构建目标变量

特征有了,还需要"标签"(要预测的目标)。

def add_target(self, df, periods=[1, 5]):
    """添加目标变量(未来收益率)"""
    df = df.copy()

    for period in periods:
        # 未来收益率
        df[f'target_return_{period}d'] = df.groupby('symbol')['close'].shift(-period) / df['close'] - 1

        # 未来方向(涨/跌)
        df[f'target_direction_{period}d'] = (df[f'target_return_{period}d'] > 0).astype(int)

    return df

关键:shift(-1)

注意这里是 shift(-period),负数!

df['target_return_1d'] = df.groupby('symbol')['close'].shift(-1) / df['close'] - 1

shift(-1)** 取的是"未来"的值:**

原始close:    [100, 105, 103, 108]
shift(-1):    [105, 103, 108, NaN]
target:       [5%, -1.9%, 4.85%, NaN]

这样:

  • 今天的特征 → 预测明天的收益率
  • 昨天的特征 → 预测今天的收益率(已知)

为什么要direction

除了预测具体的收益率(回归问题),还可以预测涨跌方向(分类问题):

target_direction_1d = (target_return_1d > 0).astype(int)
  • 涨:1
  • 跌:0

有时候方向准确率比绝对值更重要。涨1%还是涨5%不重要,重要的是知道它会涨。

五、完整流程

把所有特征构建步骤串起来:

def build_features(self, df, add_target=True):
    """构建所有特征"""
    print("开始构建特征...")
    self.feature_names = []

    df = self.add_returns(df)
    print("  ✓ 收益率特征")

    df = self.add_moving_average(df)
    print("  ✓ 移动平均特征")

    df = self.add_volatility(df)
    print("  ✓ 波动率特征")

    df = self.add_momentum_indicators(df)
    print("  ✓ 动量指标")

    df = self.add_volume_features(df)
    print("  ✓ 成交量特征")

    df = self.add_price_position(df)
    print("  ✓ 价格位置特征")

    df = self.add_lag_features(df)
    print("  ✓ 滞后特征")

    if add_target:
        df = self.add_target(df)
        print("  ✓ 目标变量")

    # 删除含有NaN的行
    before_len = len(df)
    df = df.dropna()
    after_len = len(df)

    print(f"\n特征构建完成:")
    print(f"  特征数量: {len(self.feature_names)}")
    print(f"  数据条数: {after_len} (清理前: {before_len})")

    return df

为什么要删除NaN

特征计算过程中会产生NaN:

  • rolling(20):前20天没有数据
  • shift(5):前5天没有数据
  • pct_change(10):前10天没有数据

这些NaN必须删除,否则模型训练会报错。

这也是为什么最终数据条数会减少。158万条原始数据,算完特征后约剩 148 万条左右(前 60 个交易日因滚动窗口不足被删除)。

六、运行效果

用刚才构建的特征引擎跑一下:

engine = FeatureEngine()
df_features = engine.build_features(df, add_target=True)

输出:

开始构建特征...
  ✓ 收益率特征
  ✓ 移动平均特征
  ✓ 波动率特征
  ✓ 动量指标
  ✓ 成交量特征
  ✓ 价格位置特征
  ✓ 滞后特征
  ✓ 目标变量

特征构建完成:
  特征数量: 39
  数据条数: 1,537,663 (清理前: 1,589,404)

最终得到39个特征:

收益率(4个):   return_1d, return_5d, return_10d, return_20d
移动平均(8个): ma_5, ma_10, ma_20, ma_60,
                close_to_ma_5, close_to_ma_10, close_to_ma_20, close_to_ma_60
波动率(6个):   volatility_5d, volatility_20d, volatility_60d,
                atr_5d, atr_20d, atr_60d
动量指标(4个): rsi, macd, macd_signal, macd_hist
成交量(6个):   volume_ma_5, volume_ma_20, volume_ratio_5, volume_ratio_20,
                turnover, turnover_ma_5
价格位置(3个): high_20d, low_20d, price_position
滞后特征(8个): close_lag_1/2/3/5, volume_lag_1/2/3/5

七、踩过的坑

坑1:忘记groupby

一开始我没加groupby:

# 错误!
df['ma_20'] = df['close'].rolling(20).mean()

结果发现茅台的均线用到了五粮液的数据。因为数据是这样排列的:

茅台 2022-01-01
茅台 2022-01-02
...
茅台 2022-01-20
五粮液 2022-01-04  <- 这里的均线用了茅台的数据!

加上groupby就解决了:

# 正确
df['ma_20'] = df.groupby('symbol')['close'].transform(
    lambda x: x.rolling(20).mean()
)

7.1 坑1:忘记groupby

最开始我写的代码是这样的:

错误做法

df['ma_20'] = df['close'].rolling(20).mean()

问题:这会把所有股票的数据混在一起算均线。

比如数据是这样排列的:

茅台 1月1日  1000元
茅台 1月2日  1050元
...
茅台 1月20日 1100元
五粮液 1月1日  200元  <- 这里的MA_20会包含茅台的数据!

结果就是五粮液的均线里混入了茅台的价格,完全错了。

正确做法

df['ma_20'] = df.groupby('symbol')['close'].transform(
    lambda x: x.rolling(20).mean()
)

加上 groupby('symbol') 后,每只股票独立计算,不会串。

这个坑我调试了好久才发现,因为代码不报错,但结果就是不对。

7.2 坑2:数据泄露

这个是最容易犯的错误。

错误做法**:

df['target'] = df['close'].pct_change(1)

这算的是今天相比昨天的涨跌,是"过去"的信息。用过去预测过去,模型当然准确率高,但没用。

正确做法**:

df['target'] = df.groupby('symbol')['close'].shift(-1) / df['close'] - 1

shift(-1)** 取的是明天的价格,这才是"未来"的信息。

我最开始就犯了这个错,模型训练出来准确率90%,高兴了半天,后来发现是数据泄露。真实预测时准确率只有52%。

7.3 坑3:NaN没处理干净

一开始我只在最后dropna,但中间有些计算会因为NaN报错:

df['rs'] = df['avg_gain'] / df['avg_loss']  # 如果avg_loss是0或NaN,会出问题

后来改成:

df['rs'] = df['avg_gain'] / (df['avg_loss'] + 1e-10)  # 避免除以0
df = df.dropna()  # 最后统一删除NaN行

策略是:

  • 中间计算时用小数(1e-10)避免除零
  • 最后统一 dropna 删除所有包含 NaN 的行

八、特征有效性验证

并不是所有特征都有用,可以看看特征的IC值(信息系数):

# 计算每个特征和目标的相关性
ic = df_features[feature_names].corrwith(df_features['target_return_1d'])
ic = ic.sort_values(ascending=False)

print("特征重要性(IC值):")
print(ic.head(10))

输出:

return_1d         0.082
macd             0.051
close_to_ma_5    0.043
rsi              0.038
volume_ratio_5   0.031
...

IC值大于0.03的特征一般认为是有效的。

800只股票、10年数据,这里的 IC 是基于 148 万样本算出来的,统计置信度很高——不用担心小样本偶然性。

九、保存特征数据

# 保存特征数据(继续用 Parquet)
df_features.to_parquet('data/features.parquet', index=False, compression='snappy')
print(f"特征数据已保存: data/features.parquet")
print(f"  行数: {len(df_features):,}")
print(f"  列数: {len(df_features.columns)}")

输出:

特征数据已保存: data/features.parquet
  行数: 1,537,663
  列数: 53

十、总结

特征工程是整个系统最核心的部分之一。这篇花了很长篇幅,主要内容:

数据规模

  • 输入:797只股票,158万条原始数据
  • 输出:154万条特征数据(丢弃了每只股票最早60个交易日)
  • 39个特征 + 4个目标变量

7大类特征

  • 收益率:反映涨跌幅(4个)
  • 移动平均:反映趋势(8个)
  • 波动率:反映风险(6个)
  • 动量指标:判断超买超卖(4个)
  • 成交量:反映资金流向(6个)
  • 价格位置:判断高低位(3个)
  • 滞后特征:显式传递时序信息(8个)

关键技术点

  • groupby('symbol')** 分组计算,防止不同股票数据串行
  • shift(-1)** 取未来值作为标签,方向不能搞反
  • Parquet 保存,读取比 CSV 快很多

踩过的坑

  • 忘记 groupby:茅台的均线混入了五粮液的数据
  • 数据泄露:target 用了过去收益率,导致虚假高准确率
  • NaN 处理:中间计算要先防除零(+1e-10),最后再统一 dropna

下一篇会用这些特征训练预测模型,看看能不能预测股票涨跌。

下一篇: 第4篇:预测模型