본문 바로가기
python

Python - 멀티스레드 사용, 웹 스크래핑 비동기 처리 ( pandas_reader 주식 데이터 스크랩 )

by 맑은안개 2021. 2. 24.

들어가며..  

  프로그래밍에서 병렬처리를 위해 멀티스레드를 다루는 일은 쉬운일은 아니다. 스레드의 동작 원리, 다중 스레드에서 공유되는 Shared value 처리, 그 과정에서 발생하는 Race condition, Dead Lock 문제 등을 고려하여 처리해야 하기 때문이다. 

 

  Shared value 처리를 하지 않고 병렬처리가 요구 되는 비교적 단순한 프로세스라면 멀티스레딩으로 처리 하는 것이 더 효율적일 수 있다. 

 

  주식의 일 데이터 업데이트를 하기 위해 멀티스레딩 환경을 구축했다.  Pandas_reader 라이브러리를 사용해서 국내 모든 상장기업을 스크래핑 한다. 동기적으로 처리하면 종목코드 수에 따라 선형적으로 비례하여 느려질 수 밖에 없다. 이를 멀티스레드 처리 하여 시간을 단축한다. 

 

  시작하기 전에 Python의 스레딩 처리 구조의 한계를 이야기 하고 싶다. Python은 인터프리터(Interpreter)로 코드를 실행한다. 구조상 복합적인 문제가 발생하는데 이 문제가 스레딩 처리 구조에도 영향을 미친다. 실제 n개의 스레드를 실행해도 한 개의 CPU 코어에서 실행된다.     CPU 멀티 코어를 사용해야 하는 고비용 연산 처리 작업을 하는 프로세스 라면 Python 스레딩 사용에 더욱 신중할 필요가 있다. 

 

개발환경

  • Python3.2 이상 - thread의 리턴값을 받을 future 객체 라이브러리가 3.2에서 업데이트 되었다.
  • pandas_reader library - 웹 스크래핑 라이브러리
  • matploblib - 멀티스레딩 worker수에 따른 처리 속도 변화 시각화용

라이브러리 설치

pip install pandas_datareader
pip install matplotlib

 

개발코드

import threading
import time
import concurrent.futures
import pandas_datareader as web

stocks = [
    "000020","000030","000040","000050","000060"
    ,"000070","000080","000100","000120","000140"
    ,"000150","000180","000210","000220","000230"
    ,"000240","000250","000270","000300","000320"
    ,"000370","000390","000400","000430","000440"
]

def get_data(code):
    return web.naver.NaverDailyReader(code).read().astype(int)

25개의 종목코드를 get_data함수를 호출하여 "모든 기간"의 주식데이터를 가져온다. 

 

웹 스크래핑 실행 함수 생성

def normal(cnt):
    tot = []
    for _ in range(0, cnt):
        start = time.time()
        rs = []
        for stock in stocks:
            df = get_data(stock)
            rs.append(df)

        tot.append(f'{time.time() - start:.2f}')
    return tot
            
def using_thread(cnt, worker=1):
    tot = []
    for _ in range(0, cnt):
        start = time.time()

        result_df = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=worker) as executor:
            # for n in executor.submit(get_data, stocks) # 단건 실행.
            for df in executor.map(get_data, stocks): # map으로 n 건 실행.
                result_df.append(df)

        tot.append(f'{time.time() - start:.2f}')
    return tot
  • 각 함수는 속도 테스트를 위해 cnt를 인자로 받는다.
  • 각 함수는 cnt만큼 처리하면서 소비한 처리시간을 리스트로 리턴한다.
  • normal 함수는 stocks 리스트에 정의된 종목코드 수 만큼 동기처리 한다.
  • using_thread 함수는 스레드를 사용하여 병렬처리 한다.  max_worker는 우선 1로 설정한다.
  • concurrent.futures 모듈은 Python 3.2 버전에서 추가 된 고수준 스레드 인터페이스 환경을 제공한다.
  • executor.map 은 .join() 을 실행하여 각 스레드가 종료되길 기다리고 결과 값을 리턴한다.

동기 vs 비동기 평균 처리 소요시간 비교

n = np.array(normal(10)).astype(float).mean()
t = np.array(using_thread(10)).astype(float).mean()

print(f'normal: {n:.2f}sec, thread: {t:.2f}sec')

# OUTPUT
'normal: 2.26sec, thread: 1.07sec'

10번 반복하여 평균한 값을 리턴하여 동기와 비동기 처리 시간을 비교하였다.

!! 여기서 주의 할 것은 웹 스크래핑은 네트워크 환경에 따라 처리 시간이 매번 다르게 나타날 수 있다.

 

Thread Pool의 Max_worker 수에 따른 처리시간 변화

workers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
result = []
for max_worker in workers:
    t = np.array(using_thread(10, max_worker)).astype(float).mean()
    print(f'max_worker {max_worker}: {t:.2f}sec')
    result.append([max_worker, t])

n_result = np.array(result)

최적의 max_worker를 찾는 것실행되는 함수의 수, 반복 횟수 등에 따라 가변적이므로 테스트 횟수를 늘려가며 최적의 값을 찾는 것이 좋다. 

위의 n_result 값을 T 함수를 사용하여 그래프 x, y로 사용될 데이터로 변환한다.

n_result = n_result.T

print(n_result)

# OUTPUT
array([[1.   , 2.   , 3.   , 4.   , 5.   , 6.   , 7.   , 8.   , 9.   ],
       [1.627, 0.941, 0.754, 0.495, 0.468, 0.407, 0.419, 0.477, 0.494]])

 

Matplotlib 사용하여 추이 분석

import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax.plot(_1st[0], _1st[1], label='1st')
ax.plot(_2nd[0], _2nd[1], label='2nd')
ax.plot(_3rd[0], _3rd[1], label='3rd')
ax.set_title("Web scraping using Thread")
ax.set_xlabel("max_worker")
ax.set_ylabel("secs")
ax.legend()

3회 반복 했으며 각 결과 값을 _1st, _2nd, _3rd 변수에 담았다. 

 

결과

3회 반복 결과

Max_worker의 수가 4까지 가면서 처리 속도가 확연히 줄어든 것을 확인 할 수있다. ( 2번째 부터는 웹 캐싱의 영향으로 더 급감한 부분이 있을 수 있다. ) 4 이상 부터는 비슷한 처리결과를 리턴하였다. 이는 종목코드 25개로 한정되어 진행된 부분이라 실제 처리 할 종목코드 수로 확인하는 것이 더 확실하다.

 

마무리하며..

Shared value를 공유하는 멀티스레딩 환경이 아니여서 쉽게 처리 된 것 같다. 위 프로세스에 더해서 실제 I/O bound가 일어나야 한다. 얻은 데이터를 DB에 적재해야 하는데 이는 Python에서 제공하는 asyncio를 사용하여 처리해본다. 

반응형