【技術補充】如何將雜訊轉為金礦?台股大數據的資料清洗(Data Cleaning)實作-1

更新 發佈閱讀 56 分鐘
投資理財內容聲明

在金融量化分析中,最危險的不是沒有資料,而是用了「錯誤的資料」。本專案的原始程式碼中,針對台股特性實作了多重清洗機制,確保最終產出的週/月/年 K 線能反映真實市場動能。

以下是本程式碼執行的四大核心清洗功能:

1. 物理性邏輯檢查:排除「不可能的報價」

原始數據常因傳輸錯誤出現異常,程式第一步先進行物理性過濾:

  • 四價一致性檢查:確保「最高價大於開盤、收盤、最低價」且「最低價小於開盤、收盤、最高價」。
  • 零值與負值過濾:自動剔除股價小於等於 0 的異常紀錄。
  • 時區統一化:修正 yfinance 常見的時區偏移,避免 9/1 的資料被誤算進 8 月的月 K 線。

2. 停牌與復牌偵測:辨識「時空跳躍」

長期停牌後的復牌,股價往往會出現巨大的斷層(例如因掏空或重整後掛牌),這會扭曲統計矩陣。

  • Ghost 占位符偵測:自動辨識「四價相等且成交量為 0」的連續區段(Ghost segment)。
  • 復牌標記(Resume Flags):當兩筆交易日相隔超過 5 天,且開盤價與前一收盤價差距大於 30% 時,系統會標記為「復牌」,並在計算報酬時進行特殊處理或過濾。

3. 異常事件與「乒乓模式」過濾

為了防止除權息數據錯位或資料源汙染,程式加入了動態監控:

  • Ping-Pong 偵測:自動篩選出當天大漲 40% 隔天大跌 40% 這種極度疑似「資料錯誤」或「除權息未還原」的標的,並將其列入 skip_pingpong.csv 排除名單。
  • 極端報酬過濾:設定 EXTREME_RET_GAP 等門檻,攔截單日報酬率超過 100% 等不合理的極端跳空。

4. 市場寬度(Market Breadth)的穩健統計

在轉換為週、月、年 K 線時,程式不只是簡單相加,還考慮了「樣本代表性」:

  • 薄樣本過濾:月 K 線若該月交易日少於 5 天,或年 K 線少於 120 天,系統會將該期報酬設為 NaN(不列入統計),避免新上市股或殘年數據干擾整體平均。
  • 玩股網口徑對齊:週 K 線固定以「週五」為結算,並加入「前週無交易、本週大跳空」的過濾邏輯,與主流財經媒體的統計口徑保持一致。

💡 給讀者的話:為什麼我們需要這些練習?

這段程式碼就像是一個「自動化洗菜機」。如果你直接拿 GitHub 上的原始 CSV 去跑策略,你可能會被那些「假的大漲」給欺騙;但透過這套清洗邏輯,你才能得到真正乾淨、具備研究價值的 Parquet 格式數據。

這正是資料科學家 80% 的工作內容:在處理數據前,先學會與雜訊共舞。

如下程式碼是之前寫的,可以參考看看做了那些處理 因為有時候會出現收盤價大於最高價,或者收盤價小於最低價,而且還誤差大於10%的情況發生等等異常報酬情況發生。
也可以把程式碼直接餵AI分析後整合到自己的程式碼中,這樣就快多,不用再慢慢try。

因為字數限制的關係 如下程式碼會分兩段

# -*- coding: utf-8 -*-

"""

K → weekK/monthK/yearK.parquet(W-FRI + 玩股口徑 + 本地快取 + 7 QA

+ 包含所有修正與優化 (過濾/統計/QA輸出)

"""



import os, re, glob, time, warnings, random

from math import isfinite

from pathlib import Path

from concurrent.futures import ThreadPoolExecutor, as_completed

import sys # 用於 Colab 環境檢查



import numpy as np

import pandas as pd

import pyarrow as pa

import pyarrow.parquet as pq

from shutil import copy2



# ===== 基本設定 =====

warnings.filterwarnings("ignore", category=DeprecationWarning)

warnings.filterwarnings("ignore", category=FutureWarning)

warnings.filterwarnings("ignore")



WRITE_MODE = 'B'            # 'A' / 'B' / 'C'



# 🌟 優化 1: Row Group 翻倍或四倍

ROW_GROUP_SIZE = 512_000    # 建議 512,000

# 🌟 優化 2: 增加執行緒數,最大化 CPU 利用率

MAX_WORKERS = 16            # 建議 16 或更高

# 🌟 優化 3: 顯著加大批次合併量,減少 I/O 寫入次數

SHARD_SIZE = 10000          # 建議 8,000 - 15,000 之間



# 🌟 快取控制 (保留快取,下次執行跳過複製)

KEEP_LOCAL_CACHE = True



