US Stock List Fetcher - Feature Analysis
📋 Overview
This is a Python tool that automatically fetches and filters a list of high-quality US common stocks from official sources. It downloads the latest data from the official NASDAQ and NYSE symbol directories, then classifies and filters the entries.
🎯 Core Features
1. Data Sources
- NASDAQ market: fetched from the official nasdaqtrader.com symbol directory
- NYSE market: the full listing comes from the same official source
- Ensures the data is authoritative and up to date
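The directory files behind both sources are plain pipe-delimited text. A minimal sketch of parsing that format, using an abridged made-up sample instead of a live download (the real files carry more columns):

```python
import io
import pandas as pd

# Abridged, hypothetical sample in the pipe-delimited layout used by
# nasdaqtrader.com's symbol directory files.
sample = """Symbol|Security Name|Market Category|Test Issue|ETF
AAPL|Apple Inc. - Common Stock|Q|N|N
QQQ|Invesco QQQ Trust - ETF Shares|G|N|Y
ZTEST|Nasdaq Test Security|Q|Y|N"""

df = pd.read_csv(io.StringIO(sample), sep="|")
df = df[df["Test Issue"] == "N"]  # drop test securities
print(df["Symbol"].tolist())  # → ['AAPL', 'QQQ']
```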
2. Security Classification
| Category | Description | Examples |
|----------------|--------------------------------------------------|--------------|
| ETF/ETN | Exchange-traded funds / notes | SPY, QQQ |
| Common Stock | Common stock | AAPL, MSFT |
| ADR/Foreign | American depositary receipts / foreign listings | BABA, TSM |
| Preferred | Preferred stock / preference securities | |
| Warrants | Warrants / derivative securities | |
| Units | Units / SPAC units | |
| Rights | Subscription rights / shareholder rights | |
| Trust/Fund/LP | Trusts / funds / partnerships | REITs, MLPs |
3. High-Quality Filtering Rules
- NASDAQ rules: keep only the Q tier (Global Select Market) and G tier (Global Market); keep only entries classified as Common Stock; exclude test securities
- NYSE rules: keep only Exchange='N' (the NYSE main board); keep only entries classified as Common Stock; exclude test securities
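Both rule sets reduce to boolean masks on the classified frame. A minimal sketch of the NASDAQ rule on a tiny hypothetical frame (column names follow the official file):

```python
import pandas as pd

# Hypothetical miniature of the NASDAQ frame after classification
df = pd.DataFrame({
    "Symbol": ["AAPL", "SPY", "ABCD"],
    "Market Category": ["Q", "G", "S"],
    "Category": ["Common Stock", "ETF/ETN", "Common Stock"],
})

# NASDAQ rule: keep only the Q/G tiers and only common stock
nasdaq_kept = df[df["Market Category"].isin(["Q", "G"]) & (df["Category"] == "Common Stock")]
print(nasdaq_kept["Symbol"].tolist())  # → ['AAPL']
```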
📊 Interpreting the Output
Raw data statistics
### 📊 NASDAQ Raw Data by Category
| Type | Count | Share |
|----------------|----------|--------|
| Common Stock | 2,991 | 57.9% |
| ETF/ETN | 1,036 | 20.1% |
| Warrants | 347 | 6.7% |
| Other types | 788 | 15.3% |
| **Total** | 5,162 | |
---
### 📊 NYSE Raw Data by Category
| Type | Count | Share |
|----------------|----------|--------|
| ETF/ETN | 3,624 | 53.6% |
| Common Stock | 1,899 | 28.1% |
| Preferred | 326 | 4.8% |
| Other types | 916 | 13.5% |
| **Total** | 6,765 | |
Final filtering results
| Metric | Value |
|------------------------|-----------------------------------|
| Raw entries (combined) | 11,927 |
| Final list | 3,411 high-quality common stocks |
| Filter-out rate | 71.4% |
| NASDAQ retention rate | 33.1% (1,710 / 5,162) |
| NYSE retention rate | 25.1% (1,701 / 6,765) |
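The summary figures follow directly from the raw counts in the two tables above:

```python
# Recomputing the summary from the per-exchange counts
nasdaq_raw, nasdaq_kept = 5162, 1710
nyse_raw, nyse_kept = 6765, 1701

total_raw = nasdaq_raw + nyse_raw        # 11,927 raw entries
total_kept = nasdaq_kept + nyse_kept     # 3,411 common stocks kept
filter_rate = (1 - total_kept / total_raw) * 100

print(total_raw, total_kept, round(filter_rate, 1))  # → 11927 3411 71.4
```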
💡 Technical Highlights
1. Data cleaning
- Automatically removes test securities (Test Issue)
- Normalizes symbols (uppercase, whitespace stripped)
- Uses the CQS Symbol as the canonical NYSE symbol
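The cleaning steps reduce to two small helpers, both taken verbatim from the code in this article; Cell 2 additionally maps dots to dashes for Yahoo Finance's symbol notation:

```python
def clean_symbol(sym: str) -> str:
    """Standardize an exchange symbol: trim whitespace, uppercase."""
    return sym.strip().upper()

def map_symbol_us(ticker: str) -> str:
    """Map to Yahoo Finance notation: class shares use '-' instead of '.'."""
    return str(ticker).upper().strip().replace(".", "-")

print(clean_symbol("  brk.a "))  # → BRK.A
print(map_symbol_us("BRK.A"))    # → BRK-A
```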
2. Deduplication
- When merging the NASDAQ and NYSE lists
- NASDAQ rows take precedence (keep='first')
- Guarantees each symbol appears exactly once
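The merge-and-dedup behavior on a toy example: because the NASDAQ frame is concatenated first, keep='first' retains its row for any symbol present in both lists (the company names here are illustrative):

```python
import pandas as pd

nasdaq_df = pd.DataFrame({"Symbol": ["AAPL", "DUAL"],
                          "Name": ["Apple Inc.", "Dual Listed Co. (NASDAQ row)"]})
nyse_df = pd.DataFrame({"Symbol": ["DUAL", "GE"],
                        "Name": ["Dual Listed Co. (NYSE row)", "General Electric"]})

# NASDAQ first, so keep='first' prefers the NASDAQ row for duplicates
all_df = pd.concat([nasdaq_df, nyse_df], ignore_index=True)
all_df = all_df.drop_duplicates(subset=["Symbol"], keep="first")
print(all_df["Name"].tolist())
# → ['Apple Inc.', 'Dual Listed Co. (NASDAQ row)', 'General Electric']
```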
3. Global-variable design
global us_list
us_list = []  # holds the final result
🔧 Usage
# Run the main program
result = main()
# Inspect the list
print(f"Fetched {len(us_list)} stocks")
print(us_list[:5])  # first 5 entries
Output format
('AAPL', 'Apple Inc. - Common Stock')
('MSFT', 'Microsoft Corporation - Common Stock')
('GOOGL', 'Alphabet Inc. - Class A Common Stock')
⚙️ Dependencies
pip install pandas requests
pip install yfinance tqdm  # additionally required by Cell 2
🎓 Use Cases
- Quantitative trading systems: building a pool of tradable symbols
- Data analysis: US equity market research
- Portfolio management: maintaining stock lists
- Automated monitoring: periodically refreshing the tradable list
📝 Sample Output
First 5 entries:
('AACB', 'Artius II Acquisition Inc. - Class A Ordinary Shares')
('AAL', 'American Airlines Group, Inc. - Common Stock')
('AAME', 'Atlantic American Corporation - Common Stock')
('AAOI', 'Applied Optoelectronics, Inc. - Common Stock')
('AAON', 'AAON, Inc. - Common Stock')
✅ Official data sources - accuracy assured ✅ Smart classification - security types identified automatically ✅ Strict filtering - only high-quality common stocks kept ✅ Always fresh - every run fetches the latest data ✅ Detailed statistics - a complete filtering report ✅ Easy to integrate - drop-in for other projects
🔍 Notes
- A network connection is required to reach the official directories
- The data changes as the market changes
- Re-run periodically to keep the list current
- The filter rules can be adjusted to your needs
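For example, to relax the rule so ADRs are kept as well, only the category whitelist needs to change. A hypothetical sketch, not part of the original program:

```python
import pandas as pd

# Toy classified frame
df = pd.DataFrame({
    "Symbol": ["AAPL", "BABA", "SPY"],
    "Category": ["Common Stock", "ADR/Foreign", "ETF/ETN"],
})

# Adjust this whitelist to widen or narrow the universe
keep_categories = ["Common Stock", "ADR/Foreign"]
filtered = df[df["Category"].isin(keep_categories)]
print(filtered["Symbol"].tolist())  # → ['AAPL', 'BABA']
```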
🧩 Full Source Code
# -*- coding: utf-8 -*-
"""
✅ Upgraded: fetch the NASDAQ + NYSE lists with a detailed statistics report
(file output removed; the result lives in a global variable instead)
- Category statistics: ETF / ADR / Units / Warrants / Common Stock / Others
- Shows counts before and after filtering
"""
import pandas as pd
import requests
from collections import Counter

# Declared at module level so the list can be read outside main()
global us_list
us_list = []
def clean_symbol(sym: str) -> str:
    return sym.strip().upper()

def classify_security(name: str, is_etf: bool, market_cat=None, exchange=None) -> str:
    """Classify a security by its name and flags."""
    if is_etf:
        return "ETF/ETN"
    name_upper = name.upper()
    if "RIGHTS" in name_upper:
        return "Rights"
    if any(kw in name_upper for kw in ["UNITS", "UNIT"]):
        return "Units"
    if "WARRANT" in name_upper:
        return "Warrants"
    if "PREFERRED" in name_upper:
        return "Preferred"
    if any(kw in name_upper for kw in ["DEPOSITARY", "ADR", "FOREIGN"]):
        return "ADR/Foreign"
    if any(kw in name_upper for kw in ["COMMON STOCK", "ORDINARY SHARES", "CLASS A", "CLASS B"]):
        return "Common Stock"
    if any(kw in name_upper for kw in ["REIT", "TRUST", "FUND", "LP", "LLC", "PARTNERSHIP"]):
        return "Trust/Fund/LP"
    return "Others"
def fetch_nasdaq_with_stats():
    from io import StringIO  # stdlib StringIO (pd.io.common.StringIO is not a public API)
    print("📡 Downloading the official NASDAQ list...")
    url = "https://www.nasdaqtrader.com/dynamic/symdir/nasdaqlisted.txt"
    r = requests.get(url)
    lines = [l for l in r.text.splitlines() if l and not l.startswith("#")]
    df = pd.read_csv(StringIO("\n".join(lines)), sep="|")
    df = df[df["Test Issue"] == "N"]
    # Category statistics
    df["Is_ETF"] = df["ETF"] == "Y"
    df["Category"] = df.apply(lambda row: classify_security(
        row["Security Name"],
        row["Is_ETF"],
        market_cat=row["Market Category"]
    ), axis=1)
    stats = Counter(df["Category"])
    print(f"📊 NASDAQ raw total: {len(df)}")
    for cat, count in sorted(stats.items()):
        print(f"  • {cat}: {count}")
    print()
    # Filter: keep only common stock in the Q/G tiers
    filtered = df[
        (df["Market Category"].isin(["Q", "G"])) &
        (df["Category"] == "Common Stock")
    ].copy()
    return filtered[["Symbol", "Security Name"]].rename(columns={"Security Name": "Name"}), len(df), len(filtered)
def fetch_nyse_with_stats():
    from io import StringIO  # stdlib StringIO (pd.io.common.StringIO is not a public API)
    print("📡 Downloading the official NYSE list...")
    url = "https://www.nasdaqtrader.com/dynamic/symdir/otherlisted.txt"
    r = requests.get(url)
    lines = [l for l in r.text.splitlines() if l and not l.startswith("#")]
    df = pd.read_csv(StringIO("\n".join(lines)), sep="|")
    df = df[df["Test Issue"] == "N"]
    # Category statistics
    df["Is_ETF"] = df["ETF"] == "Y"
    df["Category"] = df.apply(lambda row: classify_security(
        row["Security Name"],
        row["Is_ETF"],
        exchange=row["Exchange"]
    ), axis=1)
    stats = Counter(df["Category"])
    print(f"📊 NYSE raw total: {len(df)}")
    for cat, count in sorted(stats.items()):
        print(f"  • {cat}: {count}")
    print()
    # Filter: keep only common stock with Exchange='N'
    filtered = df[
        (df["Exchange"] == "N") &
        (df["Category"] == "Common Stock")
    ].copy()
    # Use the CQS Symbol as the canonical symbol
    filtered["Symbol"] = filtered["CQS Symbol"].apply(clean_symbol)
    return filtered[["Symbol", "Security Name"]].rename(columns={"Security Name": "Name"}), len(df), len(filtered)
def main():
    # Declare us_list as global so the assignment is visible outside this cell
    global us_list
    try:
        nasdaq_df, nasdaq_raw, nasdaq_kept = fetch_nasdaq_with_stats()
        nyse_df, nyse_raw, nyse_kept = fetch_nyse_with_stats()
    except Exception as e:
        print(f"❌ Download failed: {e}")
        return []
    all_df = pd.concat([nasdaq_df, nyse_df], ignore_index=True)
    all_df.drop_duplicates(subset=["Symbol"], keep="first", inplace=True)
    total_raw = nasdaq_raw + nyse_raw
    total_kept = len(all_df)
    print("="*60)
    print("📈 Final Summary")
    print("="*60)
    print("📁 Sources:")
    print(f"  • NASDAQ: {nasdaq_raw} → kept {nasdaq_kept} common stocks")
    print(f"  • NYSE:   {nyse_raw} → kept {nyse_kept} common stocks")
    print()
    print(f"🗂️ Combined raw entries: {total_raw}")
    print(f"✅ Final high-quality common stocks: {total_kept}")
    print(f"📉 Filter-out rate: {(1 - total_kept/total_raw)*100:.1f}%")
    print()
    # Assign the result to the global us_list
    us_list = list(all_df[["Symbol", "Name"]].itertuples(index=False, name=None))
    print(f"✅ List stored in the global variable us_list ({len(us_list)} stocks).")
    print("\n🧾 First 5 entries:")
    for item in us_list[:5]:
        print(f"  {item}")
    return us_list

if __name__ == "__main__":
    main()
⚙️ Cell 2: US Daily-Bar Downloader (AI-generated)
Driven by the us_list from Cell 1, this code runs the following pipeline:
- ✅ Prefilter: check which tickers have data available
- ✅ Build a manifest: track each ticker's status (pending / done / failed)
- ✅ Batch download: prefer Yahoo Finance's max/10y periods
- ✅ Single-ticker fallback: retry individually when a batch fails
- ✅ Save CSVs: one <ticker>.csv per ticker
- ✅ Update status: refresh the manifest after every download
- ✅ Report: print success/failure statistics and save the run parameters
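The manifest at the heart of this pipeline is a plain CSV with five columns. A toy sketch of its life cycle, using the same schema as the code below:

```python
import pandas as pd

# One row per ticker; status is one of pending / done / failed / skipped
mf = pd.DataFrame(
    [("AAPL", "Apple Inc. - Common Stock"),
     ("MSFT", "Microsoft Corporation - Common Stock")],
    columns=["ticker", "name"],
)
mf["status"] = "pending"
mf["last_error"] = ""
mf["last_try"] = ""

# After a successful batch download, the row is flipped to done:
mf.loc[mf["ticker"] == "AAPL", ["status", "last_try"]] = ["done", "batch"]
print(mf["status"].tolist())  # → ['done', 'pending']
```

On a re-run, only rows still marked pending or failed are downloaded again, which is what makes the loop resumable.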
✅ Code (place after Cell 1)
# === Cell 2: Download US daily bars via yfinance (Enhanced with JP-like features) ===
import os, time, random, warnings, logging, json
import pandas as pd
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor, as_completed  # used for the prefilter and single fetches
from pathlib import Path
from tqdm import tqdm

# Disclaimer: see the notices at the end of this article.

# Quiet noisy loggers
for lg in ["yfinance", "urllib3", "requests"]:
    logging.getLogger(lg).setLevel(logging.CRITICAL)
    logging.getLogger(lg).propagate = False
warnings.filterwarnings("ignore")

# ========== Parameters and paths ==========
MARKET_CODE = "us-share"          # folder name
DATA_SUBDIR = "dayK"              # daily-bar subfolder
PROJECT_NAME = "美股日K資料下載器"  # project name, "US daily-bar downloader" (used in log paths)
try:
    from google.colab import drive
    print("🔗 Mounting Google Drive...")
    drive.mount('/content/drive', force_remount=False)
    print("✅ Drive mounted")
    BASE_DIR = "/content/drive/MyDrive/各國股票檔案"
except Exception:
    BASE_DIR = os.path.abspath("./data")
    print(f"⚠️ Not running in Colab; using local path: {BASE_DIR}")

# Derived paths
BASE_MARKET_DIR = f"{BASE_DIR}/{MARKET_CODE}"
DATA_DIR_US = f"{BASE_MARKET_DIR}/{DATA_SUBDIR}"
LIST_DIR = f'{BASE_MARKET_DIR}/lists'   # list/checkpoint folder
LOG_PARENT_DIR = f"{BASE_DIR}/Log"
LOG_DIR = f"{LOG_PARENT_DIR}/{PROJECT_NAME}"

# Create every folder we need
os.makedirs(DATA_DIR_US, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(LIST_DIR, exist_ok=True)

START_DATE = "2000-01-01"
END_DATE = "2099-09-30"
THREADS_US = 8        # thread count for the prefilter; the real download runs in batches
BATCH_SIZE = 60       # batch download size
PAUSE_SEC = 5.0       # pause between batches
SAMPLE_LIMIT_US = None

ts_tag = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
LOG_FILE = f'{LOG_DIR}/download_us_{ts_tag}.txt'
# Checkpoint files (manifest/resume)
MANIFEST_CSV = Path(LIST_DIR) / "us_manifest.csv"
STATE_JSON = Path(LIST_DIR) / "us_state.json"
def log_message(msg: str):
    """Write a message to both the console and the log file."""
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(f"{pd.Timestamp.now()}: {msg}\n")
    print(msg)

def standardize_df(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize a yfinance frame to date/open/high/low/close/volume."""
    if df is None or df.empty:
        return pd.DataFrame()
    df = df.reset_index()
    if 'Date' not in df.columns:
        first_col = df.columns[0]
        if str(first_col).lower().startswith("date"):
            df.rename(columns={first_col: 'Date'}, inplace=True)
        else:
            return pd.DataFrame()
    df['date'] = pd.to_datetime(df['Date'], errors='coerce', utc=True)
    # Drop the timezone, whichever form it arrives in
    for _ in range(2):
        try:
            df['date'] = df['date'].dt.tz_convert(None)
        except Exception:
            try:
                df['date'] = df['date'].dt.tz_localize(None)
            except Exception:
                pass
    df = df.rename(columns={'Open':'open','High':'high','Low':'low','Close':'close','Volume':'volume'})
    req = ['date','open','high','low','close','volume']
    if not all(c in df.columns for c in req):
        return pd.DataFrame()
    df = df.dropna(subset=['date'])
    for c in ['open','high','low','close','volume']:
        df[c] = pd.to_numeric(df[c], errors='coerce')
    df = df.dropna(subset=['open','high','low','close','volume'])
    df = df[df['volume'] > 0]
    df = df[(df['date'] >= pd.to_datetime(START_DATE)) & (df['date'] <= pd.to_datetime(END_DATE))]
    df = df.sort_values('date').reset_index(drop=True)
    return df[req]
def safe_history(symbol: str, start: str, end: str, interval="1d", max_retries=6, base_delay=1.0):
    """Fetch history with retries, walking down from period='max' to shorter spans."""
    periods = ["max", "10y", "5y", "2y", "1y"]
    for i in range(max_retries):
        try:
            tk = yf.Ticker(symbol)
            if i < len(periods):
                df = tk.history(period=periods[i], interval=interval, auto_adjust=False)
            else:
                df = tk.history(start=start, end=end, interval=interval, auto_adjust=False)
            if df is not None and not df.empty:
                return df
            time.sleep(base_delay + 0.5*i + random.uniform(0, 0.7))
        except Exception as e:
            msg = str(e)
            # Auth/crumb errors warrant a longer back-off
            if any(k in msg for k in ["Invalid Crumb", "Unauthorized", "401"]):
                time.sleep(8 + 2*i + random.uniform(0, 2))
            else:
                time.sleep(base_delay + 0.5*i + random.uniform(0, 1.0))
    return None

def map_symbol_us(ticker: str) -> str:
    """Map to Yahoo Finance symbol notation (e.g. BRK.A → BRK-A)."""
    return str(ticker).upper().strip().replace(".", "-")
def is_valid_csv(file_path: str) -> bool:
    """Check that a saved CSV has the required columns and survives standardization."""
    try:
        df = pd.read_csv(file_path)
        req = ['date','open','high','low','close','volume']
        if not all(c in df.columns for c in req):
            return False
        # Re-run standardize_df so a changed START_DATE/END_DATE range
        # does not leave stale files counted as valid
        df2 = standardize_df(df)
        return not df2.empty
    except Exception:
        return False
# ====== Manifest: per-ticker status file (for resuming) (based on the JP logic) ======
def build_manifest(ok_rows, force_rebuild=False):
    """Create or load the manifest. Columns: ticker,name,status,last_error,last_try"""
    if (not force_rebuild) and MANIFEST_CSV.exists():
        mf = pd.read_csv(MANIFEST_CSV)
        need_cols = {"ticker","name","status","last_error","last_try"}
        if need_cols.issubset(set(mf.columns)):
            print(f"📄 Loaded existing manifest: {MANIFEST_CSV} ({len(mf)} rows)")
            return mf
        else:
            print("⚠️ Existing manifest is missing columns; rebuilding")
    # Build a fresh manifest
    mf = pd.DataFrame(ok_rows, columns=["ticker","name"])
    mf["status"] = "pending"   # pending / done / failed / skipped
    mf["last_error"] = ""
    mf["last_try"] = ""
    # Mark tickers whose files already exist as done
    have = {f.split(".")[0] for f in os.listdir(DATA_DIR_US) if f.endswith(".csv")}
    mf.loc[mf["ticker"].isin(have), ["status","last_error"]] = ["done",""]
    mf.to_csv(MANIFEST_CSV, index=False)
    print(f"💾 New manifest: {MANIFEST_CSV} ({len(mf)} rows, {len(have)} already marked done)")
    return mf

def save_manifest(mf):
    mf.to_csv(MANIFEST_CSV, index=False)
# ====== Prefilter (quick validity check) ======
def prefilter_us(rows):
    def quick_check(ticker):
        sym = map_symbol_us(ticker)
        try:
            tk = yf.Ticker(sym)
            df = tk.history(period="1y", interval="1mo", auto_adjust=False)
            if df is not None and not df.empty:
                return "ok"
            # Try a longer window before giving up
            df = tk.history(period="5y", interval="3mo", auto_adjust=False)
            if df is not None and not df.empty:
                return "ok"
            return "bad"
        except Exception:
            return "bad"
    ok_rows = []
    bad_rows = []
    # Prefilter with a thread pool (THREADS_US workers)
    log_message(f"🏃 Prefiltering {len(rows)} tickers (THREADS={THREADS_US})...")
    with ThreadPoolExecutor(max_workers=THREADS_US) as ex:
        futs = {ex.submit(quick_check, tkr): (tkr, name) for tkr, name in rows}
        for f in tqdm(as_completed(futs), total=len(futs), desc="US prefilter"):
            tkr, name = futs[f]
            try:
                if f.result() == "ok":
                    ok_rows.append((tkr, name))
                else:
                    bad_rows.append((tkr, name))
            except Exception:
                bad_rows.append((tkr, name))
    log_message(f"✅ Prefilter result: ok={len(ok_rows)}, bad={len(bad_rows)}")
    return ok_rows
# ====== Batch download and saving (based on the JP logic) ======
def download_batch_us(tickers):
    syms = [map_symbol_us(t) for t in tickers]
    try:
        return yf.download(syms, period="max", interval="1d", group_by="ticker", auto_adjust=False, threads=False)
    except Exception as e:
        log_message(f"[download] batch failed ({len(syms)}): {e} → falling back to 10y")
        time.sleep(PAUSE_SEC + random.uniform(0, 1.5))
        try:
            return yf.download(syms, period="10y", interval="1d", group_by="ticker", auto_adjust=False, threads=False)
        except Exception as e2:
            log_message(f"[download] 10y also failed; skipping this batch: {e2}")
            return None

def write_one_from_multi(df_multi, tkr):
    """Extract one ticker's frame from a batch result, standardize, and save it."""
    sym = map_symbol_us(tkr)
    try:
        if isinstance(df_multi.columns, pd.MultiIndex):
            sub = df_multi[sym].copy()
        else:
            # A single-ticker batch comes back with a flat column index
            if tkr in sym and len(df_multi.columns) == 6:
                sub = df_multi.copy()
            else:
                return False
        if sub is None or sub.empty:
            return False
        # standardize_df enforces the column set and date range
        sub = standardize_df(sub)
        if sub.empty:
            return False
        out = os.path.join(DATA_DIR_US, f"{tkr}.csv")
        sub.to_csv(out, index=False)
        return True
    except Exception as e:
        log_message(f"Save/standardize failed for {tkr}: {e}")
        return False
def resume_download_loop(mf):
    # Only pick pending/failed rows (plus skipped rows whose file is missing);
    # never re-download a file that already exists
    have = {f.split(".")[0] for f in os.listdir(DATA_DIR_US) if f.endswith(".csv")}
    mf.loc[mf["ticker"].isin(have), ["status","last_error","last_try"]] = ["done","","auto-detected"]
    save_manifest(mf)
    # Recompute what still needs downloading
    need_tickers = mf[mf["status"].isin(["pending","failed","skipped"]) & (~mf["ticker"].isin(have))]["ticker"].tolist()
    if not need_tickers:
        log_message("✅ Nothing to download: manifest complete or files already present")
        return
    total_batches = (len(need_tickers) + BATCH_SIZE - 1) // BATCH_SIZE
    pbar = tqdm(total=len(need_tickers), desc="Overall download progress", unit="file")
    for bi in range(0, len(need_tickers), BATCH_SIZE):
        batch_tickers = need_tickers[bi:bi+BATCH_SIZE]
        # Skip anything finished since the list was built
        current_batch_in_need = mf[mf["ticker"].isin(batch_tickers) & (mf["status"] != "done")]["ticker"].tolist()
        if not current_batch_in_need:
            pbar.update(len(batch_tickers))
            continue
        pbar.set_description(f"[batch {bi//BATCH_SIZE+1}/{total_batches}] downloading")
        df = download_batch_us(current_batch_in_need)
        if df is None:
            # Whole batch failed → fall back to per-ticker downloads
            for tkr in current_batch_in_need:
                sym = map_symbol_us(tkr)
                ok = False
                try:
                    d1 = safe_history(sym, START_DATE, END_DATE, "1d")
                    d1 = standardize_df(d1)
                    if d1 is not None and not d1.empty:
                        out = os.path.join(DATA_DIR_US, f"{tkr}.csv")
                        d1.to_csv(out, index=False)
                        ok = True
                    else:
                        raise Exception("empty_df_after_standardize")
                except Exception as e:
                    mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["failed", str(e), "single-fallback"]
                if ok:
                    mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["done", "", "single-fallback"]
                save_manifest(mf)
                pbar.update(1)
            time.sleep(PAUSE_SEC * 1.5 + random.uniform(0, 2))
            continue
        # Batch succeeded: write out every ticker that has data in it
        for tkr in current_batch_in_need:
            if write_one_from_multi(df, tkr):
                mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["done", "", "batch"]
            else:
                # Missing from the batch result → one more per-ticker attempt
                sym = map_symbol_us(tkr)
                try:
                    d1 = safe_history(sym, START_DATE, END_DATE, "1d")
                    d1 = standardize_df(d1)
                    if d1 is not None and not d1.empty:
                        out = os.path.join(DATA_DIR_US, f"{tkr}.csv")
                        d1.to_csv(out, index=False)
                        mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["done", "", "single-after-batch"]
                    else:
                        mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["failed", "empty_df", "single-after-batch"]
                except Exception as e:
                    mf.loc[mf["ticker"]==tkr, ["status","last_error","last_try"]] = ["failed", str(e), "single-after-batch"]
            pbar.update(1)
        save_manifest(mf)
        time.sleep(PAUSE_SEC + random.uniform(0, 1.5))
    pbar.close()
def main_us():
    print("📁 Directories:")
    print(f"  BASE_DIR = {BASE_DIR}")
    print(f"  LIST_DIR = {LIST_DIR}")
    print(f"  {MARKET_CODE}/{DATA_SUBDIR} = {DATA_DIR_US}")
    print(f"  logs = {LOG_DIR}")
    print("\n🚀 Starting the US download (with resume support)")
    # Make sure us_list exists
    global us_list
    try:
        _ = us_list
    except NameError:
        log_message("❌ Variable `us_list` not found. Define the US stock list (Cell 1) first.")
        us_list = []
    if not us_list:
        print("List is empty; stopping.")
        return
    # 1) Ticker list
    rows_all = us_list
    if SAMPLE_LIMIT_US:
        rows_all = rows_all[:SAMPLE_LIMIT_US]
    log_message(f"🧾 Tickers loaded: {len(rows_all)}")
    # 2) Prefilter (multithreaded quick check to drop dead tickers)
    ok_rows = prefilter_us(rows_all)
    # 3) Build/load the manifest (pending/done/failed/skipped)
    mf = build_manifest(ok_rows)
    # 4) Resume: download only what is still missing (batch loop)
    resume_download_loop(mf)
    # 5) Final statistics, recomputed against the files actually on disk
    mf = pd.read_csv(MANIFEST_CSV)
    tot = len(mf)
    final_have = {f.split(".")[0] for f in os.listdir(DATA_DIR_US) if f.endswith(".csv")}
    succ = len(final_have.intersection(set(mf["ticker"])))  # downloaded and file present
    fail_final = len(mf[(mf["status"]=="failed") & (~mf["ticker"].isin(final_have))])
    skip_final = len(mf[mf["ticker"].isin(final_have) & (mf["status"]!="failed")])
    log_message(f"📊 Status summary: total={tot}, success={succ}, failed={fail_final}, skipped={skip_final}")
    # 6) Save the run parameters (for later comparison)
    with open(STATE_JSON, "w", encoding="utf-8") as f:
        json.dump({
            "ts": ts_tag,
            "start_date": START_DATE,
            "end_date": END_DATE,
            "batch_size": BATCH_SIZE,
            "pause_sec": PAUSE_SEC,
            "threads_us": THREADS_US,
            "sample_limit": SAMPLE_LIMIT_US
        }, f, ensure_ascii=False, indent=2)
    print(f"💾 Parameter snapshot: {STATE_JSON}")
    # 7) Write the detailed report to LOG_DIR
    mf.to_csv(f"{LOG_DIR}/logs_us_{ts_tag}.csv", index=False)
    print(f"📄 Detailed results saved: {LOG_DIR}/logs_us_{ts_tag}.csv")
    print("\n📍 Tip: on Too Many Requests errors, lower BATCH_SIZE or raise PAUSE_SEC, then re-run.")

if __name__ == "__main__":
    main_us()
Paste each block above into its own Colab cell and run it. By default the daily-bar files are stored in folders created on your Google Drive.

If copying the code into Colab brings along stray whitespace, execution can fail with an error like:
File "<tokenize>", line 205 IndentationError: unindent does not match any outer indentation level
Select one of the offending whitespace characters, replace all occurrences with a regular space via find-and-replace, and run again.
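The usual culprit is non-breaking spaces (U+00A0) picked up during copy-paste, which Python's tokenizer treats as inconsistent indentation. A hedged sketch of the cleanup, if you prefer to fix it programmatically:

```python
# Hypothetical pasted line whose indentation is non-breaking spaces (U+00A0)
pasted_line = "\u00a0\u00a0\u00a0\u00a0return us_list"

# Replace every non-breaking space with a regular space
fixed_line = pasted_line.replace("\u00a0", " ")
print(repr(fixed_line))  # → '    return us_list'
```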

🧑🔬 AUTHOR'S STATUS AND INTENT
The author of this report is an independent, amateur data researcher and NOT a professional quantitative analyst or a licensed financial advisor. This work is completed in the author's personal free time for statistical research purposes.
📊 DATA SOURCE LIMITATION
All data is sourced from free public providers (e.g., Yahoo Finance). While the author uses the V4.0 QA System to minimize errors, the author offers NO WARRANTY of 100% accuracy. Data integrity is constrained by the free source.
🚫 NO INVESTMENT ADVICE
This content is for statistical research and educational inspiration only. It does NOT constitute personalized financial advice, investment recommendations, or a solicitation to buy or sell securities.
⚠️ RISK & LIABILITY
Stock market investing involves significant risk. The reader must exercise their own judgment. The author (and the platform) assumes NO LIABILITY for any financial losses incurred based on the information provided herein.
