In quantitative finance, the greatest danger is not missing data but using *wrong* data. The source code in this project implements multiple cleaning mechanisms tailored to the Taiwan stock market, so that the resulting weekly/monthly/yearly K-lines (candles) reflect genuine market momentum.
Below are the four core cleaning functions this code performs:
1. Physical sanity checks: rejecting "impossible quotes"
Raw feeds often contain anomalies caused by transmission errors, so the first step is a physical-plausibility filter:
- Four-price consistency: the high must be no lower than the open, close, and low; the low must be no higher than the open, close, and high.
- Zero/negative filtering: records with a price ≤ 0 are dropped automatically.
- Timezone normalization: corrects the timezone offsets common with yfinance, so that, for example, Sep 1 data is not mistakenly counted in the August monthly candle.
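As a quick illustration, here is a minimal, self-contained sketch of the four-price and positivity checks (the `physical_filter` helper and the sample DataFrame are made up for this example; only the column names match the script later in this post):

```python
import pandas as pd

def physical_filter(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows that violate basic OHLC physics."""
    # Prices must be strictly positive
    df = df[(df[['開盤', '最高', '最低', '收盤']] > 0).all(axis=1)]
    # The high must be the maximum, and the low the minimum, of the four prices
    df = df[(df['最高'] >= df[['開盤', '收盤', '最低']].max(axis=1)) &
            (df['最低'] <= df[['開盤', '收盤', '最高']].min(axis=1))]
    return df

raw = pd.DataFrame({
    '開盤': [100.0, 101.0, -1.0, 50.0],
    '最高': [105.0,  99.0, 10.0, 55.0],   # row 1: high < open → impossible
    '最低': [ 98.0,  95.0,  5.0, 49.0],
    '收盤': [103.0,  97.0,  8.0, 54.0],
})
clean = physical_filter(raw)
print(len(clean))  # → 2: row 1 (high < open) and row 2 (negative open) are dropped
```

The real script applies the same two conditions (plus a zero-volume option) inside `_basic_price_checks`.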
2. Suspension and resumption detection: spotting "time jumps"
When a stock resumes after a long suspension, the price often shows a huge discontinuity (for example, relisting after embezzlement or restructuring), which distorts the statistics.
- Ghost-placeholder detection: automatically identifies consecutive segments where all four prices are equal and volume is 0 (ghost segments).
- Resume flags: when two trading days are more than 5 days apart and the open differs from the previous close by more than 30%, the day is flagged as a resumption and treated specially (or filtered out) when computing returns.
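The gap-plus-jump rule can be sketched roughly like this (the `mark_resume` helper and the sample data are hypothetical; the real script below additionally handles ghost segments and forward-fills the last valid close):

```python
import pandas as pd

LONG_GAP_DAYS = 5    # days since the previous trading day
RESUME_JUMP = 0.30   # |open / prev_close - 1| threshold

def mark_resume(df: pd.DataFrame) -> pd.Series:
    """Flag days that look like a post-suspension resumption."""
    gap_days = df['日期'].diff().dt.days
    jump = (df['開盤'] / df['收盤'].shift(1) - 1).abs()
    return (gap_days >= LONG_GAP_DAYS) & (jump >= RESUME_JUMP)

df = pd.DataFrame({
    '日期': pd.to_datetime(['2024-01-02', '2024-01-03', '2024-02-01']),
    '開盤': [100.0, 101.0, 60.0],   # resumes ~40% below the last close
    '收盤': [100.0, 100.0, 62.0],
})
print(mark_resume(df).tolist())  # → [False, False, True]
```

Both conditions must hold: a 29-day gap alone, or a 40% jump alone, would not trigger the flag.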
3. Anomalous events and "ping-pong pattern" filtering
To guard against misaligned ex-dividend data or a polluted data source, the code adds dynamic monitoring:
- Ping-pong detection: automatically flags names that surge ~40% one day and crash ~40% the next, a strong sign of data errors or unadjusted ex-dividend prices, and adds them to the skip_pingpong.csv exclusion list.
- Extreme-return filtering: thresholds such as EXTREME_RET_GAP intercept implausible gaps, e.g. single-day returns above 100%.
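A rough sketch of the ping-pong check (the `has_pingpong` helper is illustrative; the real script writes offenders to skip_pingpong.csv rather than just returning a flag):

```python
import pandas as pd

PINGPONG_THRESHOLD = 0.40  # matches the script's parameter below

def has_pingpong(close: pd.Series, thr: float = PINGPONG_THRESHOLD) -> bool:
    """True if any day moves > thr and the next day reverses by > thr."""
    ret = close.pct_change()
    up_then_down = (ret > thr) & (ret.shift(-1) < -thr)
    down_then_up = (ret < -thr) & (ret.shift(-1) > thr)
    return bool((up_then_down | down_then_up).any())

good = pd.Series([100.0, 103.0, 101.0, 104.0])
bad  = pd.Series([100.0, 145.0, 85.0])   # +45% then -41%: suspect data
print(has_pingpong(good), has_pingpong(bad))  # → False True
```

A legitimate limit-up followed by a limit-down is rare in Taiwan (daily moves are capped at ±10%), which is why this pattern is treated as a data defect rather than a trade signal.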
4. Robust Market Breadth statistics
When converting to weekly/monthly/yearly candles, the code does not merely aggregate; it also weighs sample representativeness:
- Thin-sample filtering: if a month has fewer than 5 trading days, or a year fewer than 120, that period's return is set to NaN (excluded from the statistics), so newly listed stocks and partial years cannot skew the averages.
- WantGoo (玩股網) convention alignment: weekly candles settle on Friday (W-FRI), with an extra filter for "no trading last week + big gap this week", keeping the statistics consistent with mainstream financial media.
💡 A note to readers: why do these exercises matter?
This code is essentially an automated vegetable washer. If you feed raw CSVs from GitHub straight into a strategy, the "fake rallies" will fool you; with this cleaning pipeline you end up with genuinely clean, research-grade data in Parquet format.
This is what 80% of a data scientist's job looks like: before analyzing data, learn to dance with the noise.
The code below was written earlier; it is worth skimming to see what it handles. Real feeds do occasionally contain a close above the high, or a close below the low, sometimes off by more than 10%, along with other abnormal-return situations.
You can also feed the code to an AI to analyze and merge into your own codebase, which is much faster than slow trial and error.
Because of length limits, the code below is split into two parts.
# -*- coding: utf-8 -*-
"""
Daily K → weekK/monthK/yearK.parquet (W-FRI + WantGoo convention + local cache + 7 QA)
Includes all fixes and optimizations (filtering / statistics / QA output)
"""
import os, re, glob, time, warnings, random
from math import isfinite
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys  # used for the Colab environment check
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from shutil import copy2
# ===== Basic settings =====
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore")
WRITE_MODE = 'B' # 'A' / 'B' / 'C'
# 🌟 Optimization 1: double or quadruple the row-group size
ROW_GROUP_SIZE = 512_000 # recommended: 512,000
# 🌟 Optimization 2: more threads to maximize CPU utilization
MAX_WORKERS = 16 # recommended: 16 or higher
# 🌟 Optimization 3: much larger merge batches to reduce I/O writes
SHARD_SIZE = 10000 # recommended: 8,000-15,000
# 🌟 Cache control (keep the cache so the next run skips copying)
KEEP_LOCAL_CACHE = True
MARKET_LIST = [
"tw-share"
# "us-share", "hk-share", "jp-share", "kr-share", "cn-share",
]
MARKET_ABBR_MAP = {
"tw-share": "TW", "us-share": "US", "cn-share": "CN",
"hk-share": "HK", "jp-share": "JP", "kr-share": "KR"
}
# DRIVE_BASE and LOCAL_BASE are assumed to be laid out as follows
try:
    # Check whether we are running inside Colab
    if 'google.colab' in sys.modules:
        from google.colab import drive
        try:
            # Avoid the repeated-mount warning
            drive.mount('/content/drive', force_remount=False)
        except Exception:
            pass
DRIVE_BASE = "/content/drive/MyDrive/各國股票檔案"
else:
DRIVE_BASE = os.path.abspath("./data_drive")
except Exception:
DRIVE_BASE = os.path.abspath("./data_drive")
LOCAL_BASE = "/content/_wmy_tmp"
LOCAL_DAYK_CACHE = f"{LOCAL_BASE}/dayK_cache" # local daily-K cache (kept between runs)
# ===== Processing range (DATE_START ~ DATE_END) =====
DATE_START_STR = "2000-01-01"
DATE_END_STR = "2025-12-31"
PAD_DAYS = 14 # buffer: keep N days before the start so the first period has a prior close
DATE_START = pd.Timestamp(DATE_START_STR)
DATE_END = pd.Timestamp(DATE_END_STR)
DATE_PAD_START = DATE_START - pd.Timedelta(days=PAD_DAYS)
# ===== Cleaning and detection parameters =====
MIN_YEAR = 1990
ALLOW_ZERO_VOLUME = True
CHECK_RETURN_D = True
# [*** Fix A ***] Raise the daily extreme-return thresholds so extreme gap days are not pre-deleted at the daily level.
EXTREME_RET_GAP = 100.00
EXTREME_RET_CANDLE = 100.00
EXTREME_RET_MAX_FROMP = 100.00
EXTREME_RET_MIN_FROMP = 100.00
ABS_MAX_REASONABLE = 100.00
# Weekly safety valve: no trading last week + big gap this week → treat as split/resumption, skip the return
STOPWEEK_JUMP_THRESHOLD = 0.80 # 80%
# ====== Resumption-detection parameters (NEW) ======
LONG_GAP_DAYS = 5 # ≥ N days since the previous valid trading day → suspected suspension / long holiday
RESUME_JUMP_THRESHOLD = 0.30 # first day back: |open/prev_close - 1| ≥ 30%
GHOST_MIN_LEN = 2 # minimum run of ghost days (four equal prices, zero volume) to count as a placeholder segment
PRE_RESUME_DAYS = 5 # look back up to N days before a ghost segment; a big gap there also marks a resumption head
EPS_EQ = 1e-8 # tolerance for "four prices equal"
SKIP_SUSPECT_STOCK = True
PINGPONG_THRESHOLD = 0.40
# [*** New M/Y thresholds ***] robust-filtering parameters
MIN_DAYS_MONTH = 5 # monthly K: at least 5 trading days
MIN_DAYS_YEAR = 120 # yearly K: at least 120 trading days (~half a year)
BIG_RET_MONTH_YEAR = 0.50 # monthly/yearly |Ret_Trad| > 50% with HasResume/thin sample → filter
GAP_BIG_MONTH_YEAR = 0.50 # monthly/yearly |Ret_Gap| > 50% with HasResume/thin sample → filter
# Column-name candidates
DATE_COLS = ['date','日期','Date']
OPEN_COLS = ['open','開盤','開盤價','Open']
HIGH_COLS = ['high','最高','最高價','High']
LOW_COLS = ['low','低','最低價','Low']
CLOSE_COLS = ['close','收盤','收盤價','Adj Close','adj close','Close']
VOL_COLS = ['volume','成交量','Volume']
SPLIT_COLS = ['Stock Splits','stock splits','Splits']
DIV_COLS = ['Dividends','dividends']
def _is_equal4(o, h, l, c, eps=EPS_EQ):
mx = max(o, h, l, c); mn = min(o, h, l, c)
return (mx - mn) <= eps
def _pick(df, cands):
low = {str(c).lower(): c for c in df.columns}
for k in cands:
if k.lower() in low:
return low[k.lower()]
for k in cands:
rx = re.compile(k, re.I)
for c in df.columns:
if rx.search(str(c)):
return c
return None
def _read_csv_fast(path: str) -> pd.DataFrame:
try:
return pd.read_csv(path, encoding='utf-8-sig', engine='pyarrow')
except Exception:
try:
return pd.read_csv(path, encoding='utf-8', engine='pyarrow')
except Exception:
try:
return pd.read_csv(path, encoding='utf-8-sig')
except Exception:
return pd.read_csv(path, encoding='utf-8')
# [Fix] When the file stem contains '_', take the left part as StockID and the right part as Name
def _parse_id_name(stem: str):
    """
    Custom logic:
    1. If the stem contains '_' (e.g. '1305.TW_華夏'), the ID is the left part (1305.TW) and the Name the right part.
    2. Otherwise (e.g. 'AAPL'), the whole stem is the ID and the Name is empty.
    """
    if '_' in stem:
        # Split the stem (e.g. '1305.TW_華夏') into ID and Name
        id_part, name_part = stem.rsplit('_', 1)
        # Taiwan-market StockID keeps only the part left of '_' (1305.TW)
        return id_part, name_part
    # Other markets, or stems without a name suffix: the whole stem is the ID
    return stem, ""
# [Fix] Ensure the final StockID carries no .csv suffix
def _canonical_id(raw_id: str) -> str:
    """
    Use raw_id directly as the final StockID, keeping all suffixes (.TW, .HK, .KS, ...).
    """
    # _parse_id_name already stripped .csv, so just return raw_id;
    # this also guarantees a name (e.g. '華夏') is never mistaken for an ID
    return raw_id
def _prepare_local_dayk(market_key: str) -> str:
drive_dayk_dir = Path(f"{DRIVE_BASE}/{market_key}/dayK")
local_dayk_dir = Path(f"{LOCAL_DAYK_CACHE}/{market_key}")
local_dayk_dir.mkdir(parents=True, exist_ok=True)
csv_files = sorted(glob.glob(str(drive_dayk_dir / "*.csv")))
    if not csv_files:
        print(f"❌ Cache failed: no CSV files found under {drive_dayk_dir} on Drive.")
        return ""
    def _copy_if_newer(src: str, dst_dir: Path):
        dst = dst_dir / Path(src).name
        try:
            if not dst.exists():
                copy2(src, dst)
            else:
                s, d = Path(src).stat(), dst.stat()
                if (s.st_mtime > d.st_mtime) or (s.st_size != d.st_size):
                    copy2(src, dst)
        except Exception as e:
            print(f"⚠️ Failed to cache file: {src} -> {dst} ({e})")
    t_start = time.time()
    print(f"\n🚀 Incrementally caching {len(csv_files)} daily-K CSVs to local disk: {local_dayk_dir}")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        list(as_completed([executor.submit(_copy_if_newer, f, local_dayk_dir) for f in csv_files]))
    print(f"✅ Cache complete in {time.time() - t_start:.2f} s. Path: {local_dayk_dir}")
return str(local_dayk_dir)
def _basic_price_checks(df: pd.DataFrame, allow_zero_vol=ALLOW_ZERO_VOLUME) -> pd.DataFrame:
df = df.copy()
for c in ['開盤','最高','最低','收盤','成交量']:
if c in df.columns:
df[c] = pd.to_numeric(df[c], errors='coerce')
df = df[(df['開盤'] > 0) & (df['最高'] > 0) & (df['最低'] > 0) & (df['收盤'] > 0)]
df = df[(df['最高'] >= df[['開盤','收盤','最低']].max(axis=1)) &
(df['最低'] <= df[['開盤','收盤','最高']].min(axis=1)) &
(df['最高'] >= df['最低'])]
if not allow_zero_vol and '成交量' in df.columns:
df = df[df['成交量'] > 0]
    # [*** Fix 1/3: timezone is deliberately not handled here (done upstream in _load_day_clean_full) ***]
df['日期'] = pd.to_datetime(df['日期'], errors='coerce').dt.tz_localize(None)
df = df.dropna(subset=['日期'])
df = df[df['日期'].dt.year >= MIN_YEAR]
df = df.sort_values('日期').drop_duplicates(subset=['日期']).reset_index(drop=True)
return df
def _extreme_return_filter_day(df: pd.DataFrame) -> pd.DataFrame:
if not CHECK_RETURN_D:
return df
df = df.copy()
prev_c = df['收盤'].shift(1).replace(0, np.nan)
o_safe = df['開盤'].replace(0, np.nan)
ret_gap = (o_safe / prev_c) - 1
ret_candle = (df['收盤'] / o_safe) - 1
ret_max_p = (df['最高'] / prev_c) - 1
ret_min_p = (df['最低'] / prev_c) - 1
mask_ok = (
(ret_gap.abs() < EXTREME_RET_GAP) &
(ret_candle.abs() < EXTREME_RET_CANDLE) &
(ret_max_p.abs() < EXTREME_RET_MAX_FROMP) &
(ret_min_p.abs() < EXTREME_RET_MIN_FROMP) &
(ret_gap.abs() <= ABS_MAX_REASONABLE) &
(ret_candle.abs() <= ABS_MAX_REASONABLE) &
(ret_max_p.abs() <= ABS_MAX_REASONABLE) &
(ret_min_p.abs() <= ABS_MAX_REASONABLE)
)
return df[mask_ok.fillna(False)].copy()
def _clip_by_date_range(df: pd.DataFrame, col='日期',
start=DATE_START, end=DATE_END, pad_start=DATE_PAD_START):
"""裁成 2024–2025;起始保留 pad_start 以便第一期有前收。"""
if df.empty or col not in df.columns:
return df
m = (df[col] >= pad_start) & (df[col] <= end)
return df.loc[m].copy()
# [✅ fixed and vectorized]
def _mark_resume_flags_with_ghost(df_orig: pd.DataFrame) -> pd.Series:
    """Mark resumption days on the table *before* ghost rows are removed (vectorized)."""
    if df_orig.empty:
        return pd.Series(False, index=df_orig.index)
    df = df_orig.copy()
    # 🌟 Vectorized check for four equal prices
    o, h, l, c = df['開盤'].to_numpy(), df['最高'].to_numpy(), df['最低'].to_numpy(), df['收盤'].to_numpy()
    # Require finite values and all four prices equal within EPS_EQ
    eq_ohlc_vec = np.isfinite(o) & np.isfinite(h) & np.isfinite(l) & np.isfinite(c) & \
                  ((np.maximum.reduce([o, h, l, c]) - np.minimum.reduce([o, h, l, c])) <= EPS_EQ)
    ghost_mask = (df['成交量'].fillna(0) == 0) & eq_ohlc_vec
prev_valid_close = df['收盤'].where(~ghost_mask).shift(1).ffill()
prev_valid_date = df['日期'].where(~ghost_mask).shift(1).ffill()
gap_days = (df['日期'] - prev_valid_date).dt.days
open_to_prev = (df['開盤'] / prev_valid_close) - 1
cond_A = (gap_days >= LONG_GAP_DAYS) & (open_to_prev.abs() >= RESUME_JUMP_THRESHOLD)
g = ghost_mask.to_numpy(); n = len(df)
resume_idx = set(np.where(cond_A)[0].tolist())
i = 0
while i < n:
if g[i]:
j = i
while j < n and g[j]:
j += 1
seg_len = j - i
if seg_len >= GHOST_MIN_LEN:
for back in range(1, PRE_RESUME_DAYS + 1):
k = i - back
if k < 0: break
if (not g[k]) and (abs(open_to_prev.iloc[k]) >= RESUME_JUMP_THRESHOLD):
resume_idx.add(k)
i = j
else:
i += 1
s = pd.Series(False, index=df.index)
if resume_idx:
s.iloc[sorted(list(resume_idx))] = True
return s
# [✅ fixed and vectorized]
def _load_day_clean_full(path: str) -> pd.DataFrame:
    """Normalize columns → mark resumptions → drop ghosts → price/volume checks → extreme filtering"""
raw = _read_csv_fast(path)
dc = _pick(raw, DATE_COLS)
oc = _pick(raw, OPEN_COLS)
hc = _pick(raw, HIGH_COLS)
lc = _pick(raw, LOW_COLS)
cc = _pick(raw, CLOSE_COLS)
vc = _pick(raw, VOL_COLS)
    sc = _pick(raw, SPLIT_COLS) # may be absent
    if not all([dc, oc, hc, lc, cc]):
        raise ValueError(f"Missing required columns: {Path(path).name}")
if vc is None:
raw['__vol__'] = np.nan
vc = '__vol__'
df0 = raw[[dc, oc, hc, lc, cc, vc]].copy()
df0.columns = ['日期','開盤','最高','最低','收盤','成交量']
for c in ['開盤','最高','最低','收盤','成交量']:
df0[c] = pd.to_numeric(df0[c], errors='coerce')
    # [*** Fix 1: timezone boundary error ***]
    df0['日期'] = pd.to_datetime(df0['日期'], errors='coerce')
    if df0['日期'].dt.tz is not None:
        df0['日期'] = df0['日期'].dt.tz_convert('Asia/Taipei').dt.tz_localize(None)
    else:
        df0['日期'] = df0['日期'].dt.tz_localize(None)
    # [*** end Fix 1 ***]
resume_flag = _mark_resume_flags_with_ghost(df0)
    # 🌟 Same vectorized logic as in _mark_resume_flags_with_ghost
o, h, l, c = df0['開盤'].to_numpy(), df0['最高'].to_numpy(), df0['最低'].to_numpy(), df0['收盤'].to_numpy()
eq_ohlc = np.isfinite(o) & np.isfinite(h) & np.isfinite(l) & np.isfinite(c) & \
((np.maximum.reduce([o, h, l, c]) - np.minimum.reduce([o, h, l, c])) <= EPS_EQ)
ghost_mask = (df0['成交量'].fillna(0) == 0) & eq_ohlc
if sc is not None and sc in raw.columns:
splits = pd.to_numeric(raw[sc], errors='coerce').fillna(0)
ghost_mask = ghost_mask | ((df0['成交量'].fillna(0) == 0) & (splits > 0))
df = df0.loc[~ghost_mask].copy()
df['resume_flag'] = resume_flag.loc[df.index]
    # === [*** Final fix B ***] Strengthen daily resume flags: catch extreme gaps the ghost logic misses ===
    DAILY_JUMP_THRESHOLD = 1.00 # 100% daily return
    prev_c_day = df['收盤'].shift(1).replace(0, np.nan)
    daily_ret = (df['收盤'] / prev_c_day) - 1
    extreme_daily_jump_mask = daily_ret.abs().gt(DAILY_JUMP_THRESHOLD).fillna(False)
    df['resume_flag'] = df['resume_flag'] | extreme_daily_jump_mask
    # ====================================================================================
df = _basic_price_checks(df)
if df.empty:
raise ValueError("清洗後為空")
df = _extreme_return_filter_day(df)
if df.empty:
raise ValueError("極端報酬過濾後為空")
return df
def _resample_ohlc_with_flags(df: pd.DataFrame, rule: str) -> pd.DataFrame:
df = df.copy()
    # [*** Fix 2: redundant tz_localize(None) removed ***]
df['日期'] = pd.to_datetime(df['日期'], errors='coerce')
g = df.set_index('日期').sort_index()
    if rule == 'W': # settle on Friday (matches WantGoo 玩股網)
rule_code, label, closed = 'W-FRI', 'right', 'right'
period_name = 'Week'
elif rule == 'M':
rule_code, label, closed = 'ME', 'right', 'right'
period_name = 'Period'
elif rule == 'Y':
rule_code, label, closed = 'YE', 'right', 'right'
period_name = 'Period'
else:
raise ValueError("rule must be 'W', 'M', or 'Y'.")
o = g['開盤'].resample(rule_code, label=label, closed=closed).first()
h = g['最高'].resample(rule_code, label=label, closed=closed).max()
l = g['最低'].resample(rule_code, label=label, closed=closed).min()
c = g['收盤'].resample(rule_code, label=label, closed=closed).last()
v = g['成交量'].resample(rule_code, label=label, closed=closed).sum(min_count=1)
    # Trading days within the period
n = g['收盤'].resample(rule_code, label=label, closed=closed).count().rename(f'Cur{period_name}_Days')
    # Whether any resume_flag occurred within the period
has_resume = None
if 'resume_flag' in g.columns:
has_resume = g['resume_flag'].resample(rule_code, label=label, closed=closed).max().rename('HasResume')
parts = [o, h, l, c, v, n]
if has_resume is not None:
parts.append(has_resume)
out = pd.concat(parts, axis=1).dropna(subset=['開盤','收盤'])
cols = ['開盤','最高','最低','收盤','成交量', f'Cur{period_name}_Days']
if has_resume is not None:
cols.append('HasResume')
out.columns = cols
    # Previous period's trading days
out[f'Prev{period_name}_Days'] = out[f'Cur{period_name}_Days'].shift(1).fillna(0).astype(int)
if rule == 'W':
iso = out.index.to_series().dt.isocalendar()
out['ISO_Week'] = iso.year.astype(str) + '-' + iso.week.astype(str).str.zfill(2)
out = out.reset_index().rename(columns={'index':'日期'})
return _basic_price_checks(out)
def _add_period_returns(df: pd.DataFrame, freq_code: str) -> pd.DataFrame:
df = df.sort_values('日期').reset_index(drop=True).copy()
prev_c = df['收盤'].shift(1).replace(0, np.nan)
o = df['開盤'].replace(0, np.nan)
h = df['最高'].replace(0, np.nan)
l = df['最低'].replace(0, np.nan)
c = df['收盤'].replace(0, np.nan)
    # Return calculations (unchanged from the original code)
df[f'PrevC_{freq_code}'] = prev_c
df[f'Ret_Gap_{freq_code}'] = (o / prev_c) - 1
df[f'Ret_Trad_{freq_code}'] = (c - prev_c) / prev_c
df[f'Ret_C_{freq_code}'] = (c - o) / o
df[f'Ret_Max_H_{freq_code}'] = (h - prev_c) / prev_c
df[f'Ret_Min_L_{freq_code}'] = (l - prev_c) / prev_c
df[f'Range_{freq_code}'] = (h - l) / o
df[f'Ret_Max_H_Pos_{freq_code}'] = df[f'Ret_Max_H_{freq_code}'].clip(lower=0)
df[f'Ret_H_from_O_{freq_code}'] = (h - o) / o
df[f'Ret_L_from_O_{freq_code}'] = (l - o) / o
df[f'Ret_End_RelH_{freq_code}'] = (c - h) / h
df[f'Ret_End_RelL_{freq_code}'] = (c - l) / l
    # --- Filtering logic ---
    final_clear_mask = pd.Series(False, index=df.index)
    cols_to_nan = [f'Ret_Trad_{freq_code}', f'Ret_Max_H_{freq_code}', f'Ret_Min_L_{freq_code}']
    # Default IsFiltered_QA to 0 (only needed for M/Y)
    if freq_code in ('M', 'Y'):
        df['IsFiltered_QA'] = 0
    if freq_code == 'W' and 'PrevWeek_Days' in df.columns:
        # 1. Weekly WantGoo-style filter: no trading last week + big gap this week
        big_jump = (df[f'PrevC_W'] > 0) & (df['開盤'] > 0) & ((df['開盤'] / df[f'PrevC_W'] - 1).abs() > STOPWEEK_JUMP_THRESHOLD)
        prev_w_no_trade = (df['PrevWeek_Days'] == 0)
        jump_mask = big_jump & prev_w_no_trade
        final_clear_mask = final_clear_mask | jump_mask
        # 2. Weekly HasResume + extreme-event filter ([*** Final fix C ***])
        EXTREME_RET_USER_REQ = 1.00 # 100%
BIG_RET_THRESHOLD = 0.50
has_resume_mask = df.get('HasResume', pd.Series(False, index=df.index)).fillna(0).gt(0)
is_extreme_event_current_week = has_resume_mask & df[f'Ret_Trad_W'].abs().gt(EXTREME_RET_USER_REQ)
skip_current_week_from_previous_event = is_extreme_event_current_week.shift(1).fillna(False)
mask_thin_or_large_jump = has_resume_mask & (df['CurWeek_Days'].lt(3) | df[f'Ret_Trad_W'].abs().gt(BIG_RET_THRESHOLD))
final_clear_mask = final_clear_mask | is_extreme_event_current_week | skip_current_week_from_previous_event | mask_thin_or_large_jump
    elif freq_code in ('M', 'Y') and 'HasResume' in df.columns:
        # 3. Monthly/yearly robust filter: thin samples + suspension/resumption + big gaps
        period_name = 'Period'
        # [*** Fix 3: correct the Days column-name lookup ***]
        days_col = 'CurPeriod_Days' if 'CurPeriod_Days' in df.columns else f'Cur{period_name}_Days'
        prev_days_col = 'PrevPeriod_Days' if 'PrevPeriod_Days' in df.columns else f'Prev{period_name}_Days'
        min_days = MIN_DAYS_MONTH if freq_code == 'M' else MIN_DAYS_YEAR
        # Condition A: insufficient coverage (new listing or partial year)
        small_coverage_mask = df[days_col].fillna(0).astype(int) < min_days
        # Condition B: extreme values around a suspension/resumption period
        has_resume = df['HasResume'].fillna(0).gt(0)
        thin_mask = df[days_col].fillna(0).astype(int) < 3 # very thin sample (< 3 days)
        big_gap_mask = df[f'Ret_Gap_{freq_code}'].abs() > GAP_BIG_MONTH_YEAR
        big_ret_mask = df[f'Ret_Trad_{freq_code}'].abs() > BIG_RET_MONTH_YEAR
        resume_extreme_mask = has_resume & (thin_mask | big_ret_mask | big_gap_mask)
        # Condition C: no trading last period + big gap this period (like the weekly WantGoo rule)
        stop_jump_mask = (df[prev_days_col].fillna(0).astype(int) == 0) & (df[f'Ret_Gap_{freq_code}'].abs() > STOPWEEK_JUMP_THRESHOLD)
        final_clear_mask = final_clear_mask | small_coverage_mask | stop_jump_mask | resume_extreme_mask
    # 🎯 [*** New QA statistic ***]: mark the filtered rows
    df.loc[final_clear_mask, 'IsFiltered_QA'] = 1
    # Apply the filter: set the returns of filtered rows to NaN
    if final_clear_mask.any():
        df.loc[final_clear_mask, cols_to_nan] = np.nan
return df
class ParquetStreamer:
def __init__(self, path: str, keep_cols=None, row_group_size: int = ROW_GROUP_SIZE):
self.path = path
self.writer = None
self.schema = None
self.rows = 0
self.keep_cols = keep_cols
self.row_group_size = row_group_size
def _ensure_writer(self, table: pa.Table):
if self.writer is None:
Path(self.path).parent.mkdir(parents=True, exist_ok=True)
self.schema = table.schema
self.writer = pq.ParquetWriter(self.path, self.schema, compression='snappy')
def append_df(self, df: pd.DataFrame):
if df is None or df.empty:
return
df = df.copy()
if self.keep_cols is not None:
for c in self.keep_cols:
if c not in df.columns:
df[c] = np.nan
df = df.reindex(columns=self.keep_cols)
if '日期' in df.columns:
df['日期'] = pd.to_datetime(df['日期'], errors='coerce').dt.tz_localize(None)
        # Make sure IsFiltered_QA is an integer column
        if 'IsFiltered_QA' in df.columns:
            df['IsFiltered_QA'] = pd.to_numeric(df['IsFiltered_QA'], errors='coerce').fillna(0).astype('int32')
        # 🚀 [Fix: force StockID to string] preserve leading zeros
        for c in df.columns:
            if c == 'StockID':
                df[c] = df[c].astype(str)
                continue # skip the numeric-cast logic below
            if df[c].dtype in ['int64','float64','int32','float32'] and c not in ['ISO_Week', 'IsFiltered_QA']:
                df[c] = pd.to_numeric(df[c], errors='coerce').astype(np.float64)
        # [end of fix]
table = pa.Table.from_pandas(df, preserve_index=False)
self._ensure_writer(table)
if table.schema != self.schema:
table = pa.Table.from_pandas(df.reindex(columns=[f.name for f in self.schema]), preserve_index=False).cast(self.schema, safe=False)
self.writer.write_table(table, row_group_size=self.row_group_size)
self.rows += len(df)
def close(self):
if self.writer is not None:
try:
self.writer.close()
            except Exception:
pass