《如何打造六國股市資料擷取系統:》第2篇|美國篇:下戴股票清單轉換與Yahoo Finance日K下載模組

更新 發佈閱讀 63 分鐘
投資理財內容聲明

美股股票清單抓取工具 - 功能分析

📋 程式概述

這是一個從官方來源自動抓取並篩選美股高品質普通股清單的 Python 工具。它會從 NASDAQ 和 NYSE 官方網站下載最新資料,並進行智能分類與過濾。

🎯 核心功能

1. 資料來源

  • NASDAQ 市場:從 nasdaqtrader.com 官方 API 抓取
  • NYSE 市場:同樣從官方 API 取得完整列表
  • 確保資料來源權威且即時更新

2. 智能分類系統

| 類別           | 說明                     | 範例                     |
|----------------|--------------------------|--------------------------|
| ETF/ETN | 交易所交易基金 | SPY, QQQ |
| Common Stock | 普通股 | AAPL, MSFT |
| ADR/Foreign | 美國存託憑證 / 海外股票 | BABA, TSM |
| Preferred | 特別股 / 優先股證券 | |
| Warrants | 認股權證 / 衍生性商品 | |
| Units | 單位證券 / SPAC 單位 | |
| Rights | 認購權 / 股東認購權 | |
| Trust/Fund/LP | 信託 / 基金 / 合夥 | REITs, MLP |

3. 高品質篩選條件

  • NASDAQ 篩選規則:只保留 Q 板塊(全球精選市場)和 G 板塊(全球市場)僅選取 Common Stock(普通股)排除測試證券
  • NYSE 篩選規則:只保留 Exchange='N'(NYSE 主板)僅選取 Common Stock(普通股)排除測試證券

📊 執行結果解讀

原始資料統計


### 📊 NASDAQ 原始資料分類

| 類型 | 數量 | 百分比 |
|----------------|----------|----------|
| Common Stock | 2,991| 57.9% |
| ETF/ETN | 1,036| 20.1% |
| Warrants | 347| 6.7% |
| 其他類型 | 788| 15.3% |
| **總計** | 5,162| |

---

### 📊 NYSE 原始資料分類

| 類型 | 數量 | 百分比 |
|----------------|----------|----------|
| ETF/ETN | 3,624| 53.6% |
| Common Stock | 1,899| 28.1% |
| Preferred | 326| 4.8% |
| 其他類型 | 916| 13.5% |
| **總計** | 6,765| |

最終篩選成果

| 指標項目         | 數值說明                                |
|------------------|-----------------------------------------|
| 原始總項目 | 11,927|
| 最終保留 | 3,411 檔高品質普通股 |
| 過濾率 | 71.4% |
| NASDAQ 保留率 | 33.1% (1,710 / 5,162) |
| NYSE 保留率 | 25.1% (1,701 / 6,765) |

💡 技術特點

1. 資料清洗

  • 自動移除測試證券(Test Issue)
  • 標準化股票代碼格式(大寫、去空格)
  • 使用 CQS Symbol 作為 NYSE 標準代碼

2. 去重機制

  • 合併 NASDAQ 與 NYSE 清單時
  • 以 NASDAQ 資料為優先(keep='first')
  • 確保每個股票代碼唯一

3. 全域變數設計

global us_list
us_list = [] # 儲存最終結果

🔧 使用範例

# 執行主程式
result = main()

# 查看清單內容
print(f"共抓取 {len(us_list)} 檔股票")
print(us_list[:5]) # 查看前 5

輸出格式

('AAPL', 'Apple Inc. - Common Stock')
('MSFT', 'Microsoft Corporation - Common Stock')
('GOOGL', 'Alphabet Inc. - Class A Common Stock')

⚙️ 依賴套件

pip install pandas requests

🎓 適用場景

  • 量化交易系統:建立交易標的池
  • 資料分析:美股市場研究
  • 投資組合管理:股票清單維護
  • 自動化監控:定期更新可交易清單

📝 輸出資料範例

5筆範例:
('AACB', 'Artius II Acquisition Inc. - Class A Ordinary Shares')
('AAL', 'American Airlines Group, Inc. - Common Stock')
('AAME', 'Atlantic American Corporation - Common Stock')
('AAOI', 'Applied Optoelectronics, Inc. - Common Stock')
('AAON', 'AAON, Inc. - Common Stock')


✅ 官方資料來源 - 確保準確性 ✅ 智能分類 - 自動識別證券類型 ✅ 嚴格篩選 - 只保留高品質普通股 ✅ 即時更新 - 每次執行獲取最新資料 ✅ 詳細統計 - 完整的過濾報表 ✅ 易於整合 - 可直接用於其他專案

🔍 注意事項

  • 需要網路連線以存取官方 API
  • 資料會隨市場變動而更新
  • 建議定期執行以維持清單最新
  • 過濾條件可根據需求調整

🧩 完整程式碼

# -*- coding: utf-8 -*-

