메인
투자 노트

코드 실행을 다른 CPU로 보내버리기

@8/17/2022
Python 병렬 프로그래밍

상황 설정

이렇게 3초, 5초, 7초, 10초가 걸리는 함수가 있다.
import time def spend_3s(): print("3s start") time.sleep(3) print("3s done") return 3 def spend_5s(): print("5s start") time.sleep(5) print("5s done") return 5 def spend_7s(): print("7s start") time.sleep(7) print("7s done") return 7 def spend_10s(): print("10s start") time.sleep(10) print("10s done") return 10
Python
복사
이 함수들이 모두 실행되려면 총 25초가 필요하다
first = spend_3s() second = spend_5s() third = spend_7s() last = spend_10s() print(f"모든 작업이 완료되었습니다. 작업 결과: {first} , {second} , {third} , {last} ")
Python
복사
3초 + 5초 + 7초 + 10초 = 25초
순차적으로 실행하면 25초 가량 걸린다
각 함수들이 독립적인 작업이라 순차적으로 실행될 필요가 없는데 하나하나 기다리는건 너무 오래걸린다. 그냥 4개의 CPU에게 4개의 함수를 던져줘서 10초만에 모두 끝내버리면 좋겠다.

준비물

이렇게 ProcessingUnitFunc이라는 데코레이터를 만들어주면 함수 위에 달아서 다른 CPU로 보내버릴 수 있다.
from multiprocessing import Process, Queue, set_start_method set_start_method("fork") class ProcessingUnitFunc: """ 함수를 다른 CPU로 보내버리는 데코레이터 함수가 callable 객체를 반환하게 됩니다. 함수가 반환하는 객체를 call 할때 다른 CPU로부터 함수의 반환값을 가져옵니다. """ def __init__(self, func): self.func = func self.queue = Queue(1) def _func(self, queue, *args, **kwargs): return_data = self.func(*args, **kwargs) queue.put(return_data) def __call__(self, *args, **kwargs): self.proc = Process(target=self._func, args=(self.queue, *args), kwargs=kwargs) self.proc.start() def receive(): return self.queue.get() return receive
Python
복사
parallel.py

기존 함수에 적용

정말 쉽다. 그냥 함수 위에 @ProcessingUnitFunc 만 달아주면 된다
import time from parallel import ProcessingUnitFunc @ProcessingUnitFunc def spend_3s(): print("3s start") time.sleep(3) print("3s done") return 3 @ProcessingUnitFunc def spend_5s(): print("5s start") time.sleep(5) print("5s done") return 5 @ProcessingUnitFunc def spend_7s(): print("10s start") time.sleep(5) print("10s done") return 7 @ProcessingUnitFunc def spend_10s(): print("10s start") time.sleep(5) print("10s done") return 10
Python
복사
이제 이 함수는 기존과 좀 다르게 동작한다.
쉽게 표현하자면 함수를 끈이 묶인 갈고리에 걸어서 다른 CPU에게 던져버리는 것이다. 함수는 갈고리에 연결된 끈을 반환하고 다른 CPU로 던져진다. 그리고 자신의 코드를 모두 실행한 뒤 반환값을 갈고리에 걸어두고 죽는다.
메인CPU는 함수와 연결된 끈을 반환받는다. 이제 이 끈을 당기면 함수가 다른 CPU에서 갈고리에 걸어둔 반환값을 가져올 수 있다.
# 작업 던져주기 current CPU >>>>>>> other CPUs spend_3s_receiver = spend_3s() spend_5s_receiver = spend_5s() # spend_5s_receiver가 spend_5s 함수와 연결된 끈이다 spend_7s_receiver = spend_7s() spend_10s_receiver = spend_10s() # 작업 결과 회수 current CPU <<<<<<< other CPUs first = spend_3s_receiver() second = spend_5s_receiver() # 끈을 당겨서 반환값을 가져온다 third = spend_7s_receiver() last = spend_10s_receiver() print(f"모든 작업이 완료되었습니다. 작업 결과: {first} , {second} , {third} , {last} ")
Python
복사
만약 함수가 종료되지 않아서 아직 갈고리에 반환값을 걸지 못했는데 끈을 당기면 어떻게 되는가? 그러면 이제 갈고리에 반환값이 걸릴때까지 꼼짝없이 끈을 잡고 기다려야 한다. (해당 라인에서 Blocking 된다.)
이젠 10초면 모두 끝난다

1 CPU = 1 Function

모든 함수에 @ProcessingUnitFunc 를 달아서 싹다 병렬로 돌려버릴 수 있으면 얼마나 좋을까? 하지만 컴퓨터 CPU갯수는 무제한이 아니기 때문에 현실적으로 힘들다. 또한 절대로 모든 함수가 독립적일 수 없다. A함수가 실행되어야지 B함수가 작동할 수 있는 경우가 많다.
>>> import os >>> os.cpu_count()
Python
복사
CPU 갯수 확인하는 방법
다시 처음처럼 모든 함수들에서 데코레이터를 없애고 시작한다.
import time from parallel import ProcessingUnitFunc def spend_3s(): print("3s start") time.sleep(3) print("3s done") return 3 def spend_5s(): print("5s start") time.sleep(5) print("5s done") return 5 def spend_7s(): print("7s start") time.sleep(7) print("7s done") return 7 def spend_10s(): print("10s start") time.sleep(10) print("10s done") return 10
Python
복사

Case1 : 함수는 4개 , CPU는 2개

이럴때는 3초, 10초를 묶어서 다른 CPU로 보내버리고 나머지 5초, 7초는 메인 CPU에서 실행시키면 된다. 이렇게 하면 CPU가 두개 뿐이어도 총 실행시간이 13초로 모든 함수를 병렬처리 했을때랑 3초밖에 차이가 안난다.
@ProcessingUnitFunc def cpu_01(): first = spend_3s() last = spend_10s() return first , last # 작업 던져주기 current CPU >>>>>>> other CPU cpu_01_receiver = cpu_01() # 메인 CPU에서 실행 second = spend_5s() third = spend_7s() # 작업 결과 회수 current CPU <<<<<<< other CPU first, last = cpu_01_receiver() print(f"모든 작업이 완료되었습니다. 작업 결과: {first} , {second} , {third} , {last} ")
Python
복사
CPU 함수 네이밍은 그냥 cpu_01 , cpu_02 , cpu_03 .. 이렇게 짓는다
말하고 싶은건 아무리 독립적인 함수들이어도 소요 시간을 잘 분배해서 최소한의 CPU로 최고 속도를 낼 수 있는 방법을 찾아야 한다는 것이다.

Case2 : 의존성을 가지는 함수들

CPU가 아무리 많아도 의존성을 가지는 함수들은 때어놓을 수 없다. spend_5s 함수가 spend_3s의 반환값을 받도록 만들면 무조건 spend_3s → spend_5s 순서로 실행되어야 한다
def spend_5s(first:int): print("5s start" , first) time.sleep(5) print("5s done") return 5
Python
복사
이제 이 spend_5s가 살행되기 전에 무조건 spend_3s가 실행되어서 first인자를 받아야 한다.
그러면 3초(spend_3s) 5초(spend_5s)를 무조건 한 CPU에 넣어서 순차 실행되도록 해야 하고 남은 7초 10초를 개별적인 CPU에서 실행시키면 된다.
@ProcessingUnitFunc def cpu_01(): first = spend_3s() second = spend_5s(first) return first , second @ProcessingUnitFunc def cpu_02(): third = spend_7s() return third # 작업 던져주기 current CPU >>>>>>> other CPUs cpu_01_receiver = cpu_01() cpu_02_receiver = cpu_02() # 메인 CPU에서 10초짜리 함수 실행 last = spend_10s() # 작업 결과 회수 current CPU <<<<<<< other CPUs first, second = cpu_01_receiver() third = cpu_02_receiver() print(f"모든 작업이 완료되었습니다. 작업 결과: {first} , {second} , {third} , {last} ")
Python
복사