MARKET_LIST = [

     "tw-share"

    # "us-share", "hk-share", "jp-share", "kr-share", "cn-share",

]



MARKET_ABBR_MAP = {

    "tw-share": "TW", "us-share": "US", "cn-share": "CN",

    "hk-share": "HK", "jp-share": "JP", "kr-share": "KR"

}



# 根據您的需求,假設 DRIVE_BASELOCAL_BASE 如下

try:

    # 檢查是否在 Colab 環境中

    if 'google.colab' in sys.modules:

        from google.colab import drive

        try:

            # 避免重複 mount 警告

            drive.mount('/content/drive', force_remount=False)

        except Exception:

            pass

        DRIVE_BASE = "/content/drive/MyDrive/各國股票檔案"

    else:

        DRIVE_BASE = os.path.abspath("./data_drive")

except Exception:

    DRIVE_BASE = os.path.abspath("./data_drive")



LOCAL_BASE = "/content/_wmy_tmp"

LOCAL_DAYK_CACHE = f"{LOCAL_BASE}/dayK_cache"   # 本地日K快取(保留)



# ===== 範圍限制(只處理 2024-01-01 ~ 2025-12-31=====

DATE_START_STR = "2000-01-01"

DATE_END_STR   = "2025-12-31"

PAD_DAYS       = 14    # 緩衝天數:保留起始日前 N 天,讓第一期有前收可算



DATE_START = pd.Timestamp(DATE_START_STR)

DATE_END   = pd.Timestamp(DATE_END_STR)

DATE_PAD_START = DATE_START - pd.Timedelta(days=PAD_DAYS)



# ===== 清洗與偵測參數 =====

MIN_YEAR = 1990

ALLOW_ZERO_VOLUME = True



CHECK_RETURN_D = True

# [***修正點 A***] 將日 K 極端報酬門檻調高,確保極端跳空日不會被日 K 預先刪除。

EXTREME_RET_GAP = 100.00

EXTREME_RET_CANDLE = 100.00

EXTREME_RET_MAX_FROMP = 100.00

EXTREME_RET_MIN_FROMP = 100.00

ABS_MAX_REASONABLE = 100.00



# 週級安全閥:上一週無交易且本週大跳空 → 視為拆分/復牌,不計報酬

STOPWEEK_JUMP_THRESHOLD = 0.80   # 80%



# ====== 復牌偵測參數(NEW======

LONG_GAP_DAYS = 5        # 與前一筆有效交易日相隔 ≥ N 天 → 疑似停牌/長假

RESUME_JUMP_THRESHOLD = 0.30     # 復牌第一天:|開盤/前收 - 1|30%

GHOST_MIN_LEN = 2        # 連續 ghost(四價相等且零量)至少幾天算占位段

PRE_RESUME_DAYS = 5      # 在 ghost 段「前面」最多回溯幾天,若大跳空也視為復牌頭

EPS_EQ = 1e-8            # 四價相等容差



SKIP_SUSPECT_STOCK = True

PINGPONG_THRESHOLD = 0.40



# [***新增 M/Y 門檻***] 穩健過濾參數

MIN_DAYS_MONTH = 5      # 月K:至少要有 5 個交易日

MIN_DAYS_YEAR = 120      # 年K:至少要有 120 個交易日 (約半年)

BIG_RET_MONTH_YEAR = 0.50 # 月/年報酬 |Ret_Trad| > 50% 且 HasResume/薄樣本 → 過濾

GAP_BIG_MONTH_YEAR = 0.50 # 月/年跳空 |Ret_Gap| > 50% 且 HasResume/薄樣本 → 過濾



# 欄位候選

DATE_COLS  = ['date','日期','Date']

OPEN_COLS  = ['open','開盤','開盤價','Open']

HIGH_COLS  = ['high','最高','最高價','High']

LOW_COLS   = ['low','低','最低價','Low']

CLOSE_COLS = ['close','收盤','收盤價','Adj Close','adj close','Close']

VOL_COLS   = ['volume','成交量','Volume']

SPLIT_COLS = ['Stock Splits','stock splits','Splits']

DIV_COLS   = ['Dividends','dividends']



def _is_equal4(o, h, l, c, eps=EPS_EQ):

    mx = max(o, h, l, c); mn = min(o, h, l, c)

    return (mx - mn) <= eps



def _pick(df, cands):

    low = {str(c).lower(): c for c in df.columns}

    for k in cands:

        if k.lower() in low:

            return low[k.lower()]

    for k in cands:

        rx = re.compile(k, re.I)

        for c in df.columns:

            if rx.search(str(c)):

                return c

    return None



def _read_csv_fast(path: str) -> pd.DataFrame:

    try:

        return pd.read_csv(path, encoding='utf-8-sig', engine='pyarrow')

    except Exception:

        try:

            return pd.read_csv(path, encoding='utf-8', engine='pyarrow')

        except Exception:

            try:

                return pd.read_csv(path, encoding='utf-8-sig')

            except Exception:

                return pd.read_csv(path, encoding='utf-8')