"""

✅ 升級版:抓取 NASDAQ + NYSE 清單 + 詳細統計報表 (已移除檔案輸出,改為全域變數)

   - 分類統計:ETF / ADR / Units / Warrants / Common Stock / Others

   - 顯示過濾前後數量

"""



import pandas as pd

import requests

from collections import Counter



# 宣告全域變數,確保清單可以在 main() 外部存取

global us_list

us_list = []



def clean_symbol(sym: str) -> str:

    return sym.strip().upper()



def classify_security(name: str, is_etf: bool, market_cat=None, exchange=None) -> str:

    """根據名稱和標記分類證券類型"""

    if is_etf:

        return "ETF/ETN"

   

    name_upper = name.upper()

   

    if any(kw in name_upper for kw in ["RIGHTS"]):

        return "Rights"

    if any(kw in name_upper for kw in ["UNITS", "UNIT"]):

        return "Units"

    if any(kw in name_upper for kw in ["WARRANT"]):

        return "Warrants"

    if any(kw in name_upper for kw in ["PREFERRED"]):

        return "Preferred"

    if any(kw in name_upper for kw in ["DEPOSITARY", "ADR", "FOREIGN"]):

        return "ADR/Foreign"

    if any(kw in name_upper for kw in ["COMMON STOCK", "ORDINARY SHARES", "CLASS A", "CLASS B"]):

        return "Common Stock"

    if any(kw in name_upper for kw in ["REIT", "TRUST", "FUND", "LP", "LLC", "PARTNERSHIP"]):

        return "Trust/Fund/LP"

   

    return "Others"



def fetch_nasdaq_with_stats():

    print("📡 下載 NASDAQ 官方列表...")

    # ... (fetch_nasdaq_with_stats 邏輯保持不變)

    url = "https://www.nasdaqtrader.com/dynamic/symdir/nasdaqlisted.txt"

    r = requests.get(url)

    lines = [l for l in r.text.splitlines() if l and not l.startswith("#")]

    df = pd.read_csv(pd.io.common.StringIO("\n".join(lines)), sep="|")

    df = df[df["Test Issue"] == "N"]

   

    # 分類統計

    df["Is_ETF"] = df["ETF"] == "Y"

    df["Category"] = df.apply(lambda row: classify_security(

        row["Security Name"],

        row["Is_ETF"],

        market_cat=row["Market Category"]

    ), axis=1)

   

    stats = Counter(df["Category"])

    print(f"📊 NASDAQ 原始總數: {len(df)}")

    for cat, count in sorted(stats.items()):

        print(f"   • {cat}: {count}")

    print()

   

    # 過濾:只保留 Q/G 板塊的普通股

    filtered = df[

        (df["Market Category"].isin(["Q", "G"])) &

        (df["Category"] == "Common Stock")

    ].copy()

   

    return filtered[["Symbol", "Security Name"]].rename(columns={"Security Name": "Name"}), len(df), len(filtered)



def fetch_nyse_with_stats():

    print("📡 下載 NYSE 官方列表...")

    # ... (fetch_nyse_with_stats 邏輯保持不變)

    url = "https://www.nasdaqtrader.com/dynamic/symdir/otherlisted.txt"

    r = requests.get(url)

    lines = [l for l in r.text.splitlines() if l and not l.startswith("#")]

    df = pd.read_csv(pd.io.common.StringIO("\n".join(lines)), sep="|")

    df = df[df["Test Issue"] == "N"]

   

    # 分類統計

    df["Is_ETF"] = df["ETF"] == "Y"

    df["Category"] = df.apply(lambda row: classify_security(

        row["Security Name"],

        row["Is_ETF"],

        exchange=row["Exchange"]

    ), axis=1)

   

    stats = Counter(df["Category"])

    print(f"📊 NYSE 原始總數: {len(df)}")

    for cat, count in sorted(stats.items()):

        print(f"   • {cat}: {count}")

    print()

   

    # 過濾:只保留 Exchange='N' 的普通股

    filtered = df[

        (df["Exchange"] == "N") &

        (df["Category"] == "Common Stock")

    ].copy()

   

    # 使用 CQS Symbol 作為標準代碼

    filtered["Symbol"] = filtered["CQS Symbol"].apply(clean_symbol)

    return filtered[["Symbol", "Security Name"]].rename(columns={"Security Name": "Name"}), len(df), len(filtered)



