첫 번째 커밋

This commit is contained in:
javamon
2025-12-06 22:31:19 +09:00
commit 849a100fa9
33 changed files with 6613 additions and 0 deletions

667
signal_helper.py Normal file
View File

@@ -0,0 +1,667 @@
import talib
import numpy as np
import pandas as pd
import sys
def data_columns_init(data):
# data.reset_index(level=0, inplace=True)
t_col = []
for c in data.columns:
t_col.append(c.lower().capitalize())
# data.reset_index(level=0, inplace=True)
# t_col.insert(0, 'Date')
data.columns = t_col
def heikin_ashi(df):
heikin_ashi_df = pd.DataFrame(index=df.index.values, columns=['open', 'high', 'low', 'close'])
heikin_ashi_df['close'] = (df['open'] + df['high'] + df['low'] + df['close']) / 4
for i in range(len(df)):
if i == 0:
heikin_ashi_df.iat[0, 0] = df['open'].iloc[0]
else:
heikin_ashi_df.iat[i, 0] = (heikin_ashi_df.iat[i - 1, 0] + heikin_ashi_df.iat[i - 1, 3]) / 2
heikin_ashi_df['high'] = heikin_ashi_df.loc[:, ['open', 'close']].join(df['high']).max(axis=1)
heikin_ashi_df['low'] = heikin_ashi_df.loc[:, ['open', 'close']].join(df['low']).min(axis=1)
return heikin_ashi_df
def get_candle_pattern_arr(data=None, use_patterns=talib.get_function_groups()['Pattern Recognition']):
if data is None:
return []
r_data = [None] * len(data.Close)
for p in use_patterns:
f = getattr(talib, p)
res_arr = f(data.Open, data.High, data.Low, data.Close)
# 100 is plus candle / -100 is minus candle
for i in range(0, len(res_arr)):
# print(p, res_arr[i])
if int(res_arr[i]) is not 0:
r_data[i] = {p: int(res_arr[i])}
return r_data
def crossover(k, pre_k, d, pre_d) -> bool:
try:
return k > d and pre_k < pre_d
except IndexError:
return False
def is_divergence(i, cdl_cnt, low, high, res_arr):
# 저가 갱신 / 지표 저점 상승 : 매수
if min(low[i - cdl_cnt:i]) < min(low[i - (cdl_cnt * 2):i - cdl_cnt + 1]) and \
min(res_arr[i - cdl_cnt:i]) > min(res_arr[i - (cdl_cnt * 2):i - cdl_cnt + 1]):
return True
# 고가 갱신 / 지표 고점 하락 : 매도
elif max(high[i - cdl_cnt:i]) > max(high[i - (cdl_cnt * 2):i - cdl_cnt + 1]) and \
max(res_arr[i - cdl_cnt:i]) < max(res_arr[i - (cdl_cnt * 2):i - cdl_cnt + 1]):
return False
def moving_average(data, n=3) :
return talib.EMA(data, timeperiod=n) # 이동평균
def stoch_rsi(data, rsi_period=14, stoch_period=14, slowk_period=3):
# rsi = talib.RSI(close, 14)
# rsi_fastd, rsi_slowd = talib.STOCH(rsi, rsi, rsi, fastk_period=14, slowk_period=3, slowd_period=3,
# slowk_matype=0, slowd_matype=0)
rsi = talib.RSI(data, rsi_period)
return talib.STOCH(rsi, rsi, rsi, fastk_period=stoch_period, slowk_period=slowk_period, slowd_period=slowk_period,
slowk_matype=0, slowd_matype=0)
def is_trade_fence_pattern(pattern, value):
p = pattern.upper()
if p == "CDL3INSIDE":
if int(value) < 0:
return True
elif int(value) > 0:
return False
elif p == "CDL3LINESTRIKE":
if int(value) < 0:
return True
elif int(value) > 0:
return False
elif p == "CDL3OUTSIDE":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLABANDONEDBABY": # no data
return None
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLBELTHOLD":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLBREAKAWAY":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLCLOSINGMARUBOZU": # 거래 빈도 높고 60퍼 이상의 확률
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLCOUNTERATTACK":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLCONCEALBABYSWALL" or p == "CDLMATHOLD":
return None
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLENGULFING":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLGAPSIDESIDEWHITE":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLHARAMI":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLHARAMICROSS":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLHIKKAKEMOD" or p == "CDLHIKKAKE":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLKICKING" or p == "CDLKICKINGBYLENGTH":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLPIERCING":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLRISEFALL3METHODS":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLSEPARATINGLINES": # 역추세로 활용 보류
return None
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLSTALLEDPATTERN": # 역추세 - 시그널
return None
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLTASUKIGAP":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLUNIQUE3RIVER": # 역추세 - 시그널
return None
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLXSIDEGAP3METHODS":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLSTICKSANDWICH":
if int(value) > 0:
return True
elif int(value) < 0:
return False
elif p == "CDLTRISTAR":
if int(value) > 0:
return True
elif int(value) < 0:
return False
return None
def get_indicators_args(indicators):
res_data = []
for i in indicators:
if i is 'RSI' or i is 'RSI_DIV': # 베이스 지표
e_range = list(range(9, 31))
# e_range = [14]
for v in e_range:
res_data.append({i: v})
# res_data = list(range(9, 31))
elif i is 'STOCH': # 주도 지표
# res_data.append({i: [5, 3, 3]})
# continue
f_range = list(range(5, 21))
for f_r in f_range:
s_range = list(range(1, f_r + 1))
for s_r in s_range:
t_range = list(range(1, s_r + 1))
for t_r in t_range:
res_data.append({i: [f_r, s_r, t_r]})
elif i is 'ADX' or i is 'ADXR' or i is 'DMI': # 베이스 지표
e_range = list(range(9, 21))
# e_range = [14]DMI
for v in e_range:
res_data.append({i: v})
elif i is 'DI': # Directional Indicator Plus/Minus # 주도 지표
e_range = list(range(9, 31))
# e_range = [14]
for v in e_range:
res_data.append({i: v})
elif i is 'APO': # 주도 지표
f_range = list(range(17, 31))
for f_r in f_range:
s_range = list(range(10, f_r + 1))
for s_r in s_range:
res_data.append({i: [s_r, f_r]})
elif i is 'AROON': # 주도 지표
e_range = list(range(11, 31))
for v in e_range:
res_data.append({i: v})
elif i is 'AROONOSC': # 베이스 지표
e_range = list(range(3, 21))
for v in e_range:
res_data.append({i: v})
elif i is 'BOP': # 베이스
res_data.append({i: None})
elif i is 'CCI': # 베이스
e_range = list(range(11, 31))
for v in e_range:
res_data.append({i: v})
elif i is 'CMO': # 베이스
e_range = list(range(9, 31))
for v in e_range:
res_data.append({i: v})
# elif i is 'DX': # 베이스 / 추세 추종
# e_range = list(range(9, 31))
# for v in e_range:
# res_data.append({i: v})
elif i is 'MACD': # 주도 지표
f_range = list(range(11, 31))
for f_r in f_range:
s_range = list(range(9, f_r + 1))
for s_r in s_range:
t_range = list(range(7, s_r + 1))
for t_r in t_range:
res_data.append({i: [s_r, f_r, t_r]})
elif i is 'MACDFIX': # 주도 지표
e_range = list(range(5, 21))
for v in e_range:
res_data.append({i: v})
elif i is 'MFI': # 베이스 / 추세
e_range = list(range(9, 31))
for v in e_range:
res_data.append({i: v})
elif i is 'MOM': # 베이스 / 역추세
e_range = list(range(9, 31))
for v in e_range:
res_data.append({i: v})
elif i is 'PPO': # 주도 지표
f_range = list(range(9, 26))
for f_r in f_range:
s_range = list(range(10, f_r + 1))
for s_r in s_range:
res_data.append({i: [s_r, f_r]})
elif i is 'ROC' or i is 'ROCP' or i is 'WILLR' or i is 'MIDPOINT' or i is 'MIDPRICE': # 베이스 지표 / 추세
e_range = list(range(9, 31))
for v in e_range:
res_data.append({i: v})
# elif i is 'ROCR':# 베이스 지표 / 추세 => 보류
# e_range = list(range(9, 31))
# for v in e_range:
# res_data.append({i: v}
# elif i is 'ROCR100':# 베이스 지표 / 추세 => 보류
# e_range = list(range(9, 31))
# for v in e_range:
# res_data.append({i: v})
elif i is 'STOCHF': # 주도 지표
f_range = list(range(5, 21))
for f_r in f_range:
s_range = list(range(3, f_r + 1))
for s_r in s_range:
res_data.append({i: [f_r, s_r]})
elif i is 'STOCHRSI': # 주도 지표
f_range = list(range(5, 21))
for f_r in f_range:
s_range = list(range(3, f_r + 1))
for s_r in s_range:
t_range = list(range(3, s_r + 1))
for t_r in t_range:
res_data.append({i: [f_r, s_r, t_r]})
elif i is 'TRIX': # 베이스 지표 / 역추세
f_range = list(range(3, 36))
for f_r in f_range:
s_range = list(range(2, f_r + 1))
for s_r in s_range:
res_data.append({i: [f_r, s_r]})
elif i is 'ULTOSC': # 주도 지표
f_range = list(range(7, 31))
for f_r in f_range:
s_range = list(range(5, f_r + 1))
for s_r in s_range:
t_range = list(range(5, s_r + 1))
for t_r in t_range:
res_data.append({i: [t_r, s_r, f_r]})
elif i is 'BBANDS': # 베이스 지표
f_range = list(range(9, 31))
for f_r in f_range:
s_range = list(range(1, f_r + 1))
for s_r in s_range:
res_data.append({i: [f_r, s_r]})
elif i is 'EMA' or i is 'DEMA' or i is 'MA' or i is 'SMA': # 주도 지표
f_range = list(range(9, 36))
for f_r in f_range:
s_range = list(range(5, f_r + 1))
for s_r in s_range:
res_data.append({i: [f_r, s_r]})
# elif i is 'KAMA': # 베이스 지표 / 추세 => 사용법 모름
# e_range = list(range(9, 36))
# for v in e_range:
# res_data.append({i: v})
elif i is 'MAMA': # 주도 지표
f_range = list(range(1, 10))
for f_r in f_range:
s_range = list(range(1, 10))
for s_r in s_range:
res_data.append({i: [f_r / 10, s_r / 100]})
# elif i is 'MAVP': # 주도 지표
# f_range = list(range(9, 36))
# for f_r in f_range:
# s_range = list(range(10, f_r+1))
# for s_r in s_range:
# res_data.append({i : [s_r, f_r]})
elif i is 'SAR': # 베이스 지표 / 역추세 => 거래 빈도만 줄이면 훌륭할 듯
e_range = list(range(1, 5))
for v in e_range:
res_data.append({i: v})
elif i is 'T3': # 주도 지표
f_range = list(range(5, 31))
for f_r in f_range:
s_range = list(range(3, f_r))
for s_r in s_range:
res_data.append({i: [f_r, s_r]})
elif i is 'TEMA' or i is 'TRIMA' or i is 'WMA': # 주도 지표
f_range = list(range(15, 41))
for f_r in f_range:
s_range = list(range(7, f_r))
for s_r in s_range:
res_data.append({i: [f_r, s_r]})
elif i is 'AD': # 베이스 지표 - 거래량
res_data.append({i: None})
elif i is 'ADOSC': # 주도 지표 - 거래량 => 추세
f_range = list(range(7, 31))
for f_r in f_range:
s_range = list(range(3, f_r))
for s_r in s_range:
res_data.append({i: [s_r, f_r]})
elif i is 'OBV' or i is 'BOP_DIV': # 베이스 지표 - 거래량 기반 상승장, 하락장 지표
res_data.append({i: None})
# 다이버전스 주도 지표 / 역추세
elif i is 'ADX_DIV' or i is 'ADXR_DIV' or i is 'AROONOSC_DIV' or i is 'CCI_DIV' or i is 'CMO_DIV' \
or i is 'MFI_DIV' or i is 'MOM_DIV' or i is 'ROC_DIV' or i is 'ROCP_DIV' or i is 'ROCR_DIV' \
or i is 'TRIX_DIV' or i is 'WILLR_DIV':
e_range = list(range(9, 31))
for v in e_range:
res_data.append({i: v})
elif i is 'STOCH_DIV':
f_range = list(range(5, 21))
for f_r in f_range:
s_range = list(range(1, f_r + 1))
for s_r in s_range:
t_range = list(range(1, s_r + 1))
for t_r in t_range:
res_data.append({i: [f_r, s_r, t_r]})
elif i is 'STOCHF_DIV':
f_range = list(range(7, 31))
for f_r in f_range:
s_range = list(range(3, f_r))
for s_r in s_range:
res_data.append({i: [f_r, s_r]})
elif i is 'STOCHRSI_DIV':
f_range = list(range(9, 14))
for f_r in f_range:
s_range = list(range(1, 5))
for s_r in s_range:
t_range = list(range(1, 5))
for t_r in t_range:
res_data.append({i: [f_r, s_r, t_r]})
return res_data
'''
def is_divergence_v4(high, low, res_arr, date, cdl_cnt=50):
m_idxs = [i for i, x in enumerate(res_arr) if
res_arr[i - 1] < res_arr[i] and res_arr[i - 1] < res_arr[i - 2]]
is_diver_long = [v for i, v in enumerate(m_idxs) if low[m_idxs[i]] < low[m_idxs[i-1]] and res_arr[m_idxs[i]] > res_arr[m_idxs[i-1]]]
m_idxs = [i for i, x in enumerate(res_arr) if
res_arr[i - 1] > res_arr[i] and res_arr[i - 1] > res_arr[i - 2]]
is_diver_short = [v for i, v in enumerate(m_idxs) if high[m_idxs[i]] > high[m_idxs[i-1]] and res_arr[m_idxs[i]] < res_arr[m_idxs[i-1]]]
'''
# 변곡점 캐치 다이버전스 함수 / 고점 및 저점 활용
def is_divergence_v2(i, high, low, res_arr, date, cdl_cnt=50):
is_rsi_min_1 = res_arr[i - 1] < res_arr[i] and res_arr[i - 1] < res_arr[i - 2]
is_rsi_max_1 = res_arr[i - 1] > res_arr[i] and res_arr[i - 1] > res_arr[i - 2]
# 상승 다이버전스
if is_rsi_min_1: # 첫번째 저점 형성
rsi_min_1 = res_arr[i - 1]
low_1 = low[i - 1]
for s_i in range(i - 3, i - cdl_cnt, -1):
# 두번째 저점 형성
is_rsi_min_2 = res_arr[s_i - 1] < res_arr[s_i] and res_arr[s_i - 1] < res_arr[s_i - 2]
if is_rsi_min_2:
rsi_min_2 = res_arr[s_i - 1]
low_2 = low[s_i - 1]
if low_1 < low_2: # 저점 갱신
if rsi_min_1 > rsi_min_2: # 지표 저점 상승
# print('매수 포지션', i, '-' * 50)
# print(date[i - 1], '=>', date[s_i - 1])
# print(low_1, '=>', low_2)
# print(rsi_min_1, '=>', rsi_min_2)
# print('현재', res_arr[i-2], '>', res_arr[i - 1], '<', res_arr[i])
# print('이전', res_arr[s_i-2], '>', res_arr[s_i - 1], '<', res_arr[s_i])
return True
return None
# 하락 다이버전스
if is_rsi_max_1:
rsi_max_1 = res_arr[i - 1]
max_1 = high[i - 1]
for s_i in range(i - 3, i - cdl_cnt, -1):
is_rsi_max_2 = res_arr[s_i - 1] > res_arr[s_i] and res_arr[s_i - 1] > res_arr[s_i - 2]
if is_rsi_max_2:
rsi_max_2 = res_arr[s_i - 1]
max_2 = high[s_i - 1]
if max_1 > max_2: # 고점갱신 갱신
if rsi_max_1 < rsi_max_2 : # 지표 고점 하락
# print('매도 포지션', i, '-'*50)
# print(date[i-1], '=>', date[s_i-1])
# print(max_1, '=>', max_2)
# print(rsi_max_1, '=>', rsi_max_2)
# print('현재',res_arr[i-2], '<', res_arr[i - 1], '>', res_arr[i])
# print('이전', res_arr[s_i-2], '<', res_arr[s_i - 1], '>', res_arr[s_i])
return False
return None
return None
# def is_divergence_v2(i, high, low, res_arr, date, cdl_cnt=50):
# is_rsi_min_1 = res_arr[i - 1] < res_arr[i] and res_arr[i - 1] < res_arr[i - 2]
# is_rsi_max_1 = res_arr[i - 1] > res_arr[i] and res_arr[i - 1] > res_arr[i - 2]
#
# if is_rsi_min_1: # 상승 다이버전스
# rsi_min_1 = res_arr[i - 1]
# low_1 = low[i - 1]
#
# for s_i in range(i - 3, i - cdl_cnt, -1):
# # 두번째 저점 형성
# is_rsi_min_2 = res_arr[s_i - 1] < res_arr[s_i] and res_arr[s_i - 1] < res_arr[s_i - 2]
#
# if is_rsi_min_2:
# rsi_min_2 = res_arr[s_i - 1]
# low_2 = low[s_i - 1]
#
# if low_1 < low_2: # 저점 갱신
# # if rsi_min_1 > rsi_min_2 and res_arr[i - 2] > res_arr[s_i - 2]: # 지표 저점 상승
# if rsi_min_1 > rsi_min_2: # 지표 저점 상승
# # print('매수 포지션', i, '-' * 50)
# # print(date[i - 1], '=>', date[s_i - 1])
# # print(low_1, '=>', low_2)
# # print(rsi_min_1, '=>', rsi_min_2)
# # print('현재', res_arr[i-2], '>', res_arr[i - 1], '<', res_arr[i])
# # print('이전', res_arr[s_i-2], '>', res_arr[s_i - 1], '<', res_arr[s_i])
# return True
# return None
#
# if is_rsi_max_1: # 하락 다이버전스
# rsi_max_1 = res_arr[i - 1]
# max_1 = high[i-1]
#
# for s_i in range(i - 3, i - cdl_cnt, -1):
# is_rsi_max_2 = res_arr[s_i - 1] > res_arr[s_i] and res_arr[s_i - 1] > res_arr[s_i - 2]
#
# if is_rsi_max_2:
# rsi_max_2 = res_arr[s_i - 1]
# max_2 = high[s_i - 1]
#
# if max_1 > max_2: # 고점갱신 갱신
# if rsi_max_1 < rsi_max_2: # 지표 고점 하락
# # print('매도 포지션', i, '-'*50)
# # print(date[i-1], '=>', date[s_i-1])
# # print(max_1, '=>', max_2)
# # print(rsi_max_1, '=>', rsi_max_2)
# # print('현재',res_arr[i-2], '<', res_arr[i - 1], '>', res_arr[i])
# # print('이전', res_arr[s_i-2], '<', res_arr[s_i - 1], '>', res_arr[s_i])
# return False
# return None
#
# return None
# 변곡점 캐치 다이버전스 함수 / 종가 활용
def is_divergence_v3(i, close, res_arr, date, cdl_cnt=50):
high = close
low = close
is_rsi_min_1 = res_arr[i - 1] < res_arr[i] and res_arr[i - 1] < res_arr[i - 2]
is_rsi_max_1 = res_arr[i - 1] > res_arr[i] and res_arr[i - 1] > res_arr[i - 2]
# 상승 다이버전스
if is_rsi_min_1:
rsi_min_1 = res_arr[i - 1]
low_1 = low[i - 1]
for s_i in range(i - 3, i - cdl_cnt, -1):
is_rsi_min_2 = res_arr[s_i - 1] < res_arr[s_i] and res_arr[s_i - 1] < res_arr[s_i - 2]
if is_rsi_min_2:
rsi_min_2 = res_arr[s_i - 1]
low_2 = low[s_i - 1]
if low_1 < low_2: # 저점 갱신
if rsi_min_1 > rsi_min_2: # 지표 저점 상승
return True
return None
# 하락 다이버전스
if is_rsi_max_1:
rsi_max_1 = res_arr[i - 1]
max_1 = high[i - 1]
for s_i in range(i - 3, i - cdl_cnt, -1):
is_rsi_max_2 = res_arr[s_i - 1] > res_arr[s_i] and res_arr[s_i - 1] > res_arr[s_i - 2]
if is_rsi_max_2:
rsi_max_2 = res_arr[s_i - 1]
max_2 = high[s_i - 1]
if max_1 > max_2: # 고점갱신 갱신
if rsi_max_2 > rsi_max_1: # 지표 고점 하락
return False
return None
return None
# get pivo percent
def pivo(n = 1000):
if n is 0:
return
pivo_arr = [0, 1]
res_arr = []
while pivo_arr[len(pivo_arr)-2] + pivo_arr[len(pivo_arr)-1] <= n:
pivo_arr.append(pivo_arr[len(pivo_arr)-2] + pivo_arr[len(pivo_arr)-1]) # for percent
# calculate percent
for pivo_val in pivo_arr[2:]:
res_arr.append(pivo_val/100)
return res_arr