들어가며..
프로그래밍에서 병렬처리를 위해 멀티스레드를 다루는 일은 쉬운일은 아니다. 스레드의 동작 원리, 다중 스레드에서 공유되는 Shared value 처리, 그 과정에서 발생하는 Race condition, Dead Lock 문제 등을 고려하여 처리해야 하기 때문이다.
Shared value 처리를 하지 않고 병렬처리가 요구 되는 비교적 단순한 프로세스라면 멀티스레딩으로 처리 하는 것이 더 효율적일 수 있다.
주식의 일 데이터 업데이트를 하기 위해 멀티스레딩 환경을 구축했다. Pandas_reader 라이브러리를 사용해서 국내 모든 상장기업을 스크래핑 한다. 동기적으로 처리하면 종목코드 수에 따라 선형적으로 비례하여 느려질 수 밖에 없다. 이를 멀티스레드 처리 하여 시간을 단축한다.
시작하기 전에 Python의 스레딩 처리 구조의 한계를 이야기 하고 싶다. Python은 인터프리터(Interpreter)로 코드를 실행한다. 구조상 복합적인 문제가 발생하는데 이 문제가 스레딩 처리 구조에도 영향을 미친다. 실제 n개의 스레드를 실행해도 한 개의 CPU 코어에서 실행된다. CPU 멀티 코어를 사용해야 하는 고비용 연산 처리 작업을 하는 프로세스 라면 Python 스레딩 사용에 더욱 신중할 필요가 있다.
개발환경
- Python3.2 이상 - thread의 리턴값을 받을 future 객체 라이브러리가 3.2에서 업데이트 되었다.
- pandas_reader library - 웹 스크래핑 라이브러리
- matploblib - 멀티스레딩 worker수에 따른 처리 속도 변화 시각화용
라이브러리 설치
pip install pandas_datareader
pip install matplotlib
개발코드
import threading
import time
import concurrent.futures
import pandas_datareader as web
stocks = [
"000020","000030","000040","000050","000060"
,"000070","000080","000100","000120","000140"
,"000150","000180","000210","000220","000230"
,"000240","000250","000270","000300","000320"
,"000370","000390","000400","000430","000440"
]
def get_data(code):
return web.naver.NaverDailyReader(code).read().astype(int)
25개의 종목코드를 get_data함수를 호출하여 "모든 기간"의 주식데이터를 가져온다.
웹 스크래핑 실행 함수 생성
def normal(cnt):
tot = []
for _ in range(0, cnt):
start = time.time()
rs = []
for stock in stocks:
df = get_data(stock)
rs.append(df)
tot.append(f'{time.time() - start:.2f}')
return tot
def using_thread(cnt, worker=1):
tot = []
for _ in range(0, cnt):
start = time.time()
result_df = []
with concurrent.futures.ThreadPoolExecutor(max_workers=worker) as executor:
# for n in executor.submit(get_data, stocks) # 단건 실행.
for df in executor.map(get_data, stocks): # map으로 n 건 실행.
result_df.append(df)
tot.append(f'{time.time() - start:.2f}')
return tot
- 각 함수는 속도 테스트를 위해 cnt를 인자로 받는다.
- 각 함수는 cnt만큼 처리하면서 소비한 처리시간을 리스트로 리턴한다.
- normal 함수는 stocks 리스트에 정의된 종목코드 수 만큼 동기처리 한다.
- using_thread 함수는 스레드를 사용하여 병렬처리 한다. max_worker는 우선 1로 설정한다.
- concurrent.futures 모듈은 Python 3.2 버전에서 추가 된 고수준 스레드 인터페이스 환경을 제공한다.
- executor.map 은 .join() 을 실행하여 각 스레드가 종료되길 기다리고 결과 값을 리턴한다.
동기 vs 비동기 평균 처리 소요시간 비교
n = np.array(normal(10)).astype(float).mean()
t = np.array(using_thread(10)).astype(float).mean()
print(f'normal: {n:.2f}sec, thread: {t:.2f}sec')
# OUTPUT
'normal: 2.26sec, thread: 1.07sec'
10번 반복하여 평균한 값을 리턴하여 동기와 비동기 처리 시간을 비교하였다.
!! 여기서 주의 할 것은 웹 스크래핑은 네트워크 환경에 따라 처리 시간이 매번 다르게 나타날 수 있다.
Thread Pool의 Max_worker 수에 따른 처리시간 변화
workers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
result = []
for max_worker in workers:
t = np.array(using_thread(10, max_worker)).astype(float).mean()
print(f'max_worker {max_worker}: {t:.2f}sec')
result.append([max_worker, t])
n_result = np.array(result)
최적의 max_worker를 찾는 것은 실행되는 함수의 수, 반복 횟수 등에 따라 가변적이므로 테스트 횟수를 늘려가며 최적의 값을 찾는 것이 좋다.
위의 n_result 값을 T 함수를 사용하여 그래프 x, y로 사용될 데이터로 변환한다.
n_result = n_result.T
print(n_result)
# OUTPUT
array([[1. , 2. , 3. , 4. , 5. , 6. , 7. , 8. , 9. ],
[1.627, 0.941, 0.754, 0.495, 0.468, 0.407, 0.419, 0.477, 0.494]])
Matplotlib 사용하여 추이 분석
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot(_1st[0], _1st[1], label='1st')
ax.plot(_2nd[0], _2nd[1], label='2nd')
ax.plot(_3rd[0], _3rd[1], label='3rd')
ax.set_title("Web scraping using Thread")
ax.set_xlabel("max_worker")
ax.set_ylabel("secs")
ax.legend()
3회 반복 했으며 각 결과 값을 _1st, _2nd, _3rd 변수에 담았다.
결과
Max_worker의 수가 4까지 가면서 처리 속도가 확연히 줄어든 것을 확인 할 수있다. ( 2번째 부터는 웹 캐싱의 영향으로 더 급감한 부분이 있을 수 있다. ) 4 이상 부터는 비슷한 처리결과를 리턴하였다. 이는 종목코드 25개로 한정되어 진행된 부분이라 실제 처리 할 종목코드 수로 확인하는 것이 더 확실하다.
마무리하며..
Shared value를 공유하는 멀티스레딩 환경이 아니여서 쉽게 처리 된 것 같다. 위 프로세스에 더해서 실제 I/O bound가 일어나야 한다. 얻은 데이터를 DB에 적재해야 하는데 이는 Python에서 제공하는 asyncio를 사용하여 처리해본다.
'python' 카테고리의 다른 글
Python - pip 설치 라이브러리 및 Path 확인, 버전 업데이트 하기 (0) | 2021.03.25 |
---|---|
jupyter lab(notebook) 유용한 기능 매직명령어 10가지 및 단축키 (0) | 2021.03.24 |
Python( mariaDB, MySql ) - DB접속, 데이터 조회/변경 하기 ( Pandas DataFrame -> DB -> DataFrame ) (0) | 2021.02.22 |
Python - datetime 날짜 한국시간(timezone) 설정, naive vs aware datetime 이해 (0) | 2021.02.09 |
Python - BeautifulSoup 객체(ResultSet, Tag) Dict, XML로 변환하기 (0) | 2021.02.08 |