def main():

    # 宣告 us_list 為全域變數,確保修改會影響 Cell 外部

    global us_list

   

    try:

        nasdaq_df, nasdaq_raw, nasdaq_kept = fetch_nasdaq_with_stats()

        nyse_df, nyse_raw, nyse_kept = fetch_nyse_with_stats()

    except Exception as e:

        print(f"❌ 下載失敗: {e}")

        return []



    all_df = pd.concat([nasdaq_df, nyse_df], ignore_index=True)

    all_df.drop_duplicates(subset=["Symbol"], keep="first", inplace=True)

   

    total_raw = nasdaq_raw + nyse_raw

    total_kept = len(all_df)

   

    print("="*60)

    print("📈 最終統計摘要")

    print("="*60)

    print(f"📁 資料來源:")

    print(f"   • NASDAQ: {nasdaq_raw} → 保留 {nasdaq_kept} 檔普通股")

    print(f"   • NYSE:   {nyse_raw} → 保留 {nyse_kept} 檔普通股")

    print()

    print(f"🗂️  合計原始項目: {total_raw}")

    print(f"✅ 最終高品質普通股: {total_kept}")

    print(f"📉 過濾排除率: {(1 - total_kept/total_raw)*100:.1f}%")

    print()

   

    # 將清單賦值給全域變數 us_list

    us_list = list(all_df[["Symbol", "Name"]].itertuples(index=False, name=None))

   

    # ⬇️ 移除儲存為 Python 模組的程式碼

    # with open("us_stocks.py", "w", encoding="utf-8") as f:

    #     ... (已移除)

    # print(f"💾 已儲存至: us_stocks.py")

   

    print(f"✅ 清單已儲存至全域變數 us_list (共 {len(us_list)} 檔)。")

    print("\n🧾 前5筆範例:")

    for item in us_list[:5]:

        print(f"   {item}")

   

    return us_list



if __name__ == "__main__":

    main()

⚙️ Cell 2:美股日K下載器主程式(AI 生成)

這段程式碼會根據 us_list 清單,自動執行以下流程:

  • ✅ 預篩:檢查哪些 ticker 有資料可抓
  • ✅ 建立 manifest:記錄每檔狀態(pending / done / failed)
  • ✅ 批次下載:優先使用 Yahoo Finance 的 max/10y
  • ✅ 單檔補救:批次失敗時自動 fallback
  • ✅ 儲存 CSV:每檔儲存為 <ticker>.csv
  • ✅ 更新狀態:每次下載後更新 manifest
  • ✅ 輸出報告:顯示成功/失敗統計,儲存執行參數

✅ 程式碼(請放在 Cell 1 之後)

# === Cell 2: Download US daily bars via yfinance (Enhanced with JP-like features) ===

import os, time, random, warnings, logging, json

import pandas as pd

import yfinance as yf

from concurrent.futures import ThreadPoolExecutor, as_completed # Keep ThreadPoolExecutor but use it for single fetch or prefilter

from pathlib import Path

from tqdm import tqdm



# 【免責聲明 / Disclaimer】 (Omitted for brevity in the response, but keep in the code)



# 降噪

for lg in ["yfinance", "urllib3", "requests"]:

    logging.getLogger(lg).setLevel(logging.CRITICAL)

    logging.getLogger(lg).propagate = False

warnings.filterwarnings("ignore")



# ========== 參數與路徑定義 ==========

MARKET_CODE = "us-share"            # 資料夾名稱

DATA_SUBDIR = "dayK"                # 日K子資料夾名

PROJECT_NAME = "美股日K資料下載器"   # 專案名稱(用於 Log)



try:

    from google.colab import drive

    print("🔗 正在掛載 Google Drive...")

    drive.mount('/content/drive', force_remount=False)

    print("✅ Drive 已掛載")

    BASE_DIR = "/content/drive/MyDrive/各國股票檔案"

except Exception:

    BASE_DIR = os.path.abspath("./data")

    print(f"⚠️ 未在 Colab 環境執行,使用本地路徑:{BASE_DIR}")



# 調整後的路徑

BASE_MARKET_DIR = f"{BASE_DIR}/{MARKET_CODE}"

DATA_DIR_US = f"{BASE_MARKET_DIR}/{DATA_SUBDIR}"

LIST_DIR = f'{BASE_MARKET_DIR}/lists' # 新增清單/Checkpoint 路徑

LOG_PARENT_DIR = f"{BASE_DIR}/Log"

LOG_DIR = f"{LOG_PARENT_DIR}/{PROJECT_NAME}"



# 建立所有需要的資料夾

os.makedirs(DATA_DIR_US, exist_ok=True)

os.makedirs(LOG_DIR, exist_ok=True)

os.makedirs(LIST_DIR, exist_ok=True) # 建立清單/Checkpoint 路徑



START_DATE = "2000-01-01"

END_DATE   = "2099-09-30"

THREADS_US = 8 # 增加線程數用於預篩,正式下載改用批次

BATCH_SIZE = 60 # 批次下載大小

PAUSE_SEC = 5.0 # 批次下載間隔

SAMPLE_LIMIT_US = None



ts_tag = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")

LOG_FILE = f'{LOG_DIR}/download_us_{ts_tag}.txt'



# Checkpoint 檔案 (Manifest/Resume)

MANIFEST_CSV = Path(LIST_DIR) / "us_manifest.csv"

STATE_JSON = Path(LIST_DIR) / "us_state.json"