# [修正] 增加邏輯,當檔名包含 '_' 時,只取左側部分作為 StockID,右側作為 Name

def _parse_id_name(stem: str):

    """

    自訂邏輯:

    1. 若檔名主體包含 '_' (: 1305.TW_華夏),則 ID 取左側 (1305.TW),Name 取右側。

    2. 若不包含 '_' (: AAPL),則 ID 取完整 stem,Name 為空。

    """

    if '_' in stem:

        # 將 stem ('1305.TW_華夏') 分割成 ID 和 Name

        id_part, name_part = stem.rsplit('_', 1)

        # 台灣市場 StockID 只取 '_' 左邊的部分 (1305.TW)

        return id_part, name_part



    # 其他國家或無名稱後綴的,整個 stem 就是 ID

    return stem, ""



# [修正] 確保最終 StockID 不帶後綴 .csv

def _canonical_id(raw_id: str) -> str:

    """

    直接使用 raw_id 作為最終的 StockID,保留所有後綴 (.TW, .HK, .KS)

    """

    # 由於 _parse_id_name 已經移除了 .csv,這裡只需返回 raw_id

    # 確保不會誤將名稱(如 '華夏')當作 ID

    return raw_id



def _prepare_local_dayk(market_key: str) -> str:

    drive_dayk_dir = Path(f"{DRIVE_BASE}/{market_key}/dayK")



    local_dayk_dir = Path(f"{LOCAL_DAYK_CACHE}/{market_key}")

    local_dayk_dir.mkdir(parents=True, exist_ok=True)



    csv_files = sorted(glob.glob(str(drive_dayk_dir / "*.csv")))

    if not csv_files:

        print(f"❌ 快取失敗: Drive 上的 {drive_dayk_dir} 找不到任何 CSV 檔案。")

        return ""



    def _copy_if_newer(src: str, dst_dir: Path):

        dst = dst_dir / Path(src).name

        try:

            if not dst.exists():

                copy2(src, dst)

            else:

                s, d = Path(src).stat(), dst.stat()

                if (s.st_mtime > d.st_mtime) or (s.st_size != d.st_size):

                    copy2(src, dst)

        except Exception as e:

            print(f"⚠️ 快取檔案失敗:{src} -> {dst} ({e})")



    t_start = time.time()

    print(f"\n🚀 增量快取 {len(csv_files)} 檔日K CSV 到本地:{local_dayk_dir}")

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:

        list(as_completed([executor.submit(_copy_if_newer, f, local_dayk_dir) for f in csv_files]))

    print(f"✅ 快取完成!耗時 {time.time() - t_start:.2f} 秒。路徑: {local_dayk_dir}")

    return str(local_dayk_dir)



def _basic_price_checks(df: pd.DataFrame, allow_zero_vol=ALLOW_ZERO_VOLUME) -> pd.DataFrame:

    df = df.copy()

    for c in ['開盤','最高','最低','收盤','成交量']:

        if c in df.columns:

            df[c] = pd.to_numeric(df[c], errors='coerce')

    df = df[(df['開盤'] > 0) & (df['最高'] > 0) & (df['最低'] > 0) & (df['收盤'] > 0)]

    df = df[(df['最高'] >= df[['開盤','收盤','最低']].max(axis=1)) &

            (df['最低'] <= df[['開盤','收盤','最高']].min(axis=1)) &

            (df['最高'] >= df['最低'])]

    if not allow_zero_vol and '成交量' in df.columns:

        df = df[df['成交量'] > 0]

    # [***修正點 1/3:這裡暫不處理時區***]

    df['日期'] = pd.to_datetime(df['日期'], errors='coerce').dt.tz_localize(None)

    df = df.dropna(subset=['日期'])

    df = df[df['日期'].dt.year >= MIN_YEAR]

    df = df.sort_values('日期').drop_duplicates(subset=['日期']).reset_index(drop=True)

    return df



def _extreme_return_filter_day(df: pd.DataFrame) -> pd.DataFrame:

    if not CHECK_RETURN_D:

        return df

    df = df.copy()

    prev_c = df['收盤'].shift(1).replace(0, np.nan)

    o_safe = df['開盤'].replace(0, np.nan)

    ret_gap = (o_safe / prev_c) - 1

    ret_candle = (df['收盤'] / o_safe) - 1

    ret_max_p = (df['最高'] / prev_c) - 1

    ret_min_p = (df['最低'] / prev_c) - 1

    mask_ok = (

        (ret_gap.abs() < EXTREME_RET_GAP) &

        (ret_candle.abs() < EXTREME_RET_CANDLE) &

        (ret_max_p.abs() < EXTREME_RET_MAX_FROMP) &

        (ret_min_p.abs() < EXTREME_RET_MIN_FROMP) &

        (ret_gap.abs() <= ABS_MAX_REASONABLE) &

        (ret_candle.abs() <= ABS_MAX_REASONABLE) &

        (ret_max_p.abs() <= ABS_MAX_REASONABLE) &

        (ret_min_p.abs() <= ABS_MAX_REASONABLE)

    )

    return df[mask_ok.fillna(False)].copy()



