Skip to content

Commit

Permalink
abu量化交易系统0.10
Browse files Browse the repository at this point in the history
  • Loading branch information
bbfamily committed Aug 23, 2017
1 parent f4b7bda commit 815d6c3
Show file tree
Hide file tree
Showing 302 changed files with 273,072 additions and 2 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
.ipynb_checkpoints*
.idea
.DS_Store
logging.log
p.sh
*.pyc
tmp
test.py
8 changes: 8 additions & 0 deletions abupy/AlphaBu/ABuAlpha.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from __future__ import absolute_import

# noinspection PyUnresolvedReferences
from .ABuPickStockExecute import do_pick_stock_work
# noinspection PyUnresolvedReferences
from .ABuPickTimeExecute import do_symbols_with_same_factors, do_symbols_with_diff_factors
# noinspection all
from . import ABuPickTimeWorker as pick_time_worker
63 changes: 63 additions & 0 deletions abupy/AlphaBu/ABuPickBase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# -*- encoding:utf-8 -*-
"""
择时与选股抽象基类
"""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division

from abc import ABCMeta, abstractmethod

from ..CoreBu.ABuFixes import six
from ..CoreBu.ABuBase import AbuParamBase

__author__ = '阿布'
__weixin__ = 'abu_quant'


class AbuPickTimeWorkBase(six.with_metaclass(ABCMeta, AbuParamBase)):
    """Abstract base class for timing (pick-time) workers."""

    @abstractmethod
    def fit(self, *args, **kwargs):
        """
        Start the worker's main job.

        Across the project ``fit`` names the most important operation of an
        object. For a timing worker that means starting the timing run, or,
        read literally, fitting the timing operation to the trade data.
        """

    @abstractmethod
    def init_sell_factors(self, *args, **kwargs):
        """Initialise the timing sell factors."""

    @abstractmethod
    def init_buy_factors(self, *args, **kwargs):
        """Initialise the timing buy factors."""


class AbuPickStockWorkBase(six.with_metaclass(ABCMeta, AbuParamBase)):
    """Abstract base class for stock-picking workers."""

    @abstractmethod
    def fit(self, *args, **kwargs):
        """
        Start the worker's main job.

        Across the project ``fit`` names the most important operation of an
        object. For a stock-picking worker that means starting the picking
        run, or, read literally, fitting the picking operation to the trade
        data.
        """

    @abstractmethod
    def init_stock_pickers(self, *args, **kwargs):
        """Initialise the stock-picking factors."""
51 changes: 51 additions & 0 deletions abupy/AlphaBu/ABuPickStockExecute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# -*- encoding:utf-8 -*-
"""
包装选股worker进行,完善前后工作
"""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division

from .ABuPickStockWorker import AbuPickStockWorker
from ..CoreBu.ABuEnvProcess import add_process_env_sig
from ..MarketBu.ABuMarket import split_k_market
from ..TradeBu.ABuKLManager import AbuKLManager
from ..CoreBu.ABuFixes import ThreadPoolExecutor

__author__ = '阿布'
__weixin__ = 'abu_quant'


@add_process_env_sig
def do_pick_stock_work(choice_symbols, benchmark, capital, stock_pickers):
    """
    Wrap a single AbuPickStockWorker run and hand back what it selected.

    :param choice_symbols: initial sequence of candidate trade symbols
    :param benchmark: trade benchmark, an AbuBenchmark instance
    :param capital: capital object, an AbuCapital instance
    :param stock_pickers: sequence of stock-picking factor descriptions
    :return: the worker's ``choice_symbols`` attribute after ``fit()`` has run
    """
    # The worker needs a K-line manager built from the same benchmark/capital pair.
    worker = AbuPickStockWorker(capital, benchmark, AbuKLManager(benchmark, capital),
                                choice_symbols=choice_symbols,
                                stock_pickers=stock_pickers)
    worker.fit()
    return worker.choice_symbols


@add_process_env_sig
def do_pick_stock_thread_work(choice_symbols, benchmark, capital, stock_pickers, n_thread):
    """
    Run stock picking on a thread pool and merge the per-thread selections.

    The candidates are split with ``split_k_market`` and every chunk is
    submitted to ``do_pick_stock_work`` on its own pool task.

    :param choice_symbols: initial sequence of candidate trade symbols
    :param benchmark: trade benchmark, an AbuBenchmark instance
    :param capital: capital object, an AbuCapital instance
    :param stock_pickers: sequence of stock-picking factor descriptions
    :param n_thread: pool size, also passed to ``split_k_market`` as the split count
    :return: list of selected symbols, concatenated in chunk submission order
    :raises Exception: whatever a chunk's ``do_pick_stock_work`` call raised
    """
    with ThreadPoolExecutor(max_workers=n_thread) as pool:
        thread_symbols = split_k_market(n_thread, market_symbols=choice_symbols)
        futures = [pool.submit(do_pick_stock_work, symbols, benchmark, capital, stock_pickers)
                   for symbols in thread_symbols]

    # Leaving the ``with`` block waits for every task. Results are read here
    # rather than in a done-callback because an exception raised inside a
    # done-callback is only logged and ignored, which silently dropped the
    # symbols of a failed chunk. Reading in submission order also keeps the
    # merged list deterministic.
    result = []
    for future in futures:
        result.extend(future.result())
    return result
108 changes: 108 additions & 0 deletions abupy/AlphaBu/ABuPickStockMaster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# -*- encoding:utf-8 -*-
"""
选股并行多任务调度模块
"""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division

import itertools
import logging


from .ABuPickStockExecute import do_pick_stock_work, do_pick_stock_thread_work
from ..CoreBu import ABuEnv
from ..CoreBu.ABuEnv import EMarketDataFetchMode
from ..CoreBu.ABuEnvProcess import AbuEnvProcess
from ..MarketBu.ABuMarket import split_k_market, all_symbol
from ..MarketBu import ABuMarket
from ..CoreBu.ABuFixes import partial
from ..CoreBu.ABuParallel import delayed, Parallel
from ..CoreBu.ABuDeprecated import AbuDeprecated


class AbuPickStockMaster(object):
    """Scheduler that runs stock picking as several parallel tasks."""

    @classmethod
    def do_pick_stock_with_process(cls, capital, benchmark, stock_pickers, choice_symbols=None,
                                   n_process_pick_stock=ABuEnv.g_cpu_cnt,
                                   callback=None):
        """
        Public entry point for parallel stock picking.

        :param capital: capital object, an AbuCapital instance
        :param benchmark: trade benchmark, an AbuBenchmark instance
        :param stock_pickers: sequence of stock-picking factor descriptions;
                              when None the picking step is skipped entirely
        :param choice_symbols: initial sequence of candidate trade symbols;
                               None or empty means ``all_symbol()`` is used
        :param n_process_pick_stock: number of parallel picking processes;
                                     values <= 0 fall back to ``ABuEnv.g_cpu_cnt``
        :param callback: worker function executed by each parallel task,
                         defaults to ``do_pick_stock_work``
        :return: final sequence of selected symbols
        """
        # Remember whether the caller supplied candidates: the "last split"
        # switches further down only apply when it did not.
        input_choice_symbols = not (choice_symbols is None or len(choice_symbols) == 0)
        if not input_choice_symbols:
            choice_symbols = all_symbol()

        if n_process_pick_stock <= 0:
            # The count is compared against 1 and handed to split_k_market
            # below, so it has to be positive.
            n_process_pick_stock = ABuEnv.g_cpu_cnt

        if stock_pickers is not None:
            # TODO distinguish hdf5 from csv storage; csv mode could read and write in parallel
            # Multi-tasking is only used with E_DATA_FETCH_FORCE_LOCAL,
            # any other fetch mode rolls back to a single process.
            if n_process_pick_stock > 1:
                if ABuEnv.g_data_fetch_mode != EMarketDataFetchMode.E_DATA_FETCH_FORCE_LOCAL:
                    # 1. hdf5 is easily corrupted by multi-process writes, so
                    #    only parallel reads are done, never parallel writes
                    # 2. since MAC OS 10.9, parallel networking + numpy hits a
                    #    system bug (crashes, hangs)
                    logging.info('batch get only support E_DATA_FETCH_FORCE_LOCAL for Parallel!')
                    n_process_pick_stock = 1

            # Distribute the candidates over the requested number of processes.
            process_symbols = split_k_market(n_process_pick_stock, market_symbols=choice_symbols)

            if n_process_pick_stock > 1:
                # Splitting can leave a remainder chunk, so use the real chunk
                # count as the process count, e.g. 32 -> 33, 16 -> 17.
                n_process_pick_stock = len(process_symbols)

            parallel = Parallel(
                n_jobs=n_process_pick_stock, verbose=0, pre_dispatch='2*n_jobs')

            if callback is None:
                callback = do_pick_stock_work

            # do_pick_stock_work is decorated with add_process_env_sig, which
            # needs an AbuEnvProcess object copied across to each process.
            p_nev = AbuEnvProcess()
            # Run the parallel tasks, one per chunk of symbols.
            jobs = (delayed(callback)(chunk, benchmark, capital, stock_pickers, env=p_nev)
                    for chunk in process_symbols)
            out_choice_symbols = parallel(jobs)

            # Flatten the per-process selections into one sequence.
            choice_symbols = list(itertools.chain.from_iterable(out_choice_symbols))

        # The env switches below either cut the symbols into a train/test
        # split or reuse a previously cut set. They are prioritised:
        # g_enable_last_split_test > g_enable_last_split_train > g_enable_train_test_split
        # TODO: replace g_enable_last_split_test, g_enable_last_split_train
        #       and g_enable_train_test_split with an enum
        if not input_choice_symbols:
            if ABuEnv.g_enable_last_split_test:
                # Use only the test-set symbols from the last split.
                return ABuMarket.market_last_split_test()
            if ABuEnv.g_enable_last_split_train:
                # Use only the train-set symbols from the last split.
                return ABuMarket.market_last_split_train()

        if ABuEnv.g_enable_train_test_split:
            # Cut into train and test sets and return the train-set symbols.
            return ABuMarket.market_train_test_split(ABuEnv.g_split_tt_n_folds, choice_symbols)

        return choice_symbols

    @classmethod
    @AbuDeprecated('hdf5 store mode will crash or dead!')
    def do_pick_stock_with_process_mix_thread(cls, capital, benchmark, stock_pickers, choice_symbols=None, n_process=8,
                                              n_thread=10):
        """Deprecated, do not use: multi-threaded reads are a problem with the default hdf5 store."""
        threaded_worker = partial(do_pick_stock_thread_work, n_thread=n_thread)
        return cls.do_pick_stock_with_process(capital, benchmark, stock_pickers,
                                              choice_symbols=choice_symbols,
                                              n_process_pick_stock=n_process,
                                              callback=threaded_worker)
143 changes: 143 additions & 0 deletions abupy/AlphaBu/ABuPickStockWorker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# -*- encoding:utf-8 -*-
"""
选股具体工作者,整合金融时间序列,选股因子,资金类进行
选股操作,在择时金融时间序列之前一段时间上迭代初始交易对象
进行选股因子的拟合操作
"""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division

import copy

from .ABuPickBase import AbuPickStockWorkBase
from ..MarketBu.ABuMarket import all_symbol
from ..PickStockBu.ABuPickStockBase import AbuPickStockBase
from ..UtilBu.ABuProgress import AbuMulPidProgress

__author__ = '阿布'
__weixin__ = 'abu_quant'


class AbuPickStockWorker(AbuPickStockWorkBase):
    """Stock-picking worker."""

    def __init__(self, capital, benchmark, kl_pd_manager, choice_symbols=None, stock_pickers=None):
        """
        :param capital: capital object, an AbuCapital instance
        :param benchmark: trade benchmark, an AbuBenchmark instance
        :param kl_pd_manager: financial time-series manager, an AbuKLManager instance
        :param choice_symbols: initial sequence of candidate trade symbols
        :param stock_pickers: sequence of stock-picking factor descriptions
        """
        self.capital = capital
        self.benchmark = benchmark
        self.choice_symbols = choice_symbols
        self.kl_pd_manager = kl_pd_manager
        # Both factor lists start empty and are filled by init_stock_pickers.
        self.stock_pickers = []
        self.first_stock_pickers = []
        self.init_stock_pickers(stock_pickers)

    def __str__(self):
        """Printable form: the picking factors followed by the candidate symbols."""
        return 'stock_pickers:{}\nchoice_symbols:{}'.format(self.stock_pickers, self.choice_symbols)

    __repr__ = __str__

    def init_stock_pickers(self, stock_pickers):
        """
        Instantiate every picking factor described in ``stock_pickers``.

        :param stock_pickers: list of dicts; each dict holds what is needed to
                              build one factor: the ``class`` entry plus its
                              constructor keyword arguments
        :raises ValueError: a description has no ``class`` key
        :raises TypeError: the built object is not an AbuPickStockBase
        """
        if stock_pickers is None:
            return

        for factor_spec in stock_pickers:
            if factor_spec is None:
                continue

            if 'class' not in factor_spec:
                # The class to instantiate is mandatory.
                raise ValueError('picker_class class key must name class !!!')

            # Work on a deep copy so the caller's dict keeps its 'class' entry;
            # whatever remains after the pop is passed as keyword arguments.
            factor_kwargs = copy.deepcopy(factor_spec)
            factor_cls = factor_kwargs.pop('class')
            # Build the factor together with capital and benchmark.
            picker = factor_cls(self.capital, self.benchmark, **factor_kwargs)

            if not isinstance(picker, AbuPickStockBase):
                # Type check on the built factor.
                raise TypeError('factor must base AbuPickStockBase')

            # A truthy first_choice entry routes the factor to the first_choice list.
            is_first_choice = 'first_choice' in factor_spec and factor_spec['first_choice']
            bucket = self.first_stock_pickers if is_first_choice else self.stock_pickers
            bucket.append(picker)

        if self.choice_symbols is None or len(self.choice_symbols) == 0:
            # No initial candidates were given: take every symbol of the
            # market, see all_symbol for details.
            self.choice_symbols = all_symbol()

    def fit(self):
        """
        Start picking.

        Unlike timing, picking is many-to-many: several trade symbols are
        checked against several picking factors, together with the capital
        and benchmark objects. The outcome is stored in ``self.choice_symbols``.
        """

        def _first_batch_fit():
            """
            first_choice pass: iterate the first_choice factors and let each
            one filter the whole pool through its ``fit_first_choice`` method,
            the factor's own efficient batch selection.

            :return: the symbols left after all first_choice factors ran
            """
            if self.first_stock_pickers is None or len(self.first_stock_pickers) == 0:
                # No first_choice factor means no veto was cast: everything passes.
                return self.choice_symbols

            # Start from the full pool and narrow it one factor at a time.
            remaining = self.choice_symbols
            with AbuMulPidProgress(len(self.first_stock_pickers), 'pick first_choice stocks complete') as progress:
                for step, first_choice in enumerate(self.first_stock_pickers, 1):
                    progress.show(step)
                    # Each factor filters the output of the previous one, layer by layer.
                    remaining = first_choice.fit_first_choice(self, remaining)
            return remaining

        def _batch_fit():
            """
            Regular pass: iterate the candidate symbols and, for each one,
            iterate the picking factors. Any single factor can veto a symbol,
            which removes it from the result.

            :return: the symbols no factor vetoed
            """
            if self.stock_pickers is None or len(self.stock_pickers) == 0:
                # No regular factor means no veto was cast: everything passes.
                return self.choice_symbols

            with AbuMulPidProgress(len(self.choice_symbols), 'pick stocks complete') as progress:
                # Progress display for the picking run.
                accepted = []
                for step, target_symbol in enumerate(self.choice_symbols, 1):
                    progress.show(step)

                    for picker in self.stock_pickers:
                        kl_pd = self.kl_pd_manager.get_pick_stock_kl_pd(target_symbol, picker.xd, picker.min_xd)
                        if kl_pd is None:
                            # get_pick_stock_kl_pd filters out symbols with
                            # too little data, see that method for details.
                            break
                        if picker.fit_pick(kl_pd, target_symbol) is False:
                            # One veto is enough to drop the symbol.
                            break
                    else:
                        # Only reached when no factor broke out of the loop.
                        accepted.append(target_symbol)
            return accepted

        # Filter with the first_choice factors first; _batch_fit then
        # continues from that narrowed pool.
        self.choice_symbols = _first_batch_fit()
        # Second stage: symbol-by-symbol, factor-by-factor filtering.
        self.choice_symbols = _batch_fit()
Loading

0 comments on commit 815d6c3

Please sign in to comment.