def log_message(msg: str):

    """將訊息同時輸出到 console 與 log 檔案"""

    with open(LOG_FILE, "a", encoding="utf-8") as f:

        f.write(f"{pd.Timestamp.now()}: {msg}\n")

    print(msg)



# standardize_df 函數 (Unchanged)

def standardize_df(df: pd.DataFrame) -> pd.DataFrame:

    # ... (Original standardize_df logic)

    if df is None or df.empty:

        return pd.DataFrame()

    df = df.reset_index()

    if 'Date' not in df.columns:

        first_col = df.columns[0]

        if str(first_col).lower().startswith("date"):

            df.rename(columns={first_col: 'Date'}, inplace=True)

        else:

            return pd.DataFrame()

    df['date'] = pd.to_datetime(df['Date'], errors='coerce', utc=True)

    for _ in range(2):

        try:

            df['date'] = df['date'].dt.tz_convert(None)

        except Exception:

            try:

                df['date'] = df['date'].dt.tz_localize(None)

            except Exception:

                pass

    df = df.rename(columns={'Open':'open','High':'high','Low':'low','Close':'close','Volume':'volume'})

    req = ['date','open','high','low','close','volume']

    if not all(c in df.columns for c in req):

        return pd.DataFrame()

    df = df.dropna(subset=['date'])

    for c in ['open','high','low','close','volume']:

        df[c] = pd.to_numeric(df[c], errors='coerce')

    df = df.dropna(subset=['open','high','low','close','volume'])

    df = df[df['volume'] > 0]

    df = df[(df['date'] >= pd.to_datetime(START_DATE)) & (df['date'] <= pd.to_datetime(END_DATE))]

    df = df.sort_values('date').reset_index(drop=True)

    return df[req]



# safe_history 函數 (Unchanged)

def safe_history(symbol: str, start: str, end: str, interval="1d", max_retries=6, base_delay=1.0):

    periods = ["max", "10y", "5y", "2y", "1y"]

    for i in range(max_retries):

        try:

            tk = yf.Ticker(symbol)

            if i < len(periods):

                p = periods[i]

                df = tk.history(period=p, interval=interval, auto_adjust=False)

            else:

                df = tk.history(start=start, end=end, interval=interval, auto_adjust=False)

            if df is not None and not df.empty:

                return df

            time.sleep(base_delay + 0.5*i + random.uniform(0, 0.7))

        except Exception as e:

            msg = str(e)

            if any(k in msg for k in ["Invalid Crumb","Unauthorized","401"]):

                time.sleep(8 + 2*i + random.uniform(0, 2))

            else:

                time.sleep(base_delay + 0.5*i + random.uniform(0, 1.0))

    return None



def map_symbol_us(ticker: str) -> str:

    """YF 符號映射,主要用於確保 Ticker 物件能抓取"""

    return str(ticker).upper().strip().replace(".", "-")



# is_valid_csv 函數 (Unchanged)

def is_valid_csv(file_path: str) -> bool:

    # ... (Original is_valid_csv logic)

    try:

        df = pd.read_csv(file_path)

        req = ['date','open','high','low','close','volume']

        if not all(c in df.columns for c in req):

            return False

        # 由於 standardize_df 會使用 START_DATE/END_DATE,這裡用新的,可避免日期範圍改變造成的無效判斷

        df2 = standardize_df(df)

        return not df2.empty

    except Exception:

        return False



# ====== Manifest:逐檔狀態檔(resume 用)(Based on JP logic) ======

def build_manifest(ok_rows, force_rebuild=False):

    """建立或讀取 manifest。欄位:ticker,name,status,last_error,last_try"""

    if (not force_rebuild) and MANIFEST_CSV.exists():

        mf = pd.read_csv(MANIFEST_CSV)

        need_cols = {"ticker","name","status","last_error","last_try"}

        if need_cols.issubset(set(mf.columns)):

            print(f"📄 讀取現有 manifest:{MANIFEST_CSV}({len(mf)} 列)")

            return mf

        else:

            print("⚠️ 舊 manifest 欄位不完整,將重建")



    # 新建

    mf = pd.DataFrame(ok_rows, columns=["ticker","name"])

    mf["status"] = "pending"    # pending / done / failed / skipped

    mf["last_error"] = ""

    mf["last_try"] = ""

    # 已存在檔案標記為 done

    have = {f.split(".")[0] for f in os.listdir(DATA_DIR_US) if f.endswith(".csv")}

    mf.loc[mf["ticker"].isin(have), ["status","last_error"]] = ["done",""]

    mf.to_csv(MANIFEST_CSV, index=False)

    print(f"💾 新建 manifest:{MANIFEST_CSV}({len(mf)} 列,已有 {len(have)} 檔標記 done)")

    return mf



def save_manifest(mf):

    mf.to_csv(MANIFEST_CSV, index=False)



# ====== 預篩 (Prefilter) - 簡易版 ======

