Python의 Threading과 ThreadPoolExecutor 비교
Python에서 동시성 구현할 때 필요한 두 가지 중요한 도구인 threading
모듈과 concurrent.futures
모듈의 ThreadPoolExecutor
클래스에 대하여 설명한다. 이 두 도구는 각각 다른 특징과 사용 목적을 가지고 있으며, 적절한 상황에서 활용할 때 최적의 성능을 얻을 수 있다.
개요: 스레드가 필요한 이유
파이썬으로 개발 시 웹 페이지 여러 개 다운로드하거나, 여러 파일 동시 처리하거나, API 요청 동시에 보내야 하는 경우가 발생한다. 이러한 I/O 작업은 대기 시간이 길어서 순차적으로 처리하면 시간이 과도하게 소요된다. 이러한 경우 동시성(Concurrency) 구현이 필요하며, 본 문서에서는 그 방법 중 두 가지를 비교한다.
1. Threading 모듈: 기본 스레드 구현법
개념
threading
모듈은 파이썬에서 스레드 기반 병렬 처리를 위한 기본 모듈이다. 스레드는 프로세스 내에서 실행되는 작업 단위로, 같은 메모리 공간을 공유한다. Python에서 스레드는 운영체제 수준의 네이티브 스레드를 사용한다.
주요 특징
- 단일 스레드 관리: 각 스레드 객체는 보통 하나의 작업만 처리함
- 저수준 API: 스레드의 생성, 시작, 종료를 직접 관리해야 하므로 코드가 길어질 수 있음
- 동질적 작업에 적합: 비슷한 유형의 작업 여러 번 실행할 때 적합함
- 일회성 사용: 스레드 객체는 한 번 사용 후 재사용 불가능 (매번 새로 생성 필요)
- 결과 반환: 기본적으로 결과 반환 메커니즘이 없으며, 필요할 경우 직접 구현해야 함
사용 방법
Thread
클래스를 사용하는 방법은 두 가지가 있다:
1) 함수 실행 방식 (간단한 작업에 적합)
이 방식은 외부 상태와 상호작용이 적고, 값을 반환하지 않는 일회성 작업에 적합하다.
import threading
import requests
def download_image(url, filename):
# 이미지 다운로드 작업
print(f"{filename} 다운로드 시작...")
response = requests.get(url)
with open(filename, 'wb') as f:
f.write(response.content)
print(f"{filename} 다운로드 완료!")
# 다운로드할 이미지 목록
image_urls = [
("https://example.com/image1.jpg", "image1.jpg"),
("https://example.com/image2.jpg", "image2.jpg"),
("https://example.com/image3.jpg", "image3.jpg")
]
# 스레드 생성 및 실행
threads = []
for url, filename in image_urls:
thread = threading.Thread(target=download_image, args=(url, filename))
threads.append(thread)
thread.start()
# 모든 스레드가 완료될 때까지 기다리기
for thread in threads:
thread.join()
print("모든 이미지 다운로드 완료!")
위 코드 실행 시 여러 이미지를 동시에 다운로드하는 것을 확인할 수 있다. 순차적 처리 대비 처리 시간이 단축된다.
2) 클래스 상속 방식 (복잡한 기능이 필요한 경우 적합)
이 방식은 더 많은 유연성을 제공하며, 상태 관리가 필요하거나 여러 메서드를 구현해야 하는 경우에 유용하다. 수명이 긴 작업이나 애플리케이션 내의 서비스에 적합하다.
import threading
import requests
class ImageDownloader(threading.Thread):
def __init__(self, url, filename, callback=None):
super().__init__()
self.url = url
self.filename = filename
self.callback = callback
self.success = False
def run(self):
try:
print(f"{self.filename} 다운로드 시작...")
response = requests.get(self.url)
with open(self.filename, 'wb') as f:
f.write(response.content)
self.success = True
print(f"{self.filename} 다운로드 완료!")
# 콜백 함수가 있으면 실행
if self.callback:
self.callback(self.filename, self.success)
except Exception as e:
print(f"{self.filename} 다운로드 실패: {str(e)}")
self.success = False
# 다운로드 완료 후 처리하는 콜백 함수
def process_downloaded_image(filename, success):
if success:
print(f"✅ {filename} 처리 중...")
# 여기서 이미지 처리 작업을 수행할 수 있음 (크기 조정, 필터 적용 등)
# 다운로드할 이미지 목록
image_urls = [
("https://example.com/image1.jpg", "image1.jpg"),
("https://example.com/image2.jpg", "image2.jpg"),
("https://example.com/image3.jpg", "image3.jpg")
]
# 스레드 생성 및 실행
downloaders = []
for url, filename in image_urls:
downloader = ImageDownloader(url, filename, callback=process_downloaded_image)
downloaders.append(downloader)
downloader.start()
# 모든 스레드가 완료될 때까지 기다리기
for downloader in downloaders:
downloader.join()
# 성공적으로 다운로드된 이미지 수 확인
success_count = sum(1 for d in downloaders if d.success)
print(f"다운로드 완료! ({success_count}/{len(downloaders)} 성공)")
이 방식은 상태 관리가 필요하거나 여러 메서드를 구현해야 하는 경우에 유용하다. 콜백 함수 사용이 가능하며 복잡한 로직 구현도 가능하다.
2. ThreadPoolExecutor: 스레드 관리 간소화
개념
ThreadPoolExecutor
는 concurrent.futures
모듈의 일부로, 스레드 풀을 사용하여 비동기 작업을 관리하는 고수준 인터페이스이다. 스레드 관리를 자동화하여 개발자가 작업 자체에 집중할 수 있도록 한다. Python 3.2부터 도입되었으며, 스레드 풀 패턴을 쉽게 구현할 수 있게 해준다.
주요 특징
- 스레드 풀 관리: 작업자 스레드 풀을 자동으로 생성 및 관리
- 작업 재사용: 스레드를 재사용하여 오버헤드 감소
- 고수준 API: 작업 제출 및 결과 수집이 간편함
- 이질적 작업에 적합: 다양한 유형의 작업 처리 가능
- 결과 처리: Future 객체를 통한 작업 결과 및 상태 확인 용이
- 다중 작업 관리: 다수의 작업을 일괄 제출 및 관리 가능
- 작업 취소: Future 객체를 통해 실행 전 작업 취소 가능
사용 방법
ThreadPoolExecutor는 두 가지 주요 방식으로 사용할 수 있다: submit() 메서드와 map() 메서드이다.
1) submit() 메서드 사용
from concurrent.futures import ThreadPoolExecutor
import requests
import time
def download_image(url_info):
url, filename = url_info
start_time = time.time()
try:
print(f"{filename} 다운로드 시작...")
response = requests.get(url)
with open(filename, 'wb') as f:
f.write(response.content)
elapsed = time.time() - start_time
return f"✅ {filename} 다운로드 완료! ({elapsed:.2f}초 소요)"
except Exception as e:
return f"❌ {filename} 다운로드 실패: {str(e)}"
# 다운로드할 이미지 목록
image_urls = [
("https://example.com/image1.jpg", "image1.jpg"),
("https://example.com/image2.jpg", "image2.jpg"),
("https://example.com/image3.jpg", "image3.jpg"),
("https://example.com/image4.jpg", "image4.jpg"),
("https://example.com/image5.jpg", "image5.jpg")
]
# 스레드 풀 생성 (최대 작업자 수 지정)
start_time = time.time()
with ThreadPoolExecutor(max_workers=3) as executor: # 최대 3개의 스레드만 사용
# submit 사용하기
futures = [executor.submit(download_image, url_info) for url_info in image_urls]
# 완료된 작업 결과 확인하기
for future in futures:
print(future.result())
total_time = time.time() - start_time
print(f"모든 이미지 다운로드 완료! (총 {total_time:.2f}초 소요)")
print(f"참고: ThreadPoolExecutor는 max_workers={executor._max_workers}개의 스레드만 사용함")
2) map() 메서드 사용
from concurrent.futures import ThreadPoolExecutor
import time
def print_sum(num1, num2):
time.sleep(1) # 작업 시간을 시뮬레이션
return f"{num1} + {num2} = {num1 + num2}"
# 계산할 숫자 쌍
numbers1 = [1, 2, 3, 4, 5]
numbers2 = [10, 20, 30, 40, 50]
start_time = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
# map 함수로 일괄 처리하기
results = executor.map(print_sum, numbers1, numbers2)
for result in results:
print(result)
total_time = time.time() - start_time
print(f"모든 계산 완료! (총 {total_time:.2f}초 소요)")
ThreadPoolExecutor 사용 시 코드가 간결해지고, 스레드 개수 제한이 가능하여 시스템 리소스 관리에 유리하다. 위 예제에서는 최대 3개의 스레드만 사용하도록 제한했다.
3. 주요 차이점 비교
두 방식의 차이점을 표로 정리하면 다음과 같다:
특성 | Threading | ThreadPoolExecutor |
---|---|---|
사용 난이도 | 상대적으로 복잡함 | 상대적으로 간단함 |
작업 유형 | 동질적 작업(비슷한 작업) | 이질적 작업(다양한 작업) |
스레드 사용 | 일회성 (매번 새로 생성) | 재사용 (효율적) |
작업 관리 | 직접 관리 필요 | 자동 관리 제공 |
결과 처리 | 직접 구현 필요 | Future 객체로 처리 |
상태 확인 | 구현 필요 | done(), running() 등 메서드 제공 |
작업 취소 | 구현 필요 | Future 객체의 cancel() 메서드로 가능 |
코드 양 | 상대적으로 많음 | 상대적으로 적음 |
동시 작업 관리 | 개발자가 직접 구현 | 내장 기능으로 제공 |
4. 성능 비교
Thread를 직접 사용하는 것과 ThreadPoolExecutor를 사용하는 것 사이에는 성능 차이가 발생할 수 있다. 실제 프로젝트에서 Thread에서 ThreadPoolExecutor로 리팩토링한 결과, URL 파싱 작업 시간이 7초에서 3초대로 크게 단축된 사례가 있다.
이러한 성능 향상의 주요 원인은 다음과 같다:
- 스레드 생성 오버헤드 감소: ThreadPoolExecutor는 스레드를 재사용하여 새 스레드 생성 및 소멸에 따른 오버헤드를 줄인다.
- 작업 관리 최적화: 큐에 작업을 추가하고 스레드 풀에서 처리하는 방식으로 효율적인 작업 분배가 가능하다.
- 리소스 제한: 최대 작업자 수를 제한하여 과도한 스레드 생성을 방지하고 시스템 자원을 효율적으로 사용한다.
다음은 간단한 성능 비교 예제이다:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def process_item(item):
# 작업을 시뮬레이션하기 위한 sleep
time.sleep(0.1)
return item * 2
# 1. Threading 직접 사용
def test_threading():
results = [None] * 100
def worker(i):
results[i] = process_item(i)
start_time = time.time()
threads = []
for i in range(100):
thread = threading.Thread(target=worker, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
end_time = time.time()
return end_time - start_time
# 2. ThreadPoolExecutor 사용
def test_threadpool():
start_time = time.time()
with ThreadPoolExecutor(max_workers=20) as executor:
results = list(executor.map(process_item, range(100)))
end_time = time.time()
return end_time - start_time
# 성능 비교
thread_time = test_threading()
pool_time = test_threadpool()
print(f"Threading 직접 사용: {thread_time:.4f}초")
print(f"ThreadPoolExecutor 사용: {pool_time:.4f}초")
print(f"성능 향상: {((thread_time - pool_time) / thread_time * 100):.2f}%")
위 예제를 실행하면 일반적으로 ThreadPoolExecutor가 더 나은 성능을 보이는 것을 확인할 수 있다.
5. 적합한 사용 상황
실제 프로젝트에서 어떤 상황에 어떤 도구를 사용하는 것이 적합한지 정리하면 다음과 같다.
Threading 사용이 적합한 경우
- 세밀한 제어 필요: 스레드 동작을 세밀하게 제어해야 하는 경우
- 장기 실행 작업: 백그라운드에서 지속적으로 실행되어야 하는 작업
- 복잡한 상태 관리: 스레드마다 상태를 유지하고 여러 메서드가 필요한 경우
- 사용자 정의 기능: Thread 클래스를 확장하여 특별한 기능을 추가해야 하는 경우
- 적은 수의 스레드: 소수의 스레드만 필요하고 동시 실행되는 스레드 수가 제한적인 경우
예시: 웹 크롤러에서 각 스레드가 특정 사이트를 담당하고 쿠키나 세션을 유지해야 하는 경우 Thread 클래스 상속 방식이 적합하다.
ThreadPoolExecutor 사용이 적합한 경우
- 간단한 병렬 처리: 코드를 간결하게 유지해야 하는 경우
- 다수의 작업 처리: 비슷한 작업을 다수 동시 실행해야 하는 경우
- 결과 수집: 작업 결과를 쉽게 모으고 처리해야 하는 경우
- 리소스 제한: 동시 실행 스레드 수를 제한해야 하는 경우
- 작업 취소 필요: 실행 전 작업 취소 기능이 필요한 경우
- 동일 함수 호출: 동일한 함수를 다양한 인자로 여러 번 호출해야 하는 경우
- I/O 바운드 작업: 네트워크 요청, 파일 처리 등 I/O 바운드 작업을 효율적으로 처리해야 하는 경우
예시: 여러 API에서 데이터를 가져오거나 다수의 파일을 동시에 처리하는 경우 ThreadPoolExecutor가 코드 간결성과 관리 용이성 측면에서 유리하다.
6. 주의사항
공통 주의사항
- GIL(Global Interpreter Lock): 두 방식 모두 Python의 GIL 제약으로 인해 CPU 바운드 작업보다는 I/O 바운드 작업에 적합하다.
- 스레드 안전성: 여러 스레드에서 데이터 공유 시 Lock, Semaphore 등의 동기화 메커니즘으로 보호해야 한다.
- 데드락 방지: 스레드 간 상호 대기 상황이 발생하지 않도록 주의해야 한다.
스레드 안전성에 대한 예제:
# 스레드 안전하게 데이터 공유하기 예시
import threading
# 공유 변수와 락
counter = 0
counter_lock = threading.Lock()
def increment_counter():
global counter
for _ in range(1000000):
# 락 획득
with counter_lock:
counter += 1
# 두 스레드에서 동시에 카운터 증가시키기
t1 = threading.Thread(target=increment_counter)
t2 = threading.Thread(target=increment_counter)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"최종 카운터 값: {counter}") # 예상: 2000000
이와 같이 Lock을 사용하여 데이터 일관성을 유지할 수 있다. 그렇지 않으면 레이스 컨디션이 발생하여 예상치 못한 결과가 나올 수 있다.
ThreadPoolExecutor 사용 시 주의사항
- 스레드 풀 크기: 과도한 작업자 수는 성능 저하를 초래할 수 있다. CPU 코어 수의 몇 배 정도가 적절하다.
- 작업 크기: 작은 작업들을 ThreadPoolExecutor에 제출 시 오버헤드가 증가할 수 있다.
- 예외 처리: 작업에서 발생한 예외를 적절히 처리해야 한다. try-except 구문 활용이 필요하다.
- 컨텍스트 관리자 활용: 리소스 누수 방지를 위해 with 문을 사용한 컨텍스트 관리자 패턴을 활용하는 것이 좋다.
7. ThreadPoolExecutor와 AsyncIO 비교
비동기 프로그래밍에 사용되는 AsyncIO와의 비교 분석이다.
AsyncIO 개요
asyncio
는 Python의 비동기 프로그래밍을 위한 라이브러리로, 코루틴(Coroutine)을 사용하여 동시성을 구현한다. async/await
구문으로 비동기 코드를 작성한다.
주요 차이점
특성 | ThreadPoolExecutor | AsyncIO |
---|---|---|
기반 기술 | 시스템 스레드 | 코루틴(단일 스레드) |
멀티태스킹 방식 | 선점형(OS가 관리) | 협력형(명시적 양보) |
구현 수준 | 시스템 수준 | 파이썬 수준 |
작업 유형 | 일반 함수 | async 함수 필수 |
동시 작업 개수 | 수백 개 정도 | 수천~수만 개 가능 |
스레드 안전성 | 고려 필요 | 단일 스레드로 불필요 |
적합한 작업 | 블로킹 I/O | 논블로킹 I/O |
코드 예시
# AsyncIO 예시
import asyncio
import aiohttp
import time
async def fetch_url(session, url, filename):
"""URL에서 콘텐츠를 비동기적으로 가져와 파일에 저장"""
print(f"{filename} 다운로드 시작...")
start_time = time.time()
try:
async with session.get(url) as response:
content = await response.read()
with open(filename, 'wb') as f:
f.write(content)
elapsed = time.time() - start_time
return f"✅ {filename} 다운로드 완료! ({elapsed:.2f}초 소요)"
except Exception as e:
return f"❌ {filename} 다운로드 실패: {str(e)}"
async def main():
# 다운로드할 이미지 목록
image_urls = [
("https://example.com/image1.jpg", "async_image1.jpg"),
("https://example.com/image2.jpg", "async_image2.jpg"),
("https://example.com/image3.jpg", "async_image3.jpg"),
("https://example.com/image4.jpg", "async_image4.jpg"),
("https://example.com/image5.jpg", "async_image5.jpg")
]
start_time = time.time()
# aiohttp 세션 생성
async with aiohttp.ClientSession() as session:
# 모든 다운로드 작업을 동시에 실행
tasks = [fetch_url(session, url, filename) for url, filename in image_urls]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
total_time = time.time() - start_time
print(f"모든 이미지 다운로드 완료! (총 {total_time:.2f}초 소요)")
print(f"참고: AsyncIO는 단일 스레드에서 {len(image_urls)}개 작업을 동시에 처리함")
# 비동기 이벤트 루프 실행
asyncio.run(main())
AsyncIO는 코드 구조가 다소 다르지만, 대규모 동시성 처리에 효율적이다. 수천 개의 동시 연결도 처리 가능하다.
선택 기준
- ThreadPoolExecutor 선택 조건:
- 기존 동기 코드를 최소한의 수정으로 병렬 실행해야 하는 경우
- 블로킹 라이브러리(requests 등) 사용이 필요한 경우
- 동시성에 대한 깊은 이해 없이 간단히 사용해야 하는 경우
- AsyncIO 선택 조건:
- 다수(수천~수만 개)의 작업을 동시에 처리해야 하는 경우
- 전체 코드를 비동기 방식으로 작성할 수 있는 경우
- 고성능 네트워크 애플리케이션(서버 등) 개발 시
- 세밀한 제어가 필요한 경우
결론: 상황에 맞는 선택
Python에서 동시성 프로그래밍 시 상황에 맞는 도구 선택이 중요하다:
- threading: 세밀한 제어가 필요하거나 복잡한 스레드 로직이 필요한 경우
- ThreadPoolExecutor: 다수의 I/O 작업을 간단하게 동시 처리해야 하는 경우
- asyncio: 대량의 I/O 작업을 동시에 처리하거나 비동기 패러다임을 활용해야 하는 경우
세 가지 모두 I/O 바운드 작업에 최적화되어 있으며, CPU 집약적 작업에는 multiprocessing
모듈이나 ProcessPoolExecutor
가 더 적합하다.
기술을 프로젝트에 적용할 때는 해당 기술의 설계 의도를 이해하고 사용하는 것이 중요하다. 이를 통해 더 효율적인 코드를 작성하고 성능을 최적화할 수 있다.
'Python' 카테고리의 다른 글
curl_cffi 도입기: 웹 스크래핑 문제 해결하기 (0) | 2025.05.11 |
---|---|
Python - 이벤트 루프 (0) | 2025.04.20 |