"""Helpers around TA-Lib: candle patterns, divergence checks and parameter grids."""
import sys

import numpy as np
import pandas as pd
import talib


def data_columns_init(data):
    """Normalise the column names of *data* in place to ``Capitalized`` form.

    Each column name is lower-cased and then capitalised (``'OPEN'`` ->
    ``'Open'``).  The frame is mutated; nothing is returned.
    """
    data.columns = [c.lower().capitalize() for c in data.columns]


def heikin_ashi(df):
    """Build a Heikin-Ashi frame from the ``open/high/low/close`` columns of *df*.

    Returns a new DataFrame indexed by ``df.index.values`` with the columns
    ``open``, ``high``, ``low`` and ``close``.
    """
    heikin_ashi_df = pd.DataFrame(index=df.index.values, columns=['open', 'high', 'low', 'close'])
    heikin_ashi_df['close'] = (df['open'] + df['high'] + df['low'] + df['close']) / 4

    # Column 0 is 'open', column 3 is 'close'.  The first HA open is the raw
    # open; every later one is the midpoint of the previous HA open/close, so
    # the values have to be filled sequentially.
    if len(df) > 0:
        heikin_ashi_df.iat[0, 0] = df['open'].iloc[0]
    for i in range(1, len(df)):
        heikin_ashi_df.iat[i, 0] = (heikin_ashi_df.iat[i - 1, 0] + heikin_ashi_df.iat[i - 1, 3]) / 2

    heikin_ashi_df['high'] = heikin_ashi_df.loc[:, ['open', 'close']].join(df['high']).max(axis=1)
    heikin_ashi_df['low'] = heikin_ashi_df.loc[:, ['open', 'close']].join(df['low']).min(axis=1)
    return heikin_ashi_df


def get_candle_pattern_arr(data=None, use_patterns=None):
    """Run TA-Lib candle-pattern functions over *data* and collect the hits.

    Args:
        data: object exposing ``Open``, ``High``, ``Low`` and ``Close``
            attributes (e.g. a DataFrame prepared by ``data_columns_init``).
            ``None`` yields an empty list.
        use_patterns: iterable of TA-Lib function names.  ``None`` (default)
            means every function in TA-Lib's 'Pattern Recognition' group.
            The group is looked up per call instead of once at import time.

    Returns:
        A list with one slot per candle.  A slot is ``None`` when no pattern
        fired, otherwise ``{pattern_name: score}``.  When several patterns
        fire on the same candle the last one in *use_patterns* wins.
    """
    if data is None:
        return []
    if use_patterns is None:
        use_patterns = talib.get_function_groups()['Pattern Recognition']

    r_data = [None] * len(data.Close)
    for p in use_patterns:
        f = getattr(talib, p)
        # np.asarray makes the iteration positional even when TA-Lib hands
        # back a Series with a non-default index.
        res_arr = np.asarray(f(data.Open, data.High, data.Low, data.Close))
        # 100 is plus candle / -100 is minus candle
        for idx, raw in enumerate(res_arr):
            score = int(raw)
            if score != 0:
                r_data[idx] = {p: score}
    return r_data


def crossover(k, pre_k, d, pre_d) -> bool:
    """Return True when *k* is above *d* now while *pre_k* was below *pre_d*."""
    try:
        return k > d and pre_k < pre_d
    except IndexError:
        return False


def is_divergence(i, cdl_cnt, low, high, res_arr):
    """Window-based divergence check between price and an indicator.

    Compares the window ``[i - cdl_cnt, i)`` with the window
    ``[i - 2 * cdl_cnt, i - cdl_cnt + 1)``.

    Returns:
        ``True`` (buy) when price makes a lower low while the indicator makes
        a higher low, ``False`` (sell) when price makes a higher high while
        the indicator makes a lower high, ``None`` otherwise.
    """
    recent = slice(i - cdl_cnt, i)
    earlier = slice(i - (cdl_cnt * 2), i - cdl_cnt + 1)

    # New price low / indicator low rising: buy
    if min(low[recent]) < min(low[earlier]) and \
            min(res_arr[recent]) > min(res_arr[earlier]):
        return True
    # New price high / indicator high falling: sell
    if max(high[recent]) > max(high[earlier]) and \
            max(res_arr[recent]) < max(res_arr[earlier]):
        return False
    return None


def moving_average(data, n=3):
    """Return the moving average of *data*; note this is TA-Lib's EMA."""
    return talib.EMA(data, timeperiod=n)


def stoch_rsi(data, rsi_period=14, stoch_period=14, slowk_period=3):
    """Apply ``talib.STOCH`` to the RSI of *data*.

    The RSI series is passed as high, low and close.  *slowk_period* is used
    for both the slow-K and the slow-D smoothing.  Returns whatever
    ``talib.STOCH`` returns.
    """
    rsi = talib.RSI(data, rsi_period)
    return talib.STOCH(rsi, rsi, rsi,
                       fastk_period=stoch_period,
                       slowk_period=slowk_period,
                       slowd_period=slowk_period,
                       slowk_matype=0, slowd_matype=0)


# Patterns for which a positive score maps to True and a negative one to False.
_DIRECT_PATTERNS = frozenset((
    "CDL3OUTSIDE",
    "CDLBELTHOLD",
    "CDLBREAKAWAY",
    "CDLCLOSINGMARUBOZU",  # high trade frequency, 60%+ hit rate
    "CDLCOUNTERATTACK",
    "CDLENGULFING",
    "CDLGAPSIDESIDEWHITE",
    "CDLHARAMI",
    "CDLHARAMICROSS",
    "CDLHIKKAKEMOD",
    "CDLHIKKAKE",
    "CDLKICKING",
    "CDLKICKINGBYLENGTH",
    "CDLPIERCING",
    "CDLRISEFALL3METHODS",
    "CDLTASUKIGAP",
    "CDLXSIDEGAP3METHODS",
    "CDLSTICKSANDWICH",
    "CDLTRISTAR",
))

# Patterns read the other way round: a negative score maps to True.
_INVERTED_PATTERNS = frozenset(("CDL3INSIDE", "CDL3LINESTRIKE"))

# Deliberately disabled (always None), so they are in neither set above:
#   CDLABANDONEDBABY (no data), CDLCONCEALBABYSWALL, CDLMATHOLD,
#   CDLSEPARATINGLINES (counter-trend use on hold),
#   CDLSTALLEDPATTERN / CDLUNIQUE3RIVER (counter-trend signals).


def is_trade_fence_pattern(pattern, value):
    """Map a candle-pattern score to True / False / None.

    Args:
        pattern: TA-Lib pattern function name, case-insensitive.
        value: the pattern score; converted with ``int()`` for enabled
            patterns only.

    Returns:
        For most enabled patterns ``True`` for a positive score and ``False``
        for a negative one; ``CDL3INSIDE`` and ``CDL3LINESTRIKE`` are
        inverted.  ``None`` for a zero score and for disabled or unknown
        patterns.
    """
    p = pattern.upper()
    if p in _INVERTED_PATTERNS:
        score = -int(value)
    elif p in _DIRECT_PATTERNS:
        score = int(value)
    else:
        return None

    if score > 0:
        return True
    if score < 0:
        return False
    return None


# Indicators that take no parameter: a single ``{name: None}`` entry.
_NO_ARG_INDICATORS = frozenset(('BOP', 'AD', 'OBV', 'BOP_DIV'))

# Indicators swept over one integer period.
# Disabled in the past and still not handled: DX, ROCR, ROCR100, KAMA, MAVP.
_PERIOD_RANGES = {
    **dict.fromkeys(
        ('RSI', 'RSI_DIV', 'DI', 'CMO', 'MFI', 'MOM',
         'ROC', 'ROCP', 'WILLR', 'MIDPOINT', 'MIDPRICE',
         # divergence variants (counter-trend)
         'ADX_DIV', 'ADXR_DIV', 'AROONOSC_DIV', 'CCI_DIV', 'CMO_DIV',
         'MFI_DIV', 'MOM_DIV', 'ROC_DIV', 'ROCP_DIV', 'ROCR_DIV',
         'TRIX_DIV', 'WILLR_DIV'),
        range(9, 31)),
    **dict.fromkeys(('ADX', 'ADXR', 'DMI'), range(9, 21)),
    **dict.fromkeys(('AROON', 'CCI'), range(11, 31)),
    'AROONOSC': range(3, 21),
    'MACDFIX': range(5, 21),
    'SAR': range(1, 5),
}


def _stoch_grid():
    """[fast, slow_k, slow_d] with slow_d <= slow_k <= fast."""
    return [[f_r, s_r, t_r]
            for f_r in range(5, 21)
            for s_r in range(1, f_r + 1)
            for t_r in range(1, s_r + 1)]


def _stochf_div_grid():
    """[fast, slow] with slow strictly below fast."""
    return [[f_r, s_r] for f_r in range(7, 31) for s_r in range(3, f_r)]


def _ma_cross_grid():
    """[long, short] with short <= long, shared by EMA / DEMA / MA / SMA."""
    return [[f_r, s_r] for f_r in range(9, 36) for s_r in range(5, f_r + 1)]


def _triple_ma_grid():
    """[long, short] with short strictly below long, for TEMA / TRIMA / WMA."""
    return [[f_r, s_r] for f_r in range(15, 41) for s_r in range(7, f_r)]


# Indicators swept over a grid of several parameters.  Every builder returns
# fresh lists in exactly the order the nested loops would produce them.
_GRID_BUILDERS = {
    'STOCH': _stoch_grid,
    'STOCH_DIV': _stoch_grid,
    'APO': lambda: [[s_r, f_r] for f_r in range(17, 31) for s_r in range(10, f_r + 1)],
    'MACD': lambda: [[s_r, f_r, t_r]
                     for f_r in range(11, 31)
                     for s_r in range(9, f_r + 1)
                     for t_r in range(7, s_r + 1)],
    'PPO': lambda: [[s_r, f_r] for f_r in range(9, 26) for s_r in range(10, f_r + 1)],
    'STOCHF': lambda: [[f_r, s_r] for f_r in range(5, 21) for s_r in range(3, f_r + 1)],
    'STOCHRSI': lambda: [[f_r, s_r, t_r]
                         for f_r in range(5, 21)
                         for s_r in range(3, f_r + 1)
                         for t_r in range(3, s_r + 1)],
    'TRIX': lambda: [[f_r, s_r] for f_r in range(3, 36) for s_r in range(2, f_r + 1)],
    'ULTOSC': lambda: [[t_r, s_r, f_r]
                       for f_r in range(7, 31)
                       for s_r in range(5, f_r + 1)
                       for t_r in range(5, s_r + 1)],
    'BBANDS': lambda: [[f_r, s_r] for f_r in range(9, 31) for s_r in range(1, f_r + 1)],
    'EMA': _ma_cross_grid,
    'DEMA': _ma_cross_grid,
    'MA': _ma_cross_grid,
    'SMA': _ma_cross_grid,
    'MAMA': lambda: [[f_r / 10, s_r / 100] for f_r in range(1, 10) for s_r in range(1, 10)],
    'T3': lambda: [[f_r, s_r] for f_r in range(5, 31) for s_r in range(3, f_r)],
    'TEMA': _triple_ma_grid,
    'TRIMA': _triple_ma_grid,
    'WMA': _triple_ma_grid,
    'ADOSC': lambda: [[s_r, f_r] for f_r in range(7, 31) for s_r in range(3, f_r)],
    'STOCHF_DIV': _stochf_div_grid,
    'STOCHRSI_DIV': lambda: [[f_r, s_r, t_r]
                             for f_r in range(9, 14)
                             for s_r in range(1, 5)
                             for t_r in range(1, 5)],
}


def get_indicators_args(indicators):
    """Expand indicator names into the list of parameter sets to try.

    Args:
        indicators: iterable of indicator names such as ``'RSI'`` or
            ``'MACD'``.  Names are matched by equality, so strings built at
            runtime work too.  Unknown names and non-string items are
            skipped silently.

    Returns:
        A flat list of single-key dicts ``{name: params}`` in input order.
        ``params`` is ``None`` for parameterless indicators, an int for
        single-period indicators and a list for multi-parameter ones.
    """
    res_data = []
    for name in indicators:
        if not isinstance(name, str):
            continue
        if name in _NO_ARG_INDICATORS:
            res_data.append({name: None})
        elif name in _PERIOD_RANGES:
            res_data.extend({name: v} for v in _PERIOD_RANGES[name])
        elif name in _GRID_BUILDERS:
            res_data.extend({name: combo} for combo in _GRID_BUILDERS[name]())
    return res_data


# Sketch of a vectorised variant (is_divergence_v4) that was never enabled:
#   troughs = [i for i, _ in enumerate(res_arr)
#              if res_arr[i - 1] < res_arr[i] and res_arr[i - 1] < res_arr[i - 2]]
#   longs   = [v for k, v in enumerate(troughs)
#              if low[troughs[k]] < low[troughs[k - 1]]
#              and res_arr[troughs[k]] > res_arr[troughs[k - 1]]]
#   (and the mirrored version with peaks / highs for shorts)


def _is_trough(arr, k):
    """True when ``arr[k - 1]`` is strictly below both of its neighbours."""
    return arr[k - 1] < arr[k] and arr[k - 1] < arr[k - 2]


def _is_peak(arr, k):
    """True when ``arr[k - 1]`` is strictly above both of its neighbours."""
    return arr[k - 1] > arr[k] and arr[k - 1] > arr[k - 2]


# Inflection-point divergence check / uses highs and lows
def is_divergence_v2(i, high, low, res_arr, date, cdl_cnt=50):
    """Detect a divergence between price and indicator at turning points.

    The indicator must have just turned at ``i - 1`` (a local trough or
    peak confirmed by bar ``i``).  The function then walks back at most
    *cdl_cnt* bars to the nearest earlier turning point of the same kind
    and compares both price and indicator there.

    Args:
        i: index of the confirming bar; the pivot itself is ``i - 1``.
        high, low: price series indexable by position.
        res_arr: indicator series indexable by position.
        date: unused; kept for interface compatibility.
        cdl_cnt: how many bars to look back for the earlier pivot.

    Returns:
        ``True`` for a bullish divergence (price low is lower, indicator
        trough is higher), ``False`` for a bearish one (price high is
        higher, indicator peak is lower), otherwise ``None``.
    """
    # A pivot test reads index k - 2, so anything below 2 would wrap around
    # to the end of the series.
    if i < 2:
        return None
    # Same reason: never let the backwards scan reach s_i < 2.
    stop = max(i - cdl_cnt, 1)

    # Bullish divergence
    if _is_trough(res_arr, i):
        for s_i in range(i - 3, stop, -1):
            if _is_trough(res_arr, s_i):
                # Price made a new low while the indicator trough rose.
                # (A stricter variant once tried additionally required
                # res_arr[i - 2] > res_arr[s_i - 2].)
                if low[i - 1] < low[s_i - 1] and res_arr[i - 1] > res_arr[s_i - 1]:
                    return True
                # NOTE(review): only the nearest earlier trough is compared;
                # confirm that the scan is not meant to continue further back.
                return None
        return None

    # Bearish divergence
    if _is_peak(res_arr, i):
        for s_i in range(i - 3, stop, -1):
            if _is_peak(res_arr, s_i):
                # Price made a new high while the indicator peak fell.
                if high[i - 1] > high[s_i - 1] and res_arr[i - 1] < res_arr[s_i - 1]:
                    return False
                # NOTE(review): see the note in the bullish branch.
                return None
        return None

    return None


# Inflection-point divergence check / uses the close price
def is_divergence_v3(i, close, res_arr, date, cdl_cnt=50):
    """Same as ``is_divergence_v2`` but uses *close* for both highs and lows."""
    return is_divergence_v2(i, close, close, res_arr, date, cdl_cnt)


# get pivo percent
def pivo(n=1000):
    """Return the Fibonacci numbers up to *n*, each divided by 100.

    The sequence is seeded with ``[0, 1]`` and the two seeds are left out of
    the result, so ``pivo(10)`` gives ``[0.01, 0.02, 0.03, 0.05, 0.08]``.
    Returns ``None`` when *n* equals 0.
    """
    if n == 0:
        return None

    pivo_arr = [0, 1]
    while pivo_arr[-2] + pivo_arr[-1] <= n:
        pivo_arr.append(pivo_arr[-2] + pivo_arr[-1])

    # Convert to percent, skipping the two seed values.
    return [pivo_val / 100 for pivo_val in pivo_arr[2:]]