def prefilter_us(rows):

    def quick_check(ticker):

        sym = map_symbol_us(ticker)

        try:

            tk = yf.Ticker(sym)

            df = tk.history(period="1y", interval="1mo", auto_adjust=False)

            if df is not None and not df.empty:

                return "ok"

            # 嘗試長週期

            df = tk.history(period="5y", interval="3mo", auto_adjust=False)

            if df is not None and not df.empty:

                return "ok"

            return "bad"

        except Exception:

            return "bad"



    ok_rows = []

    bad_rows = []



    # 使用多執行緒進行預篩 (使用 THREADS_US)

    log_message(f"🏃 正在對 {len(rows)} 檔進行預篩 (THREADS={THREADS_US})...")



    with ThreadPoolExecutor(max_workers=THREADS_US) as ex:

        futs = {ex.submit(quick_check, tkr): (tkr, name) for tkr, name in rows}

        for f in tqdm(as_completed(futs), total=len(futs), desc="US 預篩進度"):

            tkr, name = futs[f]

            try:

                result = f.result()

                if result == "ok":

                    ok_rows.append((tkr, name))

                else:

                    bad_rows.append((tkr, name))

            except Exception as e:

                bad_rows.append((tkr, name))



    log_message(f"✅ 預篩結果:ok={len(ok_rows)}, bad={len(bad_rows)}")

    return ok_rows



# ====== 批次下載與存檔 (Based on JP logic) ======

def download_batch_us(tickers):

    syms = [map_symbol_us(t) for t in tickers]

    df = None

    try:

        df = yf.download(syms, period="max", interval="1d", group_by="ticker", auto_adjust=False, threads=False)

    except Exception as e:

        log_message(f"[download] 批次失敗({len(syms)}): {e} → fallback 10y")

        time.sleep(PAUSE_SEC + random.uniform(0, 1.5))

        try:

            df = yf.download(syms, period="10y", interval="1d", group_by="ticker", auto_adjust=False, threads=False)

        except Exception as e2:

            log_message(f"[download] 10y仍失敗,跳過此批:{e2}")

            return None

    return df



def write_one_from_multi(df_multi, tkr):

    sym = map_symbol_us(tkr)

    try:

        if isinstance(df_multi.columns, pd.MultiIndex):

            sub = df_multi[sym].copy()

        else:

            # 只有單檔時 df_multi.columns 是單層

            if tkr in sym and len(df_multi.columns)==6:

                 sub = df_multi.copy()

            else:

                 return False



        if sub is None or sub.empty:

            return False



        # 這裡使用 standardize_df 來確保欄位和日期範圍正確性

        sub = standardize_df(sub)



        if sub.empty:

            return False



        out = os.path.join(DATA_DIR_US, f"{tkr}.csv")

        sub.to_csv(out, index=False)

        return True

    except Exception as e:

        log_message(f"儲存/標準化失敗 {tkr}: {e}")

        return False



def resume_download_loop(mf):

    # 只挑選 pending/failed/(以及沒有檔案的 skipped)

    # 確保不會重複下載已存在的 valid 檔案

    have = {f.split(".")[0] for f in os.listdir(DATA_DIR_US) if f.endswith(".csv")}

    mf.loc[mf["ticker"].isin(have), ["status","last_error","last_try"]] = ["done","","auto-detected"]

    save_manifest(mf)



    # 重新計算需要下載的清單

    need_tickers = mf[mf["status"].isin(["pending","failed","skipped"]) & (~mf["ticker"].isin(have))]["ticker"].tolist()

    if not need_tickers:

        log_message("✅ 無需下載:manifest 已全部完成或檔案已存在")

        return



    total_batches = (len(need_tickers) + BATCH_SIZE - 1) // BATCH_SIZE



    # 使用 tqdm 顯示進度

    pbar = tqdm(total=len(need_tickers), desc="總下載進度", unit="檔")



    for bi in range(0, len(need_tickers), BATCH_SIZE):

        batch_tickers = need_tickers[bi:bi+BATCH_SIZE]



        # 檢查是否還有未完成項

        current_batch_in_need = mf[mf["ticker"].isin(batch_tickers) & (mf["status"] != "done")]["ticker"].tolist()

        if not current_batch_in_need:

            pbar.update(len(batch_tickers))

            continue



        pbar.set_description(f"[批次 {bi//BATCH_SIZE+1}/{total_batches}] 下載中")



        df = download_batch_us(current_batch_in_need)



        updated_count = 0



        if df is None:

            # 整批失敗→逐檔 fallback (單檔下載)

            for tkr in current_batch_in_need:

                sym = map_symbol_us(tkr)

                ok = False

                try:

                    d1 = safe_history(sym, START_DATE, END_DATE, "1d")

                    d1 = standardize_df(d1)

                    if d1 is not None and not d1.empty:

                        out = os.path.join(DATA_DIR_US, f"{tkr}.csv")

                        d1.to_csv(out, index=False)

                        ok = True

                    else:

                        raise Exception("empty_df_after_standardize")

                except Exception as e:

                    mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["failed", str(e), "single-fallback"]

                if ok:

                    mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["done","", "single-fallback"]

                save_manifest(mf)

                pbar.update(1)

                updated_count += 1



            time.sleep(PAUSE_SEC * 1.5 + random.uniform(0, 2))

            continue



        # 批次成功:把在這批有資料的寫出

        for tkr in current_batch_in_need:

            ok = write_one_from_multi(df, tkr)

            if ok:

                mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["done", "", "batch"]

            else:

                # 批次拉不到,再做單檔補拉

                sym = map_symbol_us(tkr)

                try:

                    d1 = safe_history(sym, START_DATE, END_DATE, "1d")

                    d1 = standardize_df(d1)

                    if d1 is not None and not d1.empty:

                        out = os.path.join(DATA_DIR_US, f"{tkr}.csv")

                        d1.to_csv(out, index=False)

                        mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["done", "", "single-after-batch"]

                    else:

                        mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["failed", "empty_df", "single-after-batch"]

                except Exception as e:

                    mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["failed", str(e), "single-after-batch"]



            pbar.update(1)

            updated_count += 1



        save_manifest(mf)

        time.sleep(PAUSE_SEC + random.uniform(0, 1.5))



    pbar.close()



def main_us():

    print("📁 目錄:")

    print(f"   BASE_DIR = {BASE_DIR}")

    print(f"   LIST_DIR = {LIST_DIR}")

    print(f"   {MARKET_CODE}/{DATA_SUBDIR} = {DATA_DIR_US}")

    print(f"   logs     = {LOG_DIR}")



    print("\n🚀 美股下載開始(含續跑機制)")



    # 檢查 us_list

    try:

        global us_list

        _ = us_list

    except NameError:

        log_message("❌ 找不到變數 `us_list`。請先定義美股清單再執行本段。")

        us_list = []



    if not us_list:

        print("清單為空,程序結束。")

        return



    # 1) 清單

    rows_all = us_list

    if SAMPLE_LIMIT_US:

        rows_all = rows_all[:SAMPLE_LIMIT_US]

    log_message(f"🧾 讀到代碼數:{len(rows_all)}")



    # 2) 預篩(用多執行緒快速過濾無效代碼)

    ok_rows = prefilter_us(rows_all)



    # 3)/讀 manifest(pending/done/failed/skipped)

    mf = build_manifest(ok_rows)



    # 4) 續跑-只補未完成 (批次下載循環)

    resume_download_loop(mf)



    # 5) 統計輸出

    mf = pd.read_csv(MANIFEST_CSV)

    tot = len(mf)

    done = int((mf["status"]=="done").sum())

    failed = int((mf["status"]=="failed").sum())

    pending = int((mf["status"]=="pending").sum())

    skipped = int((mf["status"]=="skipped").sum()) # 其實在 JP 邏輯中,這裡的 done 包含了原本的 skipped



    # 重新統計 done / failed / skipped

    final_have = {f.split(".")[0] for f in os.listdir(DATA_DIR_US) if f.endswith(".csv")}



    succ = len(final_have.intersection(set(mf["ticker"]))) # 成功下載且檔案存在

    fail_final = len(mf[(mf["status"]=="failed") & (~mf["ticker"].isin(final_have))])

    skip_final = len(mf[mf["ticker"].isin(final_have) & (mf["status"]!="failed")])



    log_message(f"📊 狀態統計:total={tot}, success={succ}, failed={fail_final}, skipped={skip_final}")



    # 6) 存下執行參數(方便日後比對)

    with open(STATE_JSON, "w", encoding="utf-8") as f:

        json.dump({

            "ts": ts_tag,

            "start_date": START_DATE,

            "end_date": END_DATE,

            "batch_size": BATCH_SIZE,

            "pause_sec": PAUSE_SEC,

            "threads_us": THREADS_US,

            "sample_limit": SAMPLE_LIMIT_US

        }, f, ensure_ascii=False, indent=2)

    print(f"💾 參數快照:{STATE_JSON}")



    # 7) 報表路徑調整為新的 LOG_DIR

    pd.DataFrame(mf).to_csv(f"{LOG_DIR}/logs_us_{ts_tag}.csv", index=False)

    print(f"📄 詳細結果已儲存:{LOG_DIR}/logs_us_{ts_tag}.csv")



    print("\n📍 建議:若遇到 Too Many Requests,降低 BATCH_SIZE 或增加 PAUSE_SEC,然後重新執行。")



if __name__ == "__main__":

    main_us()


將上方程式碼逐個貼上colab cell執行即可。預設會在goole driver建立資料夾存放日K檔案。

vocus|新世代的創作平台

如果複製程式碼貼到colab上方會出現如下空白,導致執行後發生錯誤訊息

File "<tokenize>", line 205 IndentationError: unindent does not match any outer indentation level

請選擇該處空白選取候用取代方式全部取代,再次執行即可

vocus|新世代的創作平台




🧑‍🔬 作者身份與非專業聲明|AUTHOR'S STATUS AND INTENT 本報告的作者為獨立的、業餘數據研究愛好者,非專業量化分析師,亦不具備任何持牌金融顧問資格。本專題報告是作者利用全職工作外的個人時間完成。 The author of this report is an independent, amateur data researcher and NOT a professional quantitative analyst or a licensed financial advisor. This work is completed in the author's personal free time for statistical research purposes.

📊 數據來源與品質限制|DATA SOURCE LIMITATION 本報告所有歷史價格數據均來自免費公共資源(如 Yahoo Finance)。雖然作者已通過 V4.0 QA 系統盡力檢查並排除明顯錯誤,但由於數據源限制,作者不保證數據 100% 無誤。 All data is sourced from free public providers (e.g., Yahoo Finance). While the author uses the V4.0 QA System to minimize errors, the author offers NO WARRANTY of 100% accuracy. Data integrity is constrained by the free source.

🚫 無投資建議聲明|NO INVESTMENT ADVICE 本文內容、圖表及 AI 分析結果僅供研究參考與教學啟發之用,不構成任何投資買賣建議、諮詢或招攬。所有分析僅描述歷史統計規律。 This content is for statistical research and educational inspiration only. It does NOT constitute personalized financial advice, investment recommendations, or a solicitation to buy or sell securities.

⚠️ 風險與責任劃分|RISK & LIABILITY 股票市場投資涉及重大風險。您應自行判斷並承擔所有投資風險。作者(和平台)對您基於本報告所做出的任何投資決策和潛在損失,不承擔任何責任。 Stock market investing involves significant risk. The reader must exercise their own judgment. The author (and the platform) assumes NO LIABILITY for any financial losses incurred based on the information provided herein.




留言
avatar-img
《炒股不看周月年K漲幅機率就是耍流氓》
16會員
290內容數
普通上班族,用 AI 與 Python 將炒股量化。我的數據宣言是:《炒股不做量化,都是在耍流氓》。
你可能也想看
Thumbnail
這是一場修復文化與重建精神的儀式,觀眾不需要完全看懂《遊林驚夢:巧遇Hagay》,但你能感受心與土地團聚的渴望,也不急著在此處釐清或定義什麼,但你的在場感受,就是一條線索,關於如何找著自己的路徑、自己的聲音。
Thumbnail
這是一場修復文化與重建精神的儀式,觀眾不需要完全看懂《遊林驚夢:巧遇Hagay》,但你能感受心與土地團聚的渴望,也不急著在此處釐清或定義什麼,但你的在場感受,就是一條線索,關於如何找著自己的路徑、自己的聲音。
Thumbnail
重點摘要包括股價變動、失敗投資經驗、日圓貶值的影響、AIPC與Wi-Fi 7新技術的發展以及美國基建法案對市場的影響。給出了對股市投資的一些個人看法。
Thumbnail
重點摘要包括股價變動、失敗投資經驗、日圓貶值的影響、AIPC與Wi-Fi 7新技術的發展以及美國基建法案對市場的影響。給出了對股市投資的一些個人看法。
Thumbnail
探索市場波動、槓桿投資風險、ASIC與GPU的市場競爭、企業多元供應鏈策略、以及新創公司投資機會。深入分析如何管理投資風險、預測技術發展趨勢,並提供對新興科技企業投資的實用建議,幫助投資者在變化莫測的市場中做出明智決策。
Thumbnail
探索市場波動、槓桿投資風險、ASIC與GPU的市場競爭、企業多元供應鏈策略、以及新創公司投資機會。深入分析如何管理投資風險、預測技術發展趨勢,並提供對新興科技企業投資的實用建議,幫助投資者在變化莫測的市場中做出明智決策。
Thumbnail
《轉轉生》(Re:INCARNATION)為奈及利亞編舞家庫德斯.奧尼奎庫與 Q 舞團創作的當代舞蹈作品,結合拉各斯街頭節奏、Afrobeat/Afrobeats、以及約魯巴宇宙觀的非線性時間,建構出關於輪迴的「誕生—死亡—重生」儀式結構。本文將從約魯巴哲學概念出發,解析其去殖民的身體政治。
Thumbnail
《轉轉生》(Re:INCARNATION)為奈及利亞編舞家庫德斯.奧尼奎庫與 Q 舞團創作的當代舞蹈作品,結合拉各斯街頭節奏、Afrobeat/Afrobeats、以及約魯巴宇宙觀的非線性時間,建構出關於輪迴的「誕生—死亡—重生」儀式結構。本文將從約魯巴哲學概念出發,解析其去殖民的身體政治。
Thumbnail
本篇將分享給訂閱讀者關於作者本身目前之操作 (選股 )邏輯與針對未來一季的投資策略。
Thumbnail
本篇將分享給訂閱讀者關於作者本身目前之操作 (選股 )邏輯與針對未來一季的投資策略。
Thumbnail
這是股癌筆記包含一個QA的文章,內容包括市場時機與操作策略、股票投資心得與策略、市場多空情緒分析、技術分析與市場趨勢探討、投資心態與策略調整經歷等多個方面。文章中還分享了對AI伺服器和相關技術、產業趨勢、供應鏈動態、以及市場傳言與現實情況對比的分析。
Thumbnail
這是股癌筆記包含一個QA的文章,內容包括市場時機與操作策略、股票投資心得與策略、市場多空情緒分析、技術分析與市場趨勢探討、投資心態與策略調整經歷等多個方面。文章中還分享了對AI伺服器和相關技術、產業趨勢、供應鏈動態、以及市場傳言與現實情況對比的分析。
Thumbnail
本文深入探討了美股和台股的最新表現。從投資策略到個人經驗,特別強調在市場波動時期保持冷靜的重要性,以及選擇潛力股的策略。此外,文章還討論了AI技術股的投資機會,提供了實用的風險管理建議,並探討了ASIC市場的競爭與合作。無論是長期投資者還是短線操作者,都能從中獲得寶貴的見解。
Thumbnail
本文深入探討了美股和台股的最新表現。從投資策略到個人經驗,特別強調在市場波動時期保持冷靜的重要性,以及選擇潛力股的策略。此外,文章還討論了AI技術股的投資機會,提供了實用的風險管理建議,並探討了ASIC市場的競爭與合作。無論是長期投資者還是短線操作者,都能從中獲得寶貴的見解。
Thumbnail
本文分析導演巴里・柯斯基(Barrie Kosky)如何運用極簡的舞臺配置,將布萊希特(Bertolt Brecht)的「疏離效果」轉化為視覺奇觀與黑色幽默,探討《三便士歌劇》在當代劇場中的新詮釋,並藉由舞臺、燈光、服裝、音樂等多方面,分析該作如何在保留批判核心的同時,觸及觀眾的觀看位置與人性幽微。
Thumbnail
本文分析導演巴里・柯斯基(Barrie Kosky)如何運用極簡的舞臺配置,將布萊希特(Bertolt Brecht)的「疏離效果」轉化為視覺奇觀與黑色幽默,探討《三便士歌劇》在當代劇場中的新詮釋,並藉由舞臺、燈光、服裝、音樂等多方面,分析該作如何在保留批判核心的同時,觸及觀眾的觀看位置與人性幽微。
Thumbnail
這是每週定期的筆記,記錄投資事項及股市狀況。文章主要介紹了購買公仔和模型的經歷以及與股市投資的關聯,分析股市表現、公司事件和投資策略。重點整理跨連事件報導、微軟機會和臺積電前景等。
Thumbnail
這是每週定期的筆記,記錄投資事項及股市狀況。文章主要介紹了購買公仔和模型的經歷以及與股市投資的關聯,分析股市表現、公司事件和投資策略。重點整理跨連事件報導、微軟機會和臺積電前景等。
Thumbnail
背景:從冷門配角到市場主線,算力與電力被重新定價   小P從2008進入股市,每一個時期的投資亮點都不同,記得2009蘋果手機剛上市,當時蘋果只要在媒體上提到哪一間供應鏈,隔天股價就有驚人的表現,當時光學鏡頭非常熱門,因為手機第一次搭上鏡頭可以拍照,也造就傳統相機廠的殞落,如今手機已經全面普及,題
Thumbnail
背景:從冷門配角到市場主線,算力與電力被重新定價   小P從2008進入股市,每一個時期的投資亮點都不同,記得2009蘋果手機剛上市,當時蘋果只要在媒體上提到哪一間供應鏈,隔天股價就有驚人的表現,當時光學鏡頭非常熱門,因為手機第一次搭上鏡頭可以拍照,也造就傳統相機廠的殞落,如今手機已經全面普及,題
Thumbnail
最新的市場趨勢與投資策略,深入解析AI應用、特斯拉和車用半導體等領域。掌握槓桿使用技巧,洞察大盤動態,確保投資風險管理,並提供冷靜的投資心態指導。
Thumbnail
最新的市場趨勢與投資策略,深入解析AI應用、特斯拉和車用半導體等領域。掌握槓桿使用技巧,洞察大盤動態,確保投資風險管理,並提供冷靜的投資心態指導。
Thumbnail
保持理性面對市場波動,避免過度對沖和降低槓桿,監控日元匯率,並保持長期投資視角。了解市場調整期的應對策略、事件型交易的操作技巧,以及行業領頭羊的動向。關注經濟數據和市場傳聞,靈活調整投資策略,確保投資決策基於充分的信息和合理的分析。
Thumbnail
保持理性面對市場波動,避免過度對沖和降低槓桿,監控日元匯率,並保持長期投資視角。了解市場調整期的應對策略、事件型交易的操作技巧,以及行業領頭羊的動向。關注經濟數據和市場傳聞,靈活調整投資策略,確保投資決策基於充分的信息和合理的分析。
追蹤感興趣的內容從 Google News 追蹤更多 vocus 的最新精選內容追蹤 Google News