def _clip_by_date_range(df: pd.DataFrame, col='日期',

                         start=DATE_START, end=DATE_END, pad_start=DATE_PAD_START):

    """裁成 2024–2025;起始保留 pad_start 以便第一期有前收。"""

    if df.empty or col not in df.columns:

        return df

    m = (df[col] >= pad_start) & (df[col] <= end)

    return df.loc[m].copy()



# [✅ 修正過並向量化]

def _mark_resume_flags_with_ghost(df_orig: pd.DataFrame) -> pd.Series:

    """在『未刪除 ghost』表上標記復牌日。(向量化優化)"""

    if df_orig.empty:

        return pd.Series(False, index=df_orig.index)



    df = df_orig.copy()



    # 🌟 向量化檢查四價相等

    o, h, l, c = df['開盤'].to_numpy(), df['最高'].to_numpy(), df['最低'].to_numpy(), df['收盤'].to_numpy()



    # 檢查 NaN/Inf 且四價相等

    eq_ohlc_vec = np.isfinite(o) & np.isfinite(h) & np.isfinite(l) & np.isfinite(c) & \

                  ((np.maximum.reduce([o, h, l, c]) - np.minimum.reduce([o, h, l, c])) <= EPS_EQ)



    ghost_mask = (df['成交量'].fillna(0) == 0) & eq_ohlc_vec



    prev_valid_close = df['收盤'].where(~ghost_mask).shift(1).ffill()

    prev_valid_date  = df['日期'].where(~ghost_mask).shift(1).ffill()



    gap_days = (df['日期'] - prev_valid_date).dt.days

    open_to_prev = (df['開盤'] / prev_valid_close) - 1

    cond_A = (gap_days >= LONG_GAP_DAYS) & (open_to_prev.abs() >= RESUME_JUMP_THRESHOLD)



    g = ghost_mask.to_numpy(); n = len(df)

    resume_idx = set(np.where(cond_A)[0].tolist())



    i = 0

    while i < n:

        if g[i]:

            j = i

            while j < n and g[j]:

                j += 1

            seg_len = j - i

            if seg_len >= GHOST_MIN_LEN:

                for back in range(1, PRE_RESUME_DAYS + 1):

                    k = i - back

                    if k < 0: break

                    if (not g[k]) and (abs(open_to_prev.iloc[k]) >= RESUME_JUMP_THRESHOLD):

                        resume_idx.add(k)

            i = j

        else:

            i += 1



    s = pd.Series(False, index=df.index)

    if resume_idx:

        s.iloc[sorted(list(resume_idx))] = True

    return s



# [✅ 修正過並向量化]

def _load_day_clean_full(path: str) -> pd.DataFrame:

    """正規欄位對齊 → 標記復牌 → 刪 ghost → 價量檢查 → 極端過濾"""

    raw = _read_csv_fast(path)



    dc = _pick(raw, DATE_COLS)

    oc = _pick(raw, OPEN_COLS)

    hc = _pick(raw, HIGH_COLS)

    lc = _pick(raw, LOW_COLS)

    cc = _pick(raw, CLOSE_COLS)

    vc = _pick(raw, VOL_COLS)

    sc = _pick(raw, SPLIT_COLS)  # 可能不存在



    if not all([dc, oc, hc, lc, cc]):

        raise ValueError(f"缺必要欄位:{Path(path).name}")

    if vc is None:

        raw['__vol__'] = np.nan

        vc = '__vol__'



    df0 = raw[[dc, oc, hc, lc, cc, vc]].copy()

    df0.columns = ['日期','開盤','最高','最低','收盤','成交量']

    for c in ['開盤','最高','最低','收盤','成交量']:

        df0[c] = pd.to_numeric(df0[c], errors='coerce')



    # [***修正 1:時區邊界錯誤修正***]

    df0['日期'] = pd.to_datetime(df0['日期'], errors='coerce')

    if df0['日期'].dt.tz is not None:

        df0['日期'] = df0['日期'].dt.tz_convert('Asia/Taipei').dt.tz_localize(None)

    else:

        df0['日期'] = df0['日期'].dt.tz_localize(None)

    # [***修正 1 結束***]



    resume_flag = _mark_resume_flags_with_ghost(df0)



    # 🌟 配合向量化邏輯

    o, h, l, c = df0['開盤'].to_numpy(), df0['最高'].to_numpy(), df0['最低'].to_numpy(), df0['收盤'].to_numpy()

    eq_ohlc = np.isfinite(o) & np.isfinite(h) & np.isfinite(l) & np.isfinite(c) & \

              ((np.maximum.reduce([o, h, l, c]) - np.minimum.reduce([o, h, l, c])) <= EPS_EQ)



    ghost_mask = (df0['成交量'].fillna(0) == 0) & eq_ohlc



    if sc is not None and sc in raw.columns:

        splits = pd.to_numeric(raw[sc], errors='coerce').fillna(0)

        ghost_mask = ghost_mask | ((df0['成交量'].fillna(0) == 0) & (splits > 0))



    df = df0.loc[~ghost_mask].copy()

    df['resume_flag'] = resume_flag.loc[df.index]



    # === [***最終修正點 B***] 強化日 K 復牌標記:捕捉未能被 Ghost 邏輯捕捉的極端跳空 ===

    DAILY_JUMP_THRESHOLD = 1.00 # 100% 日報酬

    prev_c_day = df['收盤'].shift(1).replace(0, np.nan)

    daily_ret = (df['收盤'] / prev_c_day) - 1

    extreme_daily_jump_mask = daily_ret.abs().gt(DAILY_JUMP_THRESHOLD).fillna(False)

    df['resume_flag'] = df['resume_flag'] | extreme_daily_jump_mask

    # ====================================================================================



    df = _basic_price_checks(df)

    if df.empty:

        raise ValueError("清洗後為空")



    df = _extreme_return_filter_day(df)

    if df.empty:

        raise ValueError("極端報酬過濾後為空")



    return df



def _resample_ohlc_with_flags(df: pd.DataFrame, rule: str) -> pd.DataFrame:

    df = df.copy()



    # [***修正 2:移除多餘的 tz_localize(None)***]

    df['日期'] = pd.to_datetime(df['日期'], errors='coerce')

    g = df.set_index('日期').sort_index()



    if rule == 'W':  # 以週五結算(與玩股網一致)

        rule_code, label, closed = 'W-FRI', 'right', 'right'

        period_name = 'Week'

    elif rule == 'M':

        rule_code, label, closed = 'ME', 'right', 'right'

        period_name = 'Period'

    elif rule == 'Y':

        rule_code, label, closed = 'YE', 'right', 'right'

        period_name = 'Period'

    else:

        raise ValueError("rule must be 'W', 'M', or 'Y'.")



    o = g['開盤'].resample(rule_code, label=label, closed=closed).first()

    h = g['最高'].resample(rule_code, label=label, closed=closed).max()

    l = g['最低'].resample(rule_code, label=label, closed=closed).min()

    c = g['收盤'].resample(rule_code, label=label, closed=closed).last()

    v = g['成交量'].resample(rule_code, label=label, closed=closed).sum(min_count=1)



    # 期間交易天數

    n = g['收盤'].resample(rule_code, label=label, closed=closed).count().rename(f'Cur{period_name}_Days')



    # 期間是否發生過 resume_flag

    has_resume = None

    if 'resume_flag' in g.columns:

        has_resume = g['resume_flag'].resample(rule_code, label=label, closed=closed).max().rename('HasResume')



    parts = [o, h, l, c, v, n]

    if has_resume is not None:

        parts.append(has_resume)



    out = pd.concat(parts, axis=1).dropna(subset=['開盤','收盤'])

    cols = ['開盤','最高','最低','收盤','成交量', f'Cur{period_name}_Days']

    if has_resume is not None:

        cols.append('HasResume')

    out.columns = cols



    # 上期交易天數

    out[f'Prev{period_name}_Days'] = out[f'Cur{period_name}_Days'].shift(1).fillna(0).astype(int)



    if rule == 'W':

        iso = out.index.to_series().dt.isocalendar()

        out['ISO_Week'] = iso.year.astype(str) + '-' + iso.week.astype(str).str.zfill(2)

        out = out.rename(columns={'CurWeek_Days': 'CurWeek_Days', 'PrevWeek_Days': 'PrevWeek_Days'})



    out = out.reset_index().rename(columns={'index':'日期'})

    return _basic_price_checks(out)



def _add_period_returns(df: pd.DataFrame, freq_code: str) -> pd.DataFrame:

    df = df.sort_values('日期').reset_index(drop=True).copy()

    prev_c = df['收盤'].shift(1).replace(0, np.nan)

    o = df['開盤'].replace(0, np.nan)

    h = df['最高'].replace(0, np.nan)

    l = df['最低'].replace(0, np.nan)

    c = df['收盤'].replace(0, np.nan)



    # 報酬率計算 (與您原代碼一致)

    df[f'PrevC_{freq_code}']          = prev_c

    df[f'Ret_Gap_{freq_code}']        = (o / prev_c) - 1

    df[f'Ret_Trad_{freq_code}']       = (c - prev_c) / prev_c

    df[f'Ret_C_{freq_code}']          = (c - o) / o

    df[f'Ret_Max_H_{freq_code}']      = (h - prev_c) / prev_c

    df[f'Ret_Min_L_{freq_code}']      = (l - prev_c) / prev_c

    df[f'Range_{freq_code}']          = (h - l) / o

    df[f'Ret_Max_H_Pos_{freq_code}']  = df[f'Ret_Max_H_{freq_code}'].clip(lower=0)

    df[f'Ret_H_from_O_{freq_code}']   = (h - o) / o

    df[f'Ret_L_from_O_{freq_code}']   = (l - o) / o

    df[f'Ret_End_RelH_{freq_code}']   = (c - h) / h

    df[f'Ret_End_RelL_{freq_code}']   = (c - l) / l



    # --- 過濾邏輯 ---

    final_clear_mask = pd.Series(False, index=df.index)

    cols_to_nan = [f'Ret_Trad_{freq_code}', f'Ret_Max_H_{freq_code}', f'Ret_Min_L_{freq_code}']



    # 預設 IsFiltered_QA 為 0 (M/Y 需要)

    if freq_code in ('M', 'Y'):

        df['IsFiltered_QA'] = 0



    if freq_code == 'W' and 'PrevWeek_Days' in df.columns:

        # 1.K 玩股口徑過濾:上一週無交易且本週大跳空

        big_jump = (df[f'PrevC_W'] > 0) & (df['開盤'] > 0) & ((df['開盤'] / df[f'PrevC_W'] - 1).abs() > STOPWEEK_JUMP_THRESHOLD)

        prev_w_no_trade = (df['PrevWeek_Days'] == 0)

        jump_mask = big_jump & prev_w_no_trade

        final_clear_mask = final_clear_mask | jump_mask



        # 2.K HasResume + 極端事件過濾 ([***最終修正點 C***])

        EXTREME_RET_USER_REQ = 1.00 # 100%

        BIG_RET_THRESHOLD = 0.50

        has_resume_mask = df.get('HasResume', pd.Series(False, index=df.index)).fillna(0).gt(0)

        is_extreme_event_current_week = has_resume_mask & df[f'Ret_Trad_W'].abs().gt(EXTREME_RET_USER_REQ)

        skip_current_week_from_previous_event = is_extreme_event_current_week.shift(1).fillna(False)

        mask_thin_or_large_jump = has_resume_mask & (df['CurWeek_Days'].lt(3) | df[f'Ret_Trad_W'].abs().gt(BIG_RET_THRESHOLD))

        final_clear_mask = final_clear_mask | is_extreme_event_current_week | skip_current_week_from_previous_event | mask_thin_or_large_jump



    elif freq_code in ('M', 'Y') and 'HasResume' in df.columns:

        # 3./K 穩健過濾:薄樣本 + 停牌復牌 + 大跳空

        period_name = 'Period'



        # [***修正 3:修正 Days 欄位名稱判斷***]

        days_col = 'CurPeriod_Days' if 'CurPeriod_Days' in df.columns else f'Cur{period_name}_Days'

        prev_days_col = 'PrevPeriod_Days' if 'PrevPeriod_Days' in df.columns else f'Prev{period_name}_Days'



        min_days = MIN_DAYS_MONTH if freq_code == 'M' else MIN_DAYS_YEAR



        # 條件 A: 覆蓋率不足 (新掛牌或殘年)

        small_coverage_mask = df[days_col].fillna(0).astype(int) < min_days



        # 條件 B: 停牌/復牌週期極端值

        has_resume = df['HasResume'].fillna(0).gt(0)

        thin_mask  = df[days_col].fillna(0).astype(int) < 3 # 極薄樣本 (3日以下)

        big_gap_mask   = df[f'Ret_Gap_{freq_code}'].abs() > GAP_BIG_MONTH_YEAR

        big_ret_mask   = df[f'Ret_Trad_{freq_code}'].abs() > BIG_RET_MONTH_YEAR

        resume_extreme_mask = has_resume & (thin_mask | big_ret_mask | big_gap_mask)



        # 條件 C: 上期無交易 + 本期大跳空 (類似週K的玩股口徑)

        stop_jump_mask = (df[prev_days_col].fillna(0).astype(int) == 0) & (df[f'Ret_Gap_{freq_code}'].abs() > STOPWEEK_JUMP_THRESHOLD)



        final_clear_mask = final_clear_mask | small_coverage_mask | stop_jump_mask | resume_extreme_mask



        # 🎯 [***新增 QA 統計***]:標記被過濾的行

        df.loc[final_clear_mask, 'IsFiltered_QA'] = 1



    # 應用過濾:將滿足過濾條件的報酬設為 NaN

    if final_clear_mask.any():

        df.loc[final_clear_mask, cols_to_nan] = np.nan



    return df

class ParquetStreamer:

    def __init__(self, path: str, keep_cols=None, row_group_size: int = ROW_GROUP_SIZE):

        self.path = path

        self.writer = None

        self.schema = None

        self.rows = 0

        self.keep_cols = keep_cols

        self.row_group_size = row_group_size



    def _ensure_writer(self, table: pa.Table):

        if self.writer is None:

            Path(self.path).parent.mkdir(parents=True, exist_ok=True)

            self.schema = table.schema

            self.writer = pq.ParquetWriter(self.path, self.schema, compression='snappy')



    def append_df(self, df: pd.DataFrame):

        if df is None or df.empty:

            return

        df = df.copy()

        if self.keep_cols is not None:

            for c in self.keep_cols:

                if c not in df.columns:

                    df[c] = np.nan

            df = df.reindex(columns=self.keep_cols)

        if '日期' in df.columns:

            df['日期'] = pd.to_datetime(df['日期'], errors='coerce').dt.tz_localize(None)



        # 確保 IsFiltered_QA 欄位是整數

        if 'IsFiltered_QA' in df.columns:

            df['IsFiltered_QA'] = pd.to_numeric(df['IsFiltered_QA'], errors='coerce').fillna(0).astype('int32')



        # 🚀 【修正:強制 StockID 為字串】確保前導零保留

        for c in df.columns:

            if c == 'StockID':

                df[c] = df[c].astype(str)

                continue # 跳過原有的數值轉換邏輯



            if df[c].dtype in ['int64','float64','int32','float32'] and c not in ['ISO_Week', 'IsFiltered_QA']:

                df[c] = pd.to_numeric(df[c], errors='coerce').astype(np.float64)

        # 【修正結束】



        table = pa.Table.from_pandas(df, preserve_index=False)

        self._ensure_writer(table)

        if table.schema != self.schema:

            table = pa.Table.from_pandas(df.reindex(columns=[f.name for f in self.schema]), preserve_index=False).cast(self.schema, safe=False)

        self.writer.write_table(table, row_group_size=self.row_group_size)

        self.rows += len(df)



    def close(self):

        if self.writer is not None:

            try:

                self.writer.close()

            except:

                pass
留言
avatar-img
《炒股不看周月年K漲幅機率就是耍流氓》
16會員
290內容數
普通上班族,用 AI 與 Python 將炒股量化。我的數據宣言是:《炒股不做量化,都是在耍流氓》。
2025/11/12
我在跑台股 yearK 統計時,發現一筆堪稱「驚悚」的異常: 某檔股票的收盤價出現六位數(30 萬元)以上! 這篇不只要解剖這筆異常,更要講清楚—— 其實,這不是孤例。 任何減資、拆股、反向分割的股票,都可能出現同樣錯位。 我們會一起看真實案例、驗證證據, 並展示一整套能
Thumbnail
2025/11/12
我在跑台股 yearK 統計時,發現一筆堪稱「驚悚」的異常: 某檔股票的收盤價出現六位數(30 萬元)以上! 這篇不只要解剖這筆異常,更要講清楚—— 其實,這不是孤例。 任何減資、拆股、反向分割的股票,都可能出現同樣錯位。 我們會一起看真實案例、驗證證據, 並展示一整套能
Thumbnail
2025/11/09
簡單來說,我們用自己寫的程式把日K資料轉成月K,然後篩出2020-2025年間所有月漲幅超過100%的股票清單。 為什麼要做這個檢查 因為轉資料的程式很複雜: 要處理除權息 要過濾異常值 要偵測停牌復牌 要處理時區問題 怕哪個環節出錯,把沒漲100%的股票也算進來,或者該算的沒算到
2025/11/09
簡單來說,我們用自己寫的程式把日K資料轉成月K,然後篩出2020-2025年間所有月漲幅超過100%的股票清單。 為什麼要做這個檢查 因為轉資料的程式很複雜: 要處理除權息 要過濾異常值 要偵測停牌復牌 要處理時區問題 怕哪個環節出錯,把沒漲100%的股票也算進來,或者該算的沒算到
2025/11/09
由於方格子單篇文章有字數限制,因此本次完整的日K → 週/月/年K 清洗與報酬計算程式碼,將分成兩篇文章提供: def detect_pingpong_patterns(day_df: pd.DataFrame, price_col='收盤', threshold=0.4) -> pd.D
2025/11/09
由於方格子單篇文章有字數限制,因此本次完整的日K → 週/月/年K 清洗與報酬計算程式碼,將分成兩篇文章提供: def detect_pingpong_patterns(day_df: pd.DataFrame, price_col='收盤', threshold=0.4) -> pd.D
看更多
你可能也想看
Thumbnail
從 JavaScript 到 Python
Thumbnail
從 JavaScript 到 Python
Thumbnail
在 Python 自動化領域中,pyautogui 是非常常用的工具,可以幫你模擬滑鼠與鍵盤操作。 這篇帶你快速掌握: ✔ 滑鼠移動 ✔ 點擊操作 ✔ 拖曳 ✔ 滾動
Thumbnail
在 Python 自動化領域中,pyautogui 是非常常用的工具,可以幫你模擬滑鼠與鍵盤操作。 這篇帶你快速掌握: ✔ 滑鼠移動 ✔ 點擊操作 ✔ 拖曳 ✔ 滾動
Thumbnail
《轉轉生》(Re:INCARNATION)為奈及利亞編舞家庫德斯.奧尼奎庫與 Q 舞團創作的當代舞蹈作品,結合拉各斯街頭節奏、Afrobeat/Afrobeats、以及約魯巴宇宙觀的非線性時間,建構出關於輪迴的「誕生—死亡—重生」儀式結構。本文將從約魯巴哲學概念出發,解析其去殖民的身體政治。
Thumbnail
《轉轉生》(Re:INCARNATION)為奈及利亞編舞家庫德斯.奧尼奎庫與 Q 舞團創作的當代舞蹈作品,結合拉各斯街頭節奏、Afrobeat/Afrobeats、以及約魯巴宇宙觀的非線性時間,建構出關於輪迴的「誕生—死亡—重生」儀式結構。本文將從約魯巴哲學概念出發,解析其去殖民的身體政治。
Thumbnail
這是一場修復文化與重建精神的儀式,觀眾不需要完全看懂《遊林驚夢:巧遇Hagay》,但你能感受心與土地團聚的渴望,也不急著在此處釐清或定義什麼,但你的在場感受,就是一條線索,關於如何找著自己的路徑、自己的聲音。
Thumbnail
這是一場修復文化與重建精神的儀式,觀眾不需要完全看懂《遊林驚夢:巧遇Hagay》,但你能感受心與土地團聚的渴望,也不急著在此處釐清或定義什麼,但你的在場感受,就是一條線索,關於如何找著自己的路徑、自己的聲音。
Thumbnail
背景:從冷門配角到市場主線,算力與電力被重新定價   小P從2008進入股市,每一個時期的投資亮點都不同,記得2009蘋果手機剛上市,當時蘋果只要在媒體上提到哪一間供應鏈,隔天股價就有驚人的表現,當時光學鏡頭非常熱門,因為手機第一次搭上鏡頭可以拍照,也造就傳統相機廠的殞落,如今手機已經全面普及,題
Thumbnail
背景:從冷門配角到市場主線,算力與電力被重新定價   小P從2008進入股市,每一個時期的投資亮點都不同,記得2009蘋果手機剛上市,當時蘋果只要在媒體上提到哪一間供應鏈,隔天股價就有驚人的表現,當時光學鏡頭非常熱門,因為手機第一次搭上鏡頭可以拍照,也造就傳統相機廠的殞落,如今手機已經全面普及,題
Thumbnail
isalnum() : 檢測字串是否由字母和數字組成 isalpha() : 檢測字串是否只由字母組成 isalnum() str1 = "abc123" print(str1.isalnum()) str2 = "pop.cat" print(str2.isalnum()) str3
Thumbnail
isalnum() : 檢測字串是否由字母和數字組成 isalpha() : 檢測字串是否只由字母組成 isalnum() str1 = "abc123" print(str1.isalnum()) str2 = "pop.cat" print(str2.isalnum()) str3
Thumbnail
在做自動化工具或測試腳本時,常常會需要操作視窗,例如: 👉 找到特定程式 👉 切換視窗 👉 自動化點擊操作 這時候,pyautogui 是一個很好上手的工具。
Thumbnail
在做自動化工具或測試腳本時,常常會需要操作視窗,例如: 👉 找到特定程式 👉 切換視窗 👉 自動化點擊操作 這時候,pyautogui 是一個很好上手的工具。
Thumbnail
本文分析導演巴里・柯斯基(Barrie Kosky)如何運用極簡的舞臺配置,將布萊希特(Bertolt Brecht)的「疏離效果」轉化為視覺奇觀與黑色幽默,探討《三便士歌劇》在當代劇場中的新詮釋,並藉由舞臺、燈光、服裝、音樂等多方面,分析該作如何在保留批判核心的同時,觸及觀眾的觀看位置與人性幽微。
Thumbnail
本文分析導演巴里・柯斯基(Barrie Kosky)如何運用極簡的舞臺配置,將布萊希特(Bertolt Brecht)的「疏離效果」轉化為視覺奇觀與黑色幽默,探討《三便士歌劇》在當代劇場中的新詮釋,並藉由舞臺、燈光、服裝、音樂等多方面,分析該作如何在保留批判核心的同時,觸及觀眾的觀看位置與人性幽微。
追蹤感興趣的內容從 Google News 追蹤更多 vocus 的最新精選內容追蹤 Google News