import baostock as bs import pandas as pd import numpy as np import time import random import logging import sys import os import argparse from datetime import datetime, timedelta from tqdm import tqdm
配置日志
logging.basicConfig(level=logging.INFO, format=’%(asctime)s - %(levelname)s - %(message)s’)
def calculate_brooks_strategy(df, daily_ema10, target_time_marker=None): “”” 修改点:增加 target_time_marker 参数,并实现基于时间点的信号判断 “”” # 1. 预处理数据类型 df[[‘open’, ‘close’, ‘high’, ‘low’]] = df[[‘open’, ‘close’, ‘high’, ‘low’]].astype(float)
# 预计算 (必须基于全量数据,保证上下文准确)
df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
df['ema50'] = df['close'].ewm(span=50, adjust=False).mean()
df['body_size'] = abs(df['close'] - df['open'])
df['avg_body_5'] = df['body_size'].rolling(5).mean()
df['is_bull'] = df['close'] > df['open']
df['daily_pct'] = (df['close'] - df['close'].shift(1)) / df['close'].shift(1) * 100
df['is_breakout'] = df['high'] > df['high'].shift(1)
# 2. 状态机逻辑 (保持原样)
res, sup = df['high'].iloc[0], df['low'].iloc[0]
df['state'], df['bull_count'], df['range_test_count'], df['trade_count'] = 1, 0, 0, 0
df['is_in_cool_down'] = False
prev_state, in_cool_down = 1, False
ma20 = df['close'].rolling(20).mean()
atr20 = (df['high'] - df['low']).rolling(20).mean()
for i in range(20, len(df)):
threshold = atr20.iloc[i] * 1.5 if not pd.isna(atr20.iloc[i]) else 0
is_in_range = (df['high'].iloc[i] < (ma20.iloc[i] + threshold)) and (df['low'].iloc[i] > (ma20.iloc[i] - threshold))
state = 1 if is_in_range else (2 if df['close'].iloc[i] > ma20.iloc[i] else 3)
df.loc[df.index[i], 'state'] = state
res, sup = max(res, df['high'].iloc[i]), min(sup, df['low'].iloc[i])
df.loc[df.index[i], ['dynamic_res', 'dynamic_sup']] = [res, sup]
# 冷却与计数
if prev_state == 3 and state == 2: in_cool_down = True
elif in_cool_down and state != 2: in_cool_down = False
df.loc[df.index[i], 'is_in_cool_down'] = in_cool_down
if state != prev_state: df.loc[df.index[i], 'trade_count'] = 0
else: df.loc[df.index[i], 'trade_count'] = df.loc[df.index[i-1], 'trade_count'] + (1 if df['is_bull'].iloc[i] else 0)
if state == 2 and not df['is_bull'].iloc[i]: df.loc[df.index[i], 'bull_count'] = df.loc[df.index[i-1], 'bull_count'] + 1
elif df['is_bull'].iloc[i]: df.loc[df.index[i], 'bull_count'] = 0
prev_state = state
# 3. 信号组件
common_filters = (df['body_size'] / (df['high'] - df['low']).replace(0, 0.0001) > 0.4) & \
((df['close'] - df['open']) / df['open'] * 100).between(1.01, 6.99) & \
df['is_breakout'] & (df['daily_pct'] <= 7)
momentum_signal = (df['state'] == 2) & (df['is_breakout'] & (df['daily_pct'] <= 7)).rolling(3).apply(lambda x: all(x), raw=True)
h_signals = (df['state'] == 2) & df['is_bull'] & (df['bull_count'].shift(1) >= 1) & df['is_breakout'] & common_filters
is_hammer_bar = df['is_bull'] & ((np.minimum(df['open'], df['close']) - df['low']) > df['body_size'] * 2) & (df['low'] <= df['dynamic_sup'])
# 4. 拆解逻辑 (为了调试)
initial_breakout = (df['state'] == 2) & (~df['is_in_cool_down']) & ((df['close']-df['open'])/df['open']*100 > 1.01) & (df['daily_pct'] < 9.95) & (df['close'] >= df['dynamic_res'])
cond_state = (df['state'] != 3)
cond_ema_price = (df['close'] > daily_ema10) & (df['close'] > df['ema20'])
cond_ema_trend = (df['ema20'] > df['ema50'])
is_signal_triggered = (initial_breakout |
((df['state'] == 1) & (df['range_test_count'].shift(1) >= 1) & ((df['body_size']/(df['high']-df['low']) > 0.9) | is_hammer_bar)) |
(h_signals | momentum_signal | is_hammer_bar))
go_long = cond_state & cond_ema_price & cond_ema_trend & is_signal_triggered
# --- 诊断打印 ---
if target_time_marker:
mask = df['time'].str.contains(target_time_marker)
if not mask.any(): return False
idx = df.index[mask][-1]
print(f"\n>>> DEBUG: {df.loc[idx, 'time']} | 状态:{df.loc[idx, 'state']} | go_long:{go_long.loc[idx]}")
print(f"状态有效 (state!=3): {cond_state.loc[idx]}")
print(f"均线价格支撑 (price > ema10 & ema20): {cond_ema_price.loc[idx]}")
print(f"多头趋势 (ema20 > ema50): {cond_ema_trend.loc[idx]}")
print(f"具体触发信号 (is_signal_triggered): {is_signal_triggered.loc[idx]}")
print(f" - initial_breakout: {initial_breakout.loc[idx]}")
print(f" - momentum_signal: {momentum_signal.loc[idx]}")
print(f" - h_signals: {h_signals.loc[idx]}")
print(f" - is_hammer_bar: {is_hammer_bar.loc[idx]}\n")
return go_long.loc[idx]
return go_long.iloc[-1]
— 工具:消息推送 (带去重逻辑) —
def send_to_phone(title, content, code): # 简单的本地去重:如果最近发过该股票,则跳过 file_path = “last_sent.txt” # 获取当前小时标识 current_hour_id = f”{code}_{datetime.now().strftime(‘%Y%m%d%H’)}”
if os.path.exists(file_path):
with open(file_path, "r") as f:
if current_hour_id in f.read():
return # 该小时已发过,不再推送
token = '7e7cf8f4208a41f88876783ef54e01a0'
url = f"http://www.pushplus.plus/send?token={token}&title={title}&content={content}&template=html"
try:
requests.get(url)
with open(file_path, "a") as f:
f.write(current_hour_id + "\n")
except Exception as e:
print(f"推送失败: {e}")
— 增加一个工具函数来提取 BaoStock 数据 —
def fetch_baostock_data(rs): data_list = [] while (rs.error_code == ‘0’) & rs.next(): data_list.append(rs.get_row_data()) return pd.DataFrame(data_list, columns=rs.fields)
— 环境检测与交互逻辑 —
def get_config(): # 使用 argparse 来控制是否开启交互 parser = argparse.ArgumentParser() parser.add_argument(‘–manual’, action=’store_true’, help=”强制开启手动输入模式”) args, unknown = parser.parse_known_args()
# 只有当显式指定了 --manual 参数时,才弹出提示
if args.manual:
print("-" * 30)
user_input = input("请输入股票代码 (回车全市场扫描): ").strip()
time_input = input("请输入回测时间 (格式: 2026-05-26 10:30, 回车跳过): ").strip()
target_date, target_time_marker = None, None
if time_input:
parts = time_input.split()
target_date = parts[0]
if len(parts) > 1:
target_time_marker = parts[1].replace(":", "")
return user_input, target_date, target_time_marker
# 否则直接进入自动模式
print(">>> [自动模式] 未检测到 --manual 参数,使用最新数据自动扫描。")
return None, None, None
— 主程序 —
def main(): if not bs.login().error_code == ‘0’: print(“登录失败,请检查网络。”) return
target_code_in, target_date, target_time_marker = get_config()
# 2. 处理股票代码 (兼容6位,补全为9位)
target_code = None
if target_code_in:
# 去掉可能存在的空格
code_str = target_code_in.strip()
# 判断前缀:60, 68, 58 开头加 sh.,其他加 sz.
if code_str.startswith(('60', '68', '58')):
target_code = f"sh.{code_str}"
else:
target_code = f"sz.{code_str}"
# 强制校验长度
if len(target_code) != 9:
print(f"错误:转换后的代码 {target_code} 长度不为 9,请检查输入!")
return
# 时间基准
today = datetime.now()
today_str = target_date if target_date else today.strftime("%Y-%m-%d")
today_dt = datetime.strptime(today_str, "%Y-%m-%d")
index_start = (today_dt - timedelta(days=60)).strftime("%Y-%m-%d")
stock_start = (today_dt - timedelta(days=30)).strftime("%Y-%m-%d")
# 4. 获取股票列表
if target_code:
stocks = pd.DataFrame({'code': [target_code], 'code_name': ['指定分析']})
else:
print(">>> 正在加载全市场股票...")
rs = bs.query_all_stock(day=today_str)
stocks = fetch_baostock_data(rs)
# --- 过滤逻辑 ---
# 1. 除去科创板(688开头)
stocks = stocks[~stocks['code'].str.contains('688')]
# 2. 除去 ETF (以 51 或 15 开头)
# 注意:sh.51xxxx, sz.15xxxx 是常见的 ETF 代码
# 更加稳健的写法:判断代码是否以特定前缀开头
# 这样完全避开了正则表达式,不会有任何警告
is_etf = stocks['code'].str.startswith(('sh.51', 'sh.52', 'sh.53', 'sh.55', 'sh.56', 'sh.58', 'sz.15'))
stocks = stocks[~is_etf]
# 3. 除去所有包含 'ST' 的股票名称
stocks = stocks[~stocks['code_name'].str.contains('ST')]
# 4. 仅保留主板和创业板 (sh/sz)
stocks = stocks[stocks['code'].str.contains('sh\.|sz\.')]
# --- 扫描前环境信息 ---
print("-" * 50)
print(f"分析日期: {today_str}")
print(f"扫描模式: {'单只' if target_code else '全市场'} ({len(stocks)} 只)")
print("-" * 50)
# 初始化容器
signal_list = []
# 5. 执行分析
for _, row in tqdm(stocks.iterrows(), total=len(stocks), desc="Brooks 扫描中"):
try:
rs_hourly = bs.query_history_k_data_plus(row['code'], "date,time,open,high,low,close",
start_date=stock_start, end_date=today_str,
frequency="60", adjustflag="3")
df_hourly = fetch_baostock_data(rs_hourly)
if len(df_hourly) < 50: continue
# --- 调整后的逻辑 ---
if target_time_marker:
# 过滤出符合条件的行
target_df = df_hourly[df_hourly['time'].str.contains(target_time_marker)]
# 取最后一个符合条件的时间点 (即该小时的 K 线)
target_bar = target_df.iloc[-1]
else:
# 默认扫描最新行情
target_bar = df_hourly.iloc[-1]
# 从目标行提取信息
last_time_str = target_bar['time']
formatted_time = f"{last_time_str[8:10]}:{last_time_str[10:12]}"
last_price = target_bar['close']
# 打印信息
status_info = f"[{row.get('code_name', 'N/A')} {row['code']}] 时间: {formatted_time} 价格: {last_price}"
tqdm.write(f"正在扫描: {status_info}")
# 在循环内部,获取该个股的日线数据用于计算自己的 EMA10
rs_daily = bs.query_history_k_data_plus(row['code'], "close",
start_date=(today_dt - timedelta(days=60)).strftime("%Y-%m-%d"),
end_date=today_str, frequency="d")
df_daily = fetch_baostock_data(rs_daily)
# 计算个股自己的日线 EMA10
individual_daily_ema10 = df_daily['close'].astype(float).ewm(span=10, adjust=False).mean().iloc[-1]
# 判断策略
if calculate_brooks_strategy(df_hourly, individual_daily_ema10, target_time_marker):
# 将信号存入列表
signal_info = {
'name': row.get('code_name', 'N/A'),
'code': row['code'],
'time': formatted_time,
'price': last_price
}
signal_list.append(signal_info)
green_start = "\033[92m"
reset = "\033[0m"
tqdm.write(f"{green_start}✅ 发现信号: {row.get('code_name', 'N/A')} ({row['code']}){reset}")
# --- 【新增】发送到手机 ---
msg = f"股票: {signal_info['name']}<br>代码: {signal_info['code']}<br>时间: {formatted_time}<br>价格: {last_price}"
send_to_phone("Brooks 策略信号提醒", msg)
time.sleep(0.05)
except Exception:
continue
bs.logout()
# --- 扫描结束后的汇总列表 ---
print("\n" + "="*50)
print(f"扫描完成,共发现 {len(signal_list)} 只符合条件的股票:")
print(f"{'名称':<10} | {'代码':<12} | {'最新时间':<10} | {'价格':<8}")
print("-" * 50)
for sig in signal_list:
print(f"{sig['name']:<10} | {sig['code']:<12} | {sig['time']:<10} | {sig['price']:<8}")
print("="*50)
if name == “main”: main()