1042 lines
36 KiB
Python
1042 lines
36 KiB
Python
from backtesting import Backtest, Strategy
|
|
from trade.candle import CandlePatterns
|
|
import talib, numpy, math
|
|
from signal_helper import *
|
|
|
|
from pyti import stochrsi
|
|
from pyti import chande_momentum_oscillator
|
|
|
|
import numpy as np
|
|
from scipy.signal import argrelextrema
|
|
|
|
import sys
|
|
|
|
|
|
class StrategyCandlePattern(Strategy):
|
|
use_indicators = None
|
|
indicators_data = None
|
|
_use_patterns = None
|
|
pattern_data = None
|
|
_sl_percent = 0.03 # 4%
|
|
_r_sl_percent = 0.05 # 4%
|
|
|
|
# Stop Profit/Loss
|
|
up_target = None
|
|
down_target = None
|
|
|
|
# 캔들 패턴
|
|
cp = CandlePatterns()
|
|
|
|
# 매수 패턴
|
|
_positive_patterns = cp.get_long_patterns()
|
|
_negative_patterns = cp.get_short_patterns()
|
|
_fence_patterns = cp.get_fence_patterns()
|
|
|
|
data_len = None
|
|
|
|
# position_closed_time
|
|
position_closed_time = None
|
|
|
|
def init(self):
|
|
date = self.data.Date
|
|
close = self.data.Close
|
|
low = self.data.Low
|
|
high = self.data.High
|
|
open = self.data.Open
|
|
volume = self.data.Volume
|
|
|
|
self.data_len = len(close) - 1
|
|
|
|
data = [list()] * len(self.data.Close)
|
|
|
|
for p in self.use_indicators:
|
|
# for test
|
|
print('입력된 보조지표 :', p)
|
|
|
|
indicator = None
|
|
values = None
|
|
f = None
|
|
|
|
indicator, values = list(p.items())[0]
|
|
|
|
if indicator != 'DI' and hasattr(talib, indicator):
|
|
f = getattr(talib, indicator)
|
|
|
|
if indicator is 'CDLPTN':
|
|
|
|
for i in range(0, len(close)):
|
|
t_d = []
|
|
|
|
if self.data.Candle_pattern[i] is None:
|
|
continue
|
|
|
|
pattern, value = list(self.data.Candle_pattern[i].items())[0]
|
|
|
|
if pattern in self._fence_patterns: # 중립 패턴
|
|
t_d.append(is_trade_fence_pattern(pattern, value))
|
|
elif pattern in self._positive_patterns: # 매수 패턴
|
|
t_d.append(True)
|
|
elif pattern in self._negative_patterns: # 매도 패턴
|
|
t_d.append(False)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
# 보조지표 추가 시작
|
|
|
|
elif indicator is 'STOCH': # 주도 지표
|
|
slowk, slowd = f(high, low, close, fastk_period=values[0],
|
|
slowk_period=values[1], slowd_period=values[2])
|
|
|
|
start = 1 + numpy.isnan(slowk).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(slowk)):
|
|
t_d = []
|
|
|
|
if crossover(slowk[i], slowk[i - 1], slowd[i], slowd[i - 1]) and slowd[i] < 50:
|
|
is_trade = True
|
|
elif crossover(slowd[i], slowd[i - 1], slowk[i], slowk[i - 1]) and slowd[i] > 50:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'ADX' or indicator is 'ADXR': # 베이스 지표
|
|
res_arr = f(high, low, close, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if res_arr[i] < 13:
|
|
t_d.append(True)
|
|
elif res_arr[i] > 45:
|
|
t_d.append(False)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'DI': # 주도 지표
|
|
res_arr_plus = talib.PLUS_DI(high, low, close, timeperiod=values)
|
|
res_arr_minus = talib.MINUS_DI(high, low, close, timeperiod=values)
|
|
|
|
start = 1 + numpy.isnan(res_arr_plus).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr_plus)):
|
|
t_d = []
|
|
|
|
if crossover(res_arr_plus[i], res_arr_plus[i - 1], res_arr_minus[i], res_arr_minus[i - 1]):
|
|
is_trade = True
|
|
elif crossover(res_arr_minus[i], res_arr_minus[i - 1], res_arr_plus[i], res_arr_plus[i - 1]):
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'APO': # 주도 지표
|
|
res_arr = f(close, fastperiod=values[0], slowperiod=values[1], matype=0)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if res_arr[i] > 0:
|
|
is_trade = True
|
|
elif res_arr[i] < 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'AROON': # 주도 지표 => 추격
|
|
aroondown, aroonup = f(high, low, timeperiod=values)
|
|
start = 1 + numpy.isnan(aroondown).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(aroondown)):
|
|
t_d = []
|
|
|
|
if crossover(aroonup[i], aroonup[i - 1], aroondown[i], aroondown[i - 1]):
|
|
is_trade = True
|
|
elif crossover(aroondown[i], aroondown[i - 1], aroonup[i], aroonup[i - 1]):
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'AROONOSC': # 베이스 지표 - 추세
|
|
res_arr = f(high, low, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
# 0 초과 상승추세, 0 미만 하락 추세
|
|
if res_arr[i] > 0 and res_arr[i-1] < 0:
|
|
is_trade = True
|
|
elif res_arr[i] < 0 and res_arr[i-1] > 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
# print(date[i], res_arr[i])
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'BOP': # 베이스 지표
|
|
res_arr = f(open, high, low, close)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if res_arr[i] > 0 and res_arr[i - 1] < 0:
|
|
is_trade = True
|
|
elif res_arr[i] < 0 and res_arr[i - 1] > 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'CCI': # 베이스 지표 / 추세
|
|
res_arr = f(close, close, close, timeperiod=values)
|
|
# res_arr = f(high, low, close, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
# 베이스
|
|
if res_arr[i] < -100: # 과매도 -100 / 과매수 +100
|
|
is_trade = True
|
|
elif res_arr[i] > 100:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'CMO': # 베이스 지표 / 추세 => 수치 불일치
|
|
# res_arr = f(close, timeperiod=values)
|
|
res_arr = chande_momentum_oscillator.chande_momentum_oscillator(close, values)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if res_arr[i] > 0: # 과매도 -100 / 과매수 +100
|
|
is_trade = True
|
|
elif res_arr[i] < 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'MACD': # 주도 지표
|
|
macd, macdsignal, macdhist = f(close,
|
|
fastperiod=values[0],
|
|
slowperiod=values[1],
|
|
signalperiod=values[2])
|
|
|
|
start = 1 + numpy.isnan(macd).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(macd)):
|
|
t_d = []
|
|
|
|
if crossover(macd[i], macd[i - 1], macdsignal[i], macdsignal[i - 1]):
|
|
is_trade = True
|
|
elif crossover(macdsignal[i], macdsignal[i - 1], macd[i], macd[i - 1]):
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'MACDFIX': # 주도 지표
|
|
macd, macdsignal, macdhist = f(close, signalperiod=values)
|
|
start = 1 + numpy.isnan(macd).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(macd)):
|
|
t_d = []
|
|
|
|
if crossover(macd[i], macd[i - 1], macdsignal[i], macdsignal[i - 1]):
|
|
is_trade = True
|
|
elif crossover(macdsignal[i], macdsignal[i - 1], macd[i], macd[i - 1]):
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'MFI': # 베이스 지표 / 역추세
|
|
res_arr = f(close, close, close, volume, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
# 베이스
|
|
if res_arr[i] > 20 and res_arr[i-1] < 20:
|
|
is_trade = True
|
|
elif res_arr[i] > 80 and res_arr[i-1] < 80:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'MOM': # 베이스 지표 / 추세
|
|
res_arr = f(close, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
# 베이스
|
|
if res_arr[i] > 0 and res_arr[i-1] < 0:
|
|
is_trade = True
|
|
elif res_arr[i] < 0 and res_arr[i-1] > 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'PPO': # 주도 지표
|
|
ppo = f(close, fastperiod=values[0], slowperiod=values[1], matype=1)
|
|
ppo_slow = moving_average(ppo, values[2])
|
|
|
|
start = 1 + numpy.isnan(ppo_slow).sum()
|
|
|
|
is_trade = None
|
|
for i in range(start, len(ppo)):
|
|
t_d = []
|
|
|
|
if crossover(ppo[i], ppo[i - 1], ppo_slow[i], ppo_slow[i - 1]):
|
|
is_trade = True
|
|
elif crossover(ppo_slow[i], ppo_slow[i - 1], ppo[i], ppo[i - 1]):
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'ROC': # 베이스 지표 / 추세
|
|
res_arr = f(close, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if res_arr[i] > 0 and res_arr[i-1] < 0:
|
|
is_trade = True
|
|
elif res_arr[i] < 0 and res_arr[i-1] > 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'ROCP' or indicator is 'ROCR' or indicator is 'ROCR100': # 베이스 지표 / 추세
|
|
res_arr = f(close, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
# 베이스
|
|
if res_arr[i] > 0:
|
|
t_d.append(True)
|
|
elif res_arr[i] < 0:
|
|
t_d.append(False)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'STOCHF': # 주도 지표
|
|
fastk, fastd = f(high, low, close, fastk_period=values[0], fastd_period=values[1], fastd_matype=0)
|
|
start = 1 + numpy.isnan(fastk).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(fastk)):
|
|
t_d = []
|
|
|
|
# 높은 거래 빈도로 인해 매매조건 추가
|
|
if fastd[i] < 40 and crossover(fastk[i], fastk[i - 1], fastd[i], fastd[i - 1]):
|
|
is_trade = True
|
|
elif fastd[i] > 60 and crossover(fastd[i], fastd[i - 1], fastk[i], fastk[i - 1]):
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'STOCHRSI': # 주도 지표
|
|
rsi_k, rsi_d = stoch_rsi(close, values[0], values[1], values[2])
|
|
|
|
start = 1 + numpy.isnan(rsi_d).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(rsi_d)):
|
|
t_d = []
|
|
|
|
if rsi_k[i] < 25 and crossover(rsi_k[i], rsi_k[i-1], rsi_d[i], rsi_d[i-1]):
|
|
is_trade = True
|
|
elif rsi_k[i] > 75 and crossover(rsi_d[i], rsi_d[i-1], rsi_k[i], rsi_k[i-1]):
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'TRIX': # 베이스 지표 / 역추세 => 매수/매도 시점 괜찮다
|
|
trix = f(close, timeperiod=values[0])
|
|
trix_signal = moving_average(trix, values[1])
|
|
|
|
start = 1 + numpy.isnan(trix_signal).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(trix_signal)):
|
|
t_d = []
|
|
|
|
if crossover(trix[i], trix[i-1], trix_signal[i], trix_signal[i-1]):
|
|
is_trade = True
|
|
elif crossover(trix_signal[i], trix_signal[i-1], trix[i], trix[i-1]):
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'ULTOSC': # 주도 지표
|
|
res_arr = f(high, low, close, timeperiod1=values[0], timeperiod2=values[1], timeperiod3=values[2])
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if res_arr[i] < 30:
|
|
is_trade = True
|
|
elif res_arr[i] > 70:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'WILLR': # 베이스 지표 => 강세장에서 효과를 발휘
|
|
res_arr = f(high, low, close, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
# print(date[i], res_arr[i])
|
|
if res_arr[i] > -20:
|
|
is_trade = True
|
|
elif res_arr[i] < -80:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'BBANDS': # 베이스 지표 / 역추세
|
|
upperband, middleband, lowerband = f(close, timeperiod=values[0], nbdevup=values[1], nbdevdn=values[1],
|
|
matype=0)
|
|
|
|
start = 1 + numpy.isnan(upperband).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(upperband)):
|
|
t_d = []
|
|
|
|
if high[i] > upperband[i] and upperband[i] > upperband[i-1] and low[i] > middleband[i]:
|
|
is_trade = True
|
|
elif low[i] < lowerband[i] and lowerband[i] < lowerband[i-1] and high[i] < middleband[i]:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'EMA' or \
|
|
indicator is 'DEMA' or \
|
|
indicator is 'MA' or \
|
|
indicator is 'SMA': # 주도 지표 / 추세
|
|
|
|
res_arr = f(close, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if close[i] > res_arr[i]:
|
|
is_trade = True
|
|
elif close[i] < res_arr[i]:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'MAMA': # 주도 지표 / 추세
|
|
mama, fama = f(close, fastlimit=values[0], slowlimit=values[1])
|
|
|
|
start = 1 + numpy.isnan(mama).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(mama)):
|
|
t_d = []
|
|
|
|
if mama[i] > fama[i]:
|
|
is_trade = True
|
|
elif mama[i] < fama[i]:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'MIDPOINT': # 주도 지표 => 추세
|
|
res_arr = f(close, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if close[i] > res_arr[i]:
|
|
t_d.append(True)
|
|
elif close[i] < res_arr[i]:
|
|
t_d.append(False)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'MIDPRICE': # 주도 지표 => 추세
|
|
res_arr = f(high, low, timeperiod=values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if close[i] > res_arr[i]:
|
|
t_d.append(True)
|
|
elif close[i] < res_arr[i]:
|
|
t_d.append(False)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'SAR': # 주도 지표 => 역추세
|
|
res_arr = f(high, low, acceleration=values[0], maximum=values[1])
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if close[i] > res_arr[i] and close[i-1] < res_arr[i-1]:
|
|
is_trade = True
|
|
elif res_arr[i] > close[i] and res_arr[i-1] < close[i-1]:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'T3': #indicator is 'WMA': # 주도 지표 => 추세
|
|
long = f(close, timeperiod=values[0])
|
|
short = f(close, timeperiod=values[1])
|
|
|
|
start = 1 + numpy.isnan(long).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(long)):
|
|
t_d = []
|
|
|
|
if short[i] > long[i]:
|
|
is_trade = True
|
|
elif short[i] < long[i]:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'TRIMA' or indicator is 'WMA': # 주도 지표 => 추세
|
|
res_arr = f(close, timeperiod=values)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if close[i] > res_arr[i]:
|
|
is_trade = True
|
|
elif close[i] < res_arr[i]:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'AD': # 베이스 지표 - 거래량
|
|
res_arr = f(high, low, close, volume)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
# 가격-거래량 반비례 => 추세 반전
|
|
if close[i] > close[i - 1] and res_arr[i] < res_arr[i - 1]:
|
|
is_trade = False
|
|
elif close[i] < close[i - 1] and res_arr[i] > res_arr[i - 1]:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
|
|
elif indicator is 'ADOSC_DIV': # 베이스 지표 - 거래량
|
|
res_arr = talib.ADOSC(high, low, close, volume, fastperiod=values[0], slowperiod=values[1])
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
|
|
if is_divergence is True:
|
|
is_trade = True
|
|
elif is_divergence is False:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'DMI': # 주도 지표 - 추세(추매)
|
|
adx = talib.ADX(high, low, close, timeperiod=values)
|
|
plus_di = talib.PLUS_DI(high, low, close, timeperiod=values)
|
|
minus_di = talib.MINUS_DI(high, low, close, timeperiod=values)
|
|
|
|
start = 1 + numpy.isnan(adx).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(adx)):
|
|
t_d = []
|
|
|
|
if adx[i] > adx[i - 1]:
|
|
if plus_di[i] > minus_di[i] and adx[i] > minus_di[i]:
|
|
is_trade = True
|
|
elif minus_di[i] > minus_di[i - 1]:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'OBV': # 베이스 지표 / 역추세
|
|
res_arr = f(close, volume)
|
|
cdl_cnt = 20
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
if start < cdl_cnt:
|
|
start = cdl_cnt
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
# 고가 갱신 시 매도 / 저가 갱신 시 매수
|
|
if res_arr[i] > max(res_arr[i - cdl_cnt:i]):
|
|
t_d.append(False)
|
|
elif res_arr[i] < min(res_arr[i - cdl_cnt:i]):
|
|
t_d.append(True)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
# 보조지표 추가 끝
|
|
|
|
elif indicator is 'RSI': # 베이스 지표
|
|
res_arr = f(close, values)
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
if res_arr[i] < 30:
|
|
is_trade = True
|
|
elif res_arr[i] > 70:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'RSI_DIV': # 주도 지표 / 역추세 - 다이버전스
|
|
f = getattr(talib, 'RSI')
|
|
res_arr = f(close, values)
|
|
|
|
cdl_cnt = 100
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
if start < cdl_cnt:
|
|
start = cdl_cnt
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
if is_divergence is True and res_arr[i - 1] < 50:
|
|
is_trade = True
|
|
elif is_divergence is False and res_arr[i - 1] > 50:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'OBV_DIV':
|
|
f = getattr(talib, 'OBV')
|
|
res_arr = f(close, volume)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
|
|
if is_divergence is True:
|
|
is_trade = True
|
|
elif is_divergence is False:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'WILLR_DIV':
|
|
f = getattr(talib, 'WILLR')
|
|
res_arr = f(high, low, close, timeperiod=values)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
|
|
if is_divergence is True and res_arr[i - 1] < -50:
|
|
is_trade = True
|
|
elif is_divergence is False and res_arr[i - 1] > -50:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'ADX_DIV':
|
|
f = getattr(talib, 'ADX')
|
|
res_arr = f(high, low, close, timeperiod=values)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
|
|
if is_divergence is True:
|
|
is_trade = True
|
|
elif is_divergence is False:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'BOP_DIV':
|
|
f = getattr(talib, 'BOP')
|
|
res_arr = f(open, high, low, close)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
|
|
if is_divergence is True:
|
|
is_trade = True
|
|
elif is_divergence is False:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'CCI_DIV':
|
|
f = getattr(talib, 'CCI')
|
|
res_arr = f(high, low, close, timeperiod=values)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
|
|
if is_divergence is True and res_arr[i - 1] < -100:
|
|
is_trade = True
|
|
elif is_divergence is False and res_arr[i - 1] > 100:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'MFI_DIV':
|
|
f = getattr(talib, 'MFI')
|
|
res_arr = f(close, close, close, volume, timeperiod=values)
|
|
# res_arr = f(high, low, close, volume, timeperiod=values)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
# is_divergence = is_divergence_v3(i, close, res_arr, date)
|
|
|
|
if is_divergence is True and res_arr[i - 1]:
|
|
is_trade = True
|
|
elif is_divergence is False and res_arr[i - 1]:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
''' CMO 지표 수치 불일치 '''
|
|
if indicator is 'CMO_DIV':
|
|
res_arr = chande_momentum_oscillator.chande_momentum_oscillator(close, 9)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
|
|
if is_divergence is True and res_arr[i - 1] < 0:
|
|
is_trade = True
|
|
elif is_divergence is False and res_arr[i - 1] > 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'MOM_DIV':
|
|
f = getattr(talib, 'MOM')
|
|
res_arr = f(close, timeperiod=values)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
|
|
if is_divergence is True: # and res_arr[i-1] < 0:
|
|
is_trade = True
|
|
elif is_divergence is False: # and res_arr[i-1] > 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'ROC_DIV':
|
|
f = getattr(talib, 'ROC')
|
|
res_arr = f(close, timeperiod=values)
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
|
|
if is_divergence is True: # and round(res_arr[i-1]) < 0:
|
|
is_trade = True
|
|
elif is_divergence is False: # and round(res_arr[i-1]) > 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'STOCH_DIV':
|
|
f = getattr(talib, 'STOCH')
|
|
|
|
# res_arr, slowd = f(high, low, close,
|
|
slowk, res_arr = f(high, low, close,
|
|
fastk_period=values[0],
|
|
slowk_period=values[1],
|
|
slowd_period=values[2])
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
|
|
if is_divergence is True: # and round(res_arr[i-1]) < 0:
|
|
is_trade = True
|
|
elif is_divergence is False: # and round(res_arr[i-1]) > 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
if indicator is 'STOCHRSI_DIV':
|
|
rsi_k, res_arr = stoch_rsi(close, values[0], values[1], values[2])
|
|
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
i = i - 1
|
|
|
|
is_divergence = is_divergence_v3(i, close, res_arr, date)
|
|
if is_divergence is True:
|
|
is_trade = True
|
|
elif is_divergence is False:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
elif indicator is 'ULTOSC_DIV': # 주도 지표
|
|
res_arr = talib.ULTOSC(high, low, close, timeperiod1=values[0], timeperiod2=values[1], timeperiod3=values[2])
|
|
start = 1 + numpy.isnan(res_arr).sum()
|
|
is_trade = None
|
|
|
|
for i in range(start, len(res_arr)):
|
|
t_d = []
|
|
|
|
is_divergence = is_divergence_v2(i, high, low, res_arr, date)
|
|
if is_divergence is True: # and round(res_arr[i-1]) < 0:
|
|
is_trade = True
|
|
elif is_divergence is False: # and round(res_arr[i-1]) > 0:
|
|
is_trade = False
|
|
|
|
t_d.append(is_trade)
|
|
|
|
data[i] = data[i] + t_d
|
|
|
|
self.indicators_data = data
|
|
|
|
def next(self):
|
|
idx = (self._broker._i) - 1
|
|
|
|
sl = self.data.Close[-1] - (self.data.Close[-1] * self._sl_percent)
|
|
r_sl = self.data.Close[-1] + (self.data.Close[-1] * self._r_sl_percent)
|
|
date = self.data.Date[-1]
|
|
|
|
# 배열 동시 시그널 매매
|
|
if len(self.indicators_data[idx]) == len(self.use_indicators):
|
|
# None value pass
|
|
if not (None in self.indicators_data[idx]):
|
|
if all(self.indicators_data[idx]):
|
|
# only short test
|
|
# if not self.orders.is_short:
|
|
# self.position.close()
|
|
|
|
if len(self.use_indicators) is 1: # 단일 지표일 경우
|
|
if not self.orders.is_long and not any(self.indicators_data[idx - 1]):
|
|
# self.buy()
|
|
# self.buy(tp=r_sl)
|
|
# self.buy(sl=sl, tp=r_sl)
|
|
self.buy(sl=sl)
|
|
elif len(self.use_indicators) > 1: # 여러 지표일 경우
|
|
if not self.orders.is_long:
|
|
# self.buy()
|
|
# self.buy(tp=r_sl)
|
|
# self.buy(sl=sl, tp=r_sl)
|
|
self.buy(sl=sl)
|
|
|
|
elif not any(self.indicators_data[idx]):
|
|
# short test
|
|
# if self.orders.is_long is None or self.orders.is_short is False:
|
|
# self.sell(sl=r_sl, tp=sl)
|
|
|
|
# only long test
|
|
if self.orders.is_long:
|
|
self.position.close()
|
|
|
|
# 시뮬레이터 종료 시 포지션 종료
|
|
if idx == self.data_len - 2:
|
|
self.position.close()
|