Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

async로 동시성 적용하기

이 절에서는 16장에서 스레드로 풀어 보았던 몇몇 동시성 문제를 async 로 다시 다뤄봅니다. 이미 거기서 핵심 아이디어 대부분을 설명했기 때문에, 여기서는 스레드와 퓨처 사이의 차이에 더 집중하겠습니다.

많은 경우, async 로 동시성을 다루는 API는 스레드를 사용할 때의 API와 매우 비슷합니다. 하지만 어떤 경우에는 꽤 다르기도 합니다. 심지어 스레드와 async 사이 API가 겉보기에는 비슷해 보이더라도, 실제 동작 방식은 다를 수 있고, 성능 특성은 거의 항상 다릅니다.

spawn_task 로 새 작업 만들기

16장의 spawn 으로 새 스레드 만들기” 절에서 처음 했던 작업은, 두 개의 별도 스레드에서 숫자를 세는 것이었습니다. 이번에는 같은 일을 async 로 해 봅시다. trpl 크레이트는 thread::spawn 과 아주 비슷한 spawn_task 함수를 제공하고, thread::sleep 의 async 버전인 sleep 함수도 제공합니다. 이 둘을 사용해 목록 17-6처럼 카운팅 예제를 구현할 수 있습니다.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}
Listing 17-6: 메인 작업이 다른 내용을 출력하는 동안 새 작업을 만들어 또 다른 내용을 출력하기

우리는 먼저 최상위 함수가 async 가 될 수 있도록, maintrpl::block_on 으로 감쌉니다.

Note: 이 장의 나머지 예제도 거의 모두 main 에서 같은 trpl::block_on 래핑 코드를 사용합니다. 앞으로는 main 을 자주 생략해 설명하겠지만, 실제 코드에는 여러분이 직접 포함해야 한다는 점을 기억하세요!

그다음 그 블록 안에 두 개의 루프를 씁니다. 각 루프 안에는 다음 메시지를 보내기 전에 0.5초(500밀리초)를 기다리는 trpl::sleep 호출이 있습니다. 한 루프는 trpl::spawn_task 의 본문 안에 두고, 다른 하나는 최상위 for 루프 안에 둡니다. 또한 sleep 호출 뒤에는 await 를 붙입니다.

이 코드는 스레드 기반 구현과 비슷하게 동작합니다. 여러분이 직접 실행했을 때는 출력 순서가 다를 수도 있다는 점까지도 포함해서요.

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

이 버전은 메인 async 블록 안의 for 루프가 끝나는 순간 종료됩니다. spawn_task 로 만든 작업은 main 함수가 끝날 때 함께 종료되기 때문입니다. 생성한 작업이 완전히 끝날 때까지 실행되게 하려면, 그 작업이 끝날 때까지 기다리는 join handle이 필요합니다. 스레드에서는 join 메서드로 스레드 종료를 기다렸습니다. 목록 17-7에서는 await 로 같은 일을 합니다. 작업 핸들 자체도 하나의 퓨처이기 때문입니다. 또한 그 Output 타입은 Result 이므로, await 한 뒤 unwrap 도 호출합니다.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}
Listing 17-7: join handle 에 await 해 작업이 끝까지 실행되게 하기

이 업데이트된 버전은 두 루프가 모두 끝날 때까지 실행됩니다.

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

지금까지 보면 async 와 스레드는 문법만 다를 뿐 비슷한 결과를 내는 것처럼 보일 수도 있습니다. 스레드의 join 대신 await 를 사용하고, sleep 호출에도 await 를 붙였을 뿐이죠.

하지만 더 큰 차이는 이것을 위해 운영체제 스레드를 하나 더 만들 필요조차 없었다는 점입니다. 사실 여기서는 별도 태스크를 만들 필요조차 없습니다. async 블록은 익명 퓨처로 컴파일되므로, 각 루프를 각자의 async 블록 안에 넣고 trpl::join 함수를 사용해 둘 다 끝까지 실행하게 할 수 있습니다.

16장의 “모든 스레드가 끝날 때까지 기다리기” 절에서 std::thread::spawn 이 반환한 JoinHandle 타입에 대해 join 을 사용하는 방법을 보았습니다. trpl::join 함수는 비슷하지만 퓨처를 위한 것입니다. 퓨처 두 개를 주면, 둘 다 완료된 뒤 각 퓨처의 출력값을 튜플로 담은 하나의 새 퓨처를 만들어 냅니다. 그래서 목록 17-8에서는 fut1, fut2 각각을 await 하는 대신, trpl::join 이 만든 새 퓨처 하나를 await 합니다. 그리고 출력은 그저 unit 값 둘을 담은 튜플일 뿐이므로 무시합니다.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
Listing 17-8: trpl::join 으로 두 익명 퓨처를 함께 await 하기

이 코드를 실행하면 두 퓨처가 모두 끝까지 실행됩니다.

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

이제는 항상 같은 순서 를 보게 됩니다. 이는 스레드나 목록 17-7의 trpl::spawn_task 와는 상당히 다릅니다. 그 이유는 trpl::join 함수가 공정(fair) 하기 때문입니다. 즉, 각 퓨처를 번갈아가며 동일하게 자주 확인하고, 둘 중 하나가 준비되었다고 해서 다른 하나를 굶기지 않습니다. 반면 스레드에서는 운영체제가 어느 스레드를 언제 얼마나 오래 실행시킬지를 결정합니다. async 러스트에서는 어떤 태스크를 언제 확인할지 런타임이 결정합니다. (물론 실제로는 런타임이 내부적으로 운영체제 스레드를 쓰기도 하므로 세부는 더 복잡하지만, 개념적으로는 그렇습니다.) 모든 런타임이 어떤 연산에 대해서도 공정성을 보장해야 하는 것은 아니며, 공정성을 원하는지 여부를 선택할 수 있게 여러 API를 제공하는 경우도 많습니다.

이제 다음과 같은 변형들을 직접 시도해 보며 어떤 결과가 나오는지 살펴보세요.

  • 두 루프 중 하나, 혹은 둘 모두에서 async 블록을 제거하기
  • 각 async 블록을 정의하자마자 곧바로 await 하기
  • 첫 번째 루프만 async 블록으로 감싸고, 두 번째 루프 본문이 끝난 뒤에 그 퓨처를 await 하기

추가 도전 과제로, 코드를 실행해 보기 에 각 경우의 출력이 어떻게 될지 먼저 예상해 보세요!

메시지 전달로 두 태스크 사이 데이터 보내기

퓨처들 사이 데이터 공유 역시 익숙한 방식으로 할 수 있습니다. 이번에도 메시지 전달을 사용하되, 이번에는 타입과 함수의 async 버전을 사용합니다. 16장의 “메시지 전달로 스레드 사이에 데이터 보내기” 절과는 약간 다른 길을 택해, 스레드 기반 동시성과 퓨처 기반 동시성의 핵심 차이를 더 잘 드러내 보겠습니다. 목록 17-9에서는 별도 태스크를 만들지 않고, 하나의 async 블록부터 시작합니다.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("received '{received}'");
    });
}
Listing 17-9: async 채널을 만들고 양쪽 절반을 tx, rx 에 대입하기

여기서 우리는 16장에서 사용했던 다중 생산자, 단일 소비자 채널 API의 async 버전인 trpl::channel 을 사용합니다. 이 API는 스레드 기반 버전과 아주 비슷하지만, 수신자 rx 는 불변이 아니라 가변이고, recv 메서드는 값을 바로 주는 대신 우리가 await 해야 하는 퓨처를 만든다는 점이 다릅니다. 이제 송신자에서 수신자로 메시지를 보낼 수 있습니다. 별도 스레드나 태스크를 만들 필요가 없다는 점에도 주목하세요. rx.recv 호출만 await 하면 됩니다.

std::mpsc::channel 의 동기식 Receiver::recv 메서드는 메시지를 받을 때까지 현재 스레드를 블록합니다. 반면 trpl::Receiver::recv 메서드는 async 이기 때문에 블록하지 않습니다. 메시지가 도착하거나 채널의 송신 쪽이 닫힐 때까지 런타임에 제어권을 돌려주는 것입니다. 반대로 send 호출은 await 하지 않습니다. 이 채널은 unbounded 채널이라 send 가 블록할 필요가 없기 때문입니다.

Note: 이 async 코드 전체가 trpl::block_on 안의 async 블록에서 실행되기 때문에, 그 안에서는 블록을 피할 수 있습니다. 하지만 그 바깥 코드는 block_on 함수가 끝날 때까지 블록됩니다. 이것이 바로 trpl::block_on 의 역할입니다. async 코드를 어디서부터 어디까지 블록할지, 즉 sync 코드와 async 코드의 경계를 어디에 둘지를 여러분이 정하게 해 줍니다.

이 예제에서 두 가지 점에 주목하세요. 첫째, 메시지는 즉시 도착합니다. 둘째, 비록 퓨처를 사용하고는 있지만 아직은 아무 동시성도 없습니다. 이 목록 안의 모든 것은 퓨처가 없었어도 그랬을 것처럼 그냥 순서대로 실행됩니다.

이제 첫 번째 문제부터 해결하기 위해, 메시지를 여러 개 보내고 그 사이사이에 잠깐 쉬는 코드를 목록 17-10처럼 써 봅시다.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}
Listing 17-10: async 채널로 여러 메시지를 보내고, 메시지 사이마다 await 와 함께 잠시 쉬기

메시지를 보내는 것만이 아니라, 그것들을 받아야도 합니다. 지금은 몇 개 메시지가 올지 알고 있으므로 rx.recv().await 를 네 번 수동으로 호출해도 됩니다. 하지만 현실에서는 메시지 수가 몇 개인지 모르는 경우가 대부분이므로, 더 이상 메시지가 오지 않는다는 사실이 확인될 때까지 계속 기다릴 필요가 있습니다.

목록 16-10에서는 동기 채널에서 받은 모든 항목을 처리하기 위해 for 루프를 사용했습니다. 하지만 러스트에는 아직 비동기적으로 생성되는 항목 시퀀스를 for 로 순회하는 방법이 없으므로, 대신 while let 조건 루프를 사용합니다. 이는 6장의 if letlet...else 로 더 간결하게 제어 흐름 쓰기” 절에서 본 if let 의 루프 버전입니다. 루프는 지정한 패턴이 계속 맞는 한 반복을 이어 갑니다.

rx.recv 호출은 퓨처를 만들고, 우리는 그것을 await 합니다. 런타임은 이 퓨처가 준비될 때까지 실행을 잠시 멈춥니다. 메시지가 도착하면 퓨처는 도착한 횟수만큼 Some(message) 로 해결됩니다. 채널이 닫히면, 메시지가 아예 오지 않았더라도 퓨처는 None 으로 해결되어 더 이상 기다릴 값이 없다는 뜻이 됩니다.

while let 루프는 이 모든 것을 한데 묶습니다. rx.recv().await 의 결과가 Some(message) 이면 메시지에 접근할 수 있고, 그것을 루프 본문 안에서 사용할 수 있습니다. 이는 if let 과 같은 방식입니다. 결과가 None 이면 루프는 끝납니다. 루프 본문이 한 번 끝날 때마다 다시 await 지점에 도달하므로, 런타임은 다른 메시지가 도착할 때까지 이 루프를 다시 멈춥니다.

이제 코드는 모든 메시지를 성공적으로 보내고 받습니다. 하지만 아직 문제가 둘 남아 있습니다. 첫째, 메시지가 0.5초 간격으로 오지 않습니다. 프로그램 시작 후 2초 (2000밀리초) 가 지나서야 한꺼번에 도착합니다. 둘째, 이 프로그램은 끝나지 않습니다. 새 메시지를 영원히 기다리며 멈춰 있기 때문입니다. 사용자가 ctrl-C 로 강제로 종료해야 합니다.

하나의 async 블록 안 코드는 선형적으로 실행된다

먼저, 왜 메시지가 기대한 것처럼 중간중간 지연되며 오지 않고 전체 지연이 끝난 뒤 한꺼번에 도착하는지 살펴봅시다. 어떤 async 블록 안에서든, 코드에 await 키워드가 등장하는 순서가 실제 프로그램 실행 시 그 코드가 처리되는 순서입니다.

목록 17-10에는 async 블록이 단 하나뿐이므로, 그 안의 모든 코드는 선형적으로 실행됩니다. 즉, 여전히 동시성은 없습니다. tx.send 호출이 모두 실행되고, 그 사이에 trpl::sleep 과 그에 대응하는 await 지점도 모두 실행됩니다. 그 뒤에야 while let 루프가 recv 호출의 await 지점들에 도달할 수 있습니다.

우리가 원하는, 즉 각 메시지 사이에 실제로 지연이 일어나면서 두 쪽이 동시에 진행되는 동작을 얻으려면, txrx 로직을 각각 자기만의 async 블록 안에 두어야 합니다. 목록 17-11이 그 예입니다. 그리고 목록 17-8에서 했던 것처럼, 둘을 trpl::join 으로 함께 실행하게 합니다. 다시 말해 fut1fut2 를 개별적으로 await 하지 않고, trpl::join 이 만들어 주는 하나의 퓨처를 await 하는 것입니다. 그렇지 않으면 다시 순차 실행으로 돌아가, 바로 지금 피하려는 상황이 되어 버립니다.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-11: sendrecv 를 각자 async 블록으로 분리하고, 그 퓨처들을 await 하기

이렇게 업데이트한 목록 17-11에서는, 이제 메시지가 2초 뒤 한꺼번에 몰려오지 않고 500밀리초 간격으로 출력됩니다.

async 블록 안으로 소유권 이동하기

그래도 프로그램은 여전히 끝나지 않습니다. 이유는 while let 루프와 trpl::join 사이의 상호작용 때문입니다.

  • trpl::join 이 반환한 퓨처는, 전달된 퓨처가 모두 끝나야만 완료됩니다.
  • tx_fut 퓨처는 vals 의 마지막 메시지를 보낸 뒤 마지막 sleep 이 끝나면 완료됩니다.
  • rx_fut 퓨처는 while let 루프가 끝나야 완료됩니다.
  • while let 루프는 rx.recv().awaitNone 을 반환해야만 끝납니다.
  • rx.recv().awaitNone 을 반환하려면 채널의 반대편, 즉 송신 쪽이 닫혀야 합니다.
  • 채널은 우리가 rx.close 를 호출하거나, 송신 쪽 tx 가 drop 될 때만 닫힙니다.
  • 하지만 우리는 rx.close 를 어디서도 호출하지 않으며, txtrpl::block_on 에 넘긴 가장 바깥 async 블록이 끝날 때까지 drop 되지 않습니다.
  • 그런데 그 블록은 trpl::join 이 끝날 때까지 기다리고 있으므로, 다시 처음으로 돌아가게 됩니다.

지금 상태에서는 메시지를 보내는 async 블록이 tx빌려만 쓰고 있습니다. 메시지를 보내는 데 소유권 자체는 필요하지 않기 때문입니다. 하지만 만약 tx 를 그 async 블록 안으로 이동 시킬 수 있다면, 그 블록이 끝날 때 tx 도 함께 drop 될 것입니다. 13장의 “참조를 캡처하거나 소유권을 이동하기” 절에서 클로저에 move 키워드를 쓰는 법을 배웠고, 16장의 “스레드와 함께 move 클로저 사용하기” 절에서 스레드와 함께 쓸 때 왜 자주 필요한지도 보았습니다. 같은 기본 원리가 async 블록에도 적용되므로, move 키워드는 클로저와 똑같이 async 블록에서도 사용할 수 있습니다.

목록 17-12에서는 메시지를 보내는 블록을 단순 async 에서 async move 로 바꿉니다.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-12: 완료 시 정상 종료되도록 목록 17-11을 수정한 버전

이 버전의 코드를 실행하면 마지막 메시지를 모두 보내고 받은 뒤 깔끔하게 종료됩니다. 다음으로는 여러 퓨처에서 데이터를 보내려면 무엇이 달라져야 하는지 보겠습니다.

join! 매크로로 여러 퓨처 묶기

이 async 채널도 다중 생산자 채널이므로, 여러 퓨처에서 메시지를 보내고 싶다면 tx 에 대해 clone 을 호출하면 됩니다. 목록 17-13을 보세요.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}
Listing 17-13: 여러 생산자가 async 블록에서 메시지를 보내기

우리는 첫 번째 spawned task를 만들기 전에 먼저 tx 를 clone 해 tx1 을 얻습니다. 그리고 앞에서 했던 것처럼 tx1 을 그 블록 안으로 이동시킵니다. 그런 뒤 조금 더 느린 간격으로 메시지를 보내는 async 블록 안으로 원래의 tx 도 이동시킵니다. 이 새 블록은 수신 블록 뒤에 두었지만, 앞에 두어도 상관없습니다. 중요한 것은 생성 순서가 아니라, 퓨처들을 어떻게 await 하느냐입니다.

메시지를 보내는 두 async 블록 모두 async move 여야 합니다. 그래야 그 블록들이 끝날 때 txtx1 이 함께 drop 됩니다. 그렇지 않으면 방금 해결했던 무한 대기 문제가 다시 생깁니다.

마지막으로, 추가된 퓨처를 처리하기 위해 trpl::join 대신 trpl::join! 매크로로 바꿉니다. join! 매크로는, 컴파일 시점에 개수가 정해져 있는 여러 퓨처를 함께 기다립니다. 나중에 이 장에서 개수를 미리 모르는 퓨처 집합을 기다리는 방법도 다루겠습니다.

이제 두 송신 퓨처가 보내는 메시지가 모두 출력되고, 각각의 지연 시간이 조금 다르기 때문에 메시지 수신도 그 다른 간격을 반영하게 됩니다.

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

지금까지 우리는 메시지 전달을 이용해 퓨처 사이로 데이터를 보내는 법, 하나의 async 블록 안 코드가 순차적으로 실행된다는 사실, async 블록 안으로 소유권을 이동시키는 법, 그리고 여러 퓨처를 함께 묶는 법을 살펴보았습니다. 다음으로는 런타임에게 “이제 다른 작업을 해도 된다”고 알려 줘야 하는 이유와 방법을 이야기해 보겠습니다.