Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

단일 스레드 서버를 멀티스레드 서버로 바꾸기

지금 서버는 각 요청을 차례대로 처리합니다. 즉, 첫 번째 연결 처리가 끝나기 전까지는 두 번째 연결을 처리하지 않습니다. 요청이 점점 많아질수록 이런 직렬 실행 방식은 점점 비효율적이 됩니다. 처리 시간이 오래 걸리는 요청이 하나 들어오면, 뒤따르는 요청이 빠르게 처리할 수 있는 요청이라 해도 그 긴 요청이 끝날 때까지 기다려야 합니다. 이를 고쳐야 합니다. 하지만 먼저, 이 문제가 실제로 어떻게 드러나는지 살펴보겠습니다.

느린 요청 시뮬레이션하기

느리게 처리되는 요청이 현재 서버 구현에서 다른 요청들에 어떤 영향을 주는지 살펴보겠습니다. 목록 21-10은 /sleep 요청을 처리하도록 구현한 코드로, 응답하기 전에 서버가 5초 동안 잠들게 만들어 느린 응답을 흉내 냅니다.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-10: 5초 동안 잠들어 느린 요청을 시뮬레이션하기

이제 경우의 수가 세 가지가 되었기 때문에 if 대신 match 로 바꿨습니다. 문자열 리터럴 값에 대해 패턴 매칭하려면 request_line 의 슬라이스에 대해 명시적으로 매칭해야 합니다. match 는 동등성 비교 메서드와 달리 자동으로 참조/역참조를 해주지 않기 때문입니다.

첫 번째 팔은 목록 21-9의 if 블록과 같습니다. 두 번째 팔은 /sleep 요청과 매칭됩니다. 이 요청을 받으면 서버는 성공 HTML 페이지를 렌더링하기 전에 5초 동안 잠듭니다. 세 번째 팔은 목록 21-9의 else 블록과 같습니다.

이 서버가 얼마나 원시적인지 알 수 있습니다. 실제 라이브러리라면 여러 요청을 훨씬 덜 장황한 방식으로 구분할 것입니다.

cargo run 으로 서버를 시작하세요. 그런 다음 브라우저 창 두 개를 열고, 하나는 http://127.0.0.1:7878, 다른 하나는 http://127.0.0.1:7878/sleep 를 띄웁니다. 이전처럼 / URI를 몇 번 입력해 보면 빠르게 응답합니다. 하지만 /sleep 을 입력한 뒤 / 를 다시 열어 보면, /sleep 이 5초를 전부 자고 난 뒤에야 로드되는 것을 볼 수 있습니다.

느린 요청 뒤로 다른 요청이 밀리는 일을 막는 방법은 여러 가지가 있습니다. 17장에서 사용했던 async 도 그중 하나입니다. 여기서는 스레드 풀을 구현해 보겠습니다.

스레드 풀로 처리량 개선하기

스레드 풀 은 작업을 처리할 준비를 마치고 대기 중인 스레드들의 집합입니다. 프로그램이 새 작업을 받으면, 풀 안의 스레드 하나를 그 작업에 할당하고 그 스레드가 작업을 처리합니다. 나머지 스레드들은 첫 번째 스레드가 작업을 처리하는 동안 들어오는 다른 작업을 처리할 수 있습니다. 첫 번째 스레드가 작업을 끝내면, 다시 놀고 있는 스레드들의 풀로 돌아가 새 작업을 맡을 준비를 합니다. 스레드 풀을 사용하면 연결을 동시에 처리할 수 있으므로 서버의 처리량이 늘어납니다.

풀 안의 스레드 수는 작게 제한해서 DoS 공격으로부터 보호하겠습니다. 요청이 올 때마다 새 스레드를 만드는 프로그램이라면, 누군가 서버에 1천만 개의 요청을 보내기만 해도 모든 서버 자원을 소모시켜 요청 처리를 사실상 멈춰 세울 수 있습니다.

따라서 무제한으로 스레드를 생성하는 대신, 풀에서 대기 중인 고정 개수의 스레드를 둘 것입니다. 들어온 요청은 풀로 보내져 처리됩니다. 풀은 들어오는 요청의 큐를 유지합니다. 풀의 각 스레드는 큐에서 요청 하나를 꺼내 처리한 다음, 큐에 다음 요청을 요구합니다. 이 설계에서는 N 이 스레드 수일 때 최대 N 개의 요청을 동시에 처리할 수 있습니다. 각 스레드가 오래 걸리는 요청을 처리하는 중이라면 뒤따르는 요청은 여전히 큐에 쌓일 수 있지만, 그런 지점에 도달하기 전까지 감당할 수 있는 장기 실행 요청 수는 늘어납니다.

이 기법은 웹 서버 처리량을 늘리는 방법 중 하나일 뿐입니다. fork/join 모델, 단일 스레드 async I/O 모델, 멀티스레드 async I/O 모델 같은 다른 방법도 있습니다. 이 주제가 흥미롭다면 다른 해법을 더 읽고 직접 구현해 볼 수 있습니다. 러스트처럼 저수준 언어라면 이런 선택지가 모두 가능합니다.

스레드 풀 구현을 시작하기 전에, 이 풀을 사용하는 모습이 어떠해야 하는지 먼저 이야기해 봅시다. 코드를 설계할 때는 클라이언트 인터페이스를 먼저 써 보는 것이 설계를 이끄는 데 도움이 됩니다. 즉, 코드를 어떤 방식으로 호출하고 싶은지에 맞춰 API를 먼저 작성하고, 그 구조 안에 기능을 구현하는 편이 기능을 먼저 만든 뒤 공개 API를 설계하는 것보다 낫습니다.

12장의 프로젝트에서 테스트 주도 개발을 사용했던 것과 비슷하게, 여기서는 컴파일러 주도 개발을 사용하겠습니다. 우리가 원하는 함수를 호출하는 코드를 먼저 쓰고, 그런 다음 컴파일러 오류를 보면서 무엇을 바꿔야 코드가 동작하게 되는지 결정할 것입니다. 다만 그 전에, 출발점으로 삼되 საბოლო적인 해법으로는 쓰지 않을 기법부터 살펴보겠습니다.

요청마다 스레드 하나씩 생성하기

먼저, 연결마다 새 스레드를 만든다면 코드가 어떤 모습일지 살펴보겠습니다. 앞에서 말했듯, 무제한으로 스레드를 만들 가능성이 있다는 문제가 있으므로 이것은 최종 계획이 아닙니다. 하지만 우선 동작하는 멀티스레드 서버를 만든 뒤, 그다음 개선책으로 스레드 풀을 추가하면 두 해법을 비교하기 쉬워집니다.

목록 21-11은 for 루프 안에서 각 스트림을 처리할 새 스레드를 생성하도록 main 을 바꾸는 방법을 보여 줍니다.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-11: 각 스트림마다 새 스레드 생성하기

16장에서 배웠듯 thread::spawn 은 새 스레드를 만든 뒤, 클로저 안의 코드를 그 새 스레드에서 실행합니다. 이 코드를 실행하고 브라우저에서 /sleep 을 연 다음, 다른 브라우저 탭 두 개에서 / 를 열어 보면 / 요청이 /sleep 이 끝날 때까지 기다리지 않는다는 것을 확인할 수 있습니다. 하지만 앞에서 말했듯, 이렇게 하면 새 스레드를 제한 없이 만들게 되어 결국 시스템이 감당하지 못하게 됩니다.

17장에서 배운 async/await가 სწორედ 이런 상황에서 강력하다는 사실도 떠올릴 수 있을 것입니다. 스레드 풀을 만들면서 async 로 하면 무엇이 달라지고 무엇이 같을지도 함께 생각해 두세요.

유한한 수의 스레드 만들기

스레드 풀도 비슷하고 익숙한 방식으로 동작하길 바랍니다. 그래야 스레드에서 스레드 풀로 바꿔도 우리 API를 사용하는 코드가 크게 바뀌지 않습니다. 목록 21-12는 thread::spawn 대신 사용하고 싶은 ThreadPool 구조체의 이상적인 인터페이스를 보여 줍니다.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-12: 우리가 원하는 ThreadPool 인터페이스

ThreadPool::new 를 사용해 새 스레드 풀을 만듭니다. 이 예제에서는 스레드 수를 네 개로 설정합니다. 그런 다음 for 루프 안에서 pool.execute 는 각 스트림마다 풀에서 실행해야 할 클로저를 받는다는 점에서 thread::spawn 과 비슷한 인터페이스를 가집니다. 이제 pool.execute 가 그 클로저를 받아 풀 안의 스레드 하나에 전달해 실행하게 구현해야 합니다. 아직은 이 코드가 컴파일되지 않지만, 컴파일러가 고치는 방향을 안내해 주도록 일단 시도해 보겠습니다.

컴파일러 주도 개발로 ThreadPool 만들기

목록 21-12의 변경을 src/main.rs 에 반영한 다음, cargo check 의 컴파일러 오류를 바탕으로 개발을 진행해 봅시다. 우리가 처음 받는 오류는 다음과 같습니다.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

좋습니다. 이 오류는 ThreadPool 타입 또는 모듈이 필요하다고 말해 줍니다. 그래서 이제 그것을 만들겠습니다. ThreadPool 구현은 웹 서버가 무슨 작업을 하는지와는 독립적일 것입니다. 따라서 hello 크레이트를 바이너리 크레이트에서 라이브러리 크레이트로 바꾸어 ThreadPool 구현을 담아 보겠습니다. 라이브러리 크레이트로 바꾸면, 웹 요청 처리뿐 아니라 스레드 풀을 이용하고 싶은 다른 작업에도 이 별도 라이브러리를 사용할 수 있습니다.

우선 src/lib.rs 파일을 만들고, 지금 당장 가질 수 있는 가장 단순한 ThreadPool 구조체 정의를 다음과 같이 넣습니다.

Filename: src/lib.rs
pub struct ThreadPool;

그다음 main.rs 파일을 수정해 라이브러리 크레이트의 ThreadPool 을 스코프로 가져옵니다. src/main.rs 맨 위에 다음 코드를 추가하세요.

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

이 코드는 아직 동작하지 않지만, 다음에 해결해야 할 오류를 보려면 다시 확인해 봅시다.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

이 오류는 다음 단계로 ThreadPoolnew 라는 연관 함수를 만들어야 한다는 뜻입니다. 또한 new 는 인수로 4 를 받을 수 있는 매개변수 하나를 가져야 하고, ThreadPool 인스턴스를 반환해야 한다는 것도 알 수 있습니다. 그런 성질을 만족하는 가장 단순한 new 함수를 구현해 봅시다.

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

size 매개변수 타입으로 usize 를 고른 이유는, 음수 개수의 스레드는 말이 되지 않기 때문입니다. 또한 이 4 를 스레드 컬렉션의 원소 개수로 쓸 것이라는 점도 알고 있습니다. 이는 3장의 “정수 타입” 절에서 논의했듯 usize 가 적합한 용도입니다.

코드를 다시 확인해 봅시다.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

이제는 ThreadPoolexecute 메서드가 없기 때문에 오류가 납니다. “유한한 수의 스레드 만들기” 절에서 스레드 풀이 thread::spawn 과 비슷한 인터페이스를 가져야 한다고 정했던 것을 기억하세요. 따라서 execute 함수도 전달받은 클로저를 받아, 풀 안의 놀고 있는 스레드에게 넘겨 실행하게 구현할 것입니다.

ThreadPoolexecute 메서드는 클로저를 매개변수로 받도록 정의할 것입니다. 13장의 “캡처된 값을 클로저 밖으로 이동시키기” 절에서 보았듯, 클로저를 매개변수로 받을 때는 Fn, FnMut, FnOnce 라는 세 가지 트레이트 중 하나를 사용할 수 있습니다. 여기서는 어떤 종류의 클로저를 써야 할지 결정해야 합니다. 결국 표준 라이브러리 thread::spawn 구현과 비슷한 일을 하게 될 것이므로, thread::spawn 시그니처가 매개변수에 어떤 제약을 두는지 살펴볼 수 있습니다. 문서에는 다음과 같이 나와 있습니다.

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

여기서 우리가 관심 있는 것은 F 타입 매개변수입니다. T 타입 매개변수는 반환값과 관련 있으므로 지금은 중요하지 않습니다. spawnF 의 트레이트 경계로 FnOnce 를 사용한다는 것을 볼 수 있습니다. 이것이 우리에게도 아마 적절할 것입니다. 결국 execute 가 받은 인수를 spawn 에 넘길 것이기 때문입니다. 또한 요청을 처리할 스레드는 그 요청의 클로저를 딱 한 번만 실행하므로, FnOnceOnce 와도 잘 맞습니다.

F 타입 매개변수에는 Send 트레이트 경계와 'static 라이프타임 경계도 붙어 있습니다. 이것도 우리 상황에 유용합니다. 클로저를 한 스레드에서 다른 스레드로 옮기려면 Send 가 필요하고, 스레드가 얼마나 오래 실행될지 모르므로 'static 이 필요합니다. 이제 이런 제약을 가진 제네릭 타입 F 를 받는 ThreadPoolexecute 메서드를 만들어 봅시다.

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

여기서도 FnOnce 뒤에 () 를 쓰는 이유는, 이 FnOnce 가 매개변수를 받지 않고 단위 타입 () 를 반환하는 클로저를 뜻하기 때문입니다. 함수 정의와 마찬가지로 반환 타입은 시그니처에서 생략할 수 있지만, 매개변수가 없더라도 괄호는 여전히 필요합니다.

다시 말해 이것은 execute 메서드의 가장 단순한 구현입니다. 아무 일도 하지 않지만, 지금은 일단 코드를 컴파일시키는 것이 목적입니다. 다시 확인해 봅시다.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

컴파일됩니다! 하지만 cargo run 을 실행하고 브라우저에서 요청을 보내 보면, 장 초반에 봤던 것과 같은 오류가 브라우저에 나타납니다. 우리의 라이브러리가 아직 execute 에 전달된 클로저를 실제로 호출하지는 않기 때문입니다.

Note: 하스켈이나 러스트처럼 컴파일러가 엄격한 언어에 대해 “코드가 컴파일되면 동작한다”는 말을 들을 수 있습니다. 하지만 이 말은 언제나 참은 아닙니다. 지금 프로젝트는 컴파일되지만 아무 일도 하지 않습니다! 실제 완성형 프로젝트를 만들고 있었다면, 지금쯤 코드를 컴파일할 뿐 아니라 원하는 동작도 하는지 확인하는 단위 테스트를 쓰기 시작해야 할 좋은 시점입니다.

생각해 봅시다. 여기서 클로저 대신 future 를 실행하려 했다면 무엇이 달라졌을까요?

new 에서 스레드 수 검증하기

아직 newexecute 의 매개변수로 아무것도 하고 있지 않습니다. 이제 우리가 원하는 동작을 하도록 이 함수들의 본문을 구현해 봅시다. 먼저 new 를 생각해 보면, 앞에서는 음수 개수의 스레드 풀이 말이 되지 않기 때문에 size 에 부호 없는 타입을 선택했습니다. 하지만 스레드가 0개인 풀도 역시 말이 되지 않는데, 0은 usize 로서는 완전히 유효한 값입니다. 그래서 ThreadPool 인스턴스를 반환하기 전에 size 가 0보다 큰지 확인하는 코드를 추가하고, 0이 들어오면 assert! 매크로를 사용해 프로그램이 패닉하게 만들겠습니다. 목록 21-13이 그 예입니다.

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-13: size 가 0일 때 패닉하도록 ThreadPool::new 구현하기

ThreadPool 에 대한 문서 주석도 몇 개 추가했습니다. 14장에서 이야기했던 것처럼, 함수가 어떤 상황에서 패닉할 수 있는지 명시하는 절을 넣어 좋은 문서화 관행을 따르고 있다는 점에 주목하세요. cargo doc --open 을 실행한 뒤 ThreadPool 구조체를 클릭해서 new 에 대해 생성된 문서가 어떻게 보이는지 확인해 보세요.

여기처럼 assert! 매크로를 추가하는 대신, newbuild 로 바꾸고 12-9의 I/O 프로젝트에서 Config::build 에 했던 것처럼 Result 를 반환할 수도 있습니다. 하지만 이 경우에는 스레드가 전혀 없는 스레드 풀을 만들려는 시도를 복구 불가능한 오류로 보기로 했습니다. 여력이 있다면 다음 시그니처를 가진 build 함수를 직접 써 보며 new 함수와 비교해 보세요.

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

스레드를 저장할 공간 만들기

이제 풀에 저장할 스레드 수가 유효한지 확인할 수 있게 되었으니, 구조체를 반환하기 전에 그 스레드들을 만들어 ThreadPool 구조체 안에 저장할 수 있습니다. 그런데 스레드를 “저장한다”는 것은 정확히 무엇일까요? thread::spawn 의 시그니처를 다시 봅시다.

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

spawn 함수는 JoinHandle<T> 를 반환합니다. 여기서 T 는 클로저의 반환 타입입니다. 우리도 JoinHandle 을 써 보고 어떤 일이 일어나는지 보겠습니다. 이 경우 스레드 풀에 전달하는 클로저는 연결을 처리하고 아무것도 반환하지 않으므로 T 는 단위 타입 () 가 됩니다.

목록 21-14의 코드는 컴파일되지만, 아직 실제 스레드는 만들지 않습니다. ThreadPool 정의를 thread::JoinHandle<()> 인스턴스들의 벡터를 들고 있게 바꾸고, 벡터를 size 용량으로 초기화했으며, 스레드를 만드는 코드를 실행할 for 루프를 마련하고, 그것들을 담은 ThreadPool 인스턴스를 반환합니다.

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-14: 스레드를 담을 ThreadPool 벡터 만들기

라이브러리 크레이트 안에서 std::thread 를 스코프로 가져온 이유는, ThreadPool 안의 벡터 원소 타입으로 thread::JoinHandle 을 사용하기 때문입니다.

유효한 크기를 받으면 ThreadPoolsize 개의 항목을 담을 수 있는 새 벡터를 만듭니다. with_capacity 함수는 Vec::new 와 같은 일을 하지만 중요한 차이가 하나 있습니다. 벡터 안의 공간을 미리 할당한다는 점입니다. 우리는 벡터에 size 개의 원소를 저장할 것을 알고 있으므로, 원소가 들어올 때마다 크기를 늘리는 Vec::new 보다 미리 할당하는 편이 약간 더 효율적입니다.

이제 cargo check 를 다시 실행하면 성공할 것입니다.

Sending Code from the ThreadPool to a Thread

목록 21-14의 for 루프에는 스레드 생성과 관련된 주석을 남겨 두었습니다. 이제 실제로 스레드를 어떻게 만드는지 살펴보겠습니다. 표준 라이브러리는 스레드를 만드는 방법으로 thread::spawn 을 제공하며, thread::spawn 은 스레드가 생성되자마자 실행할 코드를 받기를 기대합니다. 하지만 우리 경우에는 스레드를 먼저 만들고, 나중에 보낼 코드를 기다리게 하고 싶습니다. 표준 라이브러리의 스레드 구현은 그런 기능을 제공하지 않기 때문에, 우리가 직접 구현해야 합니다.

이 동작을 구현하기 위해 ThreadPool 과 스레드 사이에 새 데이터 구조를 하나 두고, 그 구조가 이 새로운 동작을 관리하게 하겠습니다. 이 데이터 구조를 Worker 라고 부르겠습니다. 풀 구현에서 흔히 쓰는 용어입니다. Worker 는 실행해야 할 코드를 가져와 자신의 스레드에서 실행합니다.

식당 주방에서 일하는 사람들을 떠올려 보세요. 일꾼들은 손님 주문이 들어올 때까지 기다렸다가, 주문을 받아 처리하는 책임을 집니다.

이제 스레드 풀 안에 JoinHandle<()> 벡터를 저장하는 대신 Worker 구조체 인스턴스들을 저장하겠습니다. 각 WorkerJoinHandle<()> 인스턴스 하나를 보관합니다. 그리고 Worker 에 메서드를 구현해서, 실행할 코드 클로저를 받아 이미 돌고 있는 스레드로 보내 실행하게 할 것입니다. 또한 로깅이나 디버깅 시 풀 안의 서로 다른 Worker 인스턴스를 구분할 수 있도록 각 Workerid 를 부여하겠습니다.

ThreadPool 을 만들 때 새로 일어날 과정은 다음과 같습니다. 먼저 이런 구조로 Worker 를 설정하고, 그 다음에 클로저를 스레드로 보내는 코드를 구현하겠습니다.

  1. Define a Worker struct that holds an id and a JoinHandle<()>.
  2. Change ThreadPool to hold a vector of Worker instances.
  3. Define a Worker::new function that takes an id number and returns a Worker instance that holds the id and a thread spawned with an empty closure.
  4. In ThreadPool::new, use the for loop counter to generate an id, create a new Worker with that id, and store the Worker in the vector.

도전해 보고 싶다면 목록 21-15의 코드를 보기 전에 이 변경을 직접 구현해 보세요.

준비됐나요? 앞의 수정 사항을 반영하는 한 가지 방법을 목록 21-15에 보여 줍니다.

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-15: 스레드를 직접 들고 있는 대신 Worker 인스턴스를 들도록 ThreadPool 수정하기

ThreadPool 의 필드 이름을 threads 에서 workers 로 바꿨습니다. 이제는 JoinHandle<()> 인스턴스가 아니라 Worker 인스턴스를 들고 있기 때문입니다. for 루프의 카운터를 Worker::new 의 인수로 사용하고, 새로 만든 Workerworkers 라는 벡터에 저장합니다.

외부 코드(예를 들어 src/main.rs 의 서버)는 ThreadPool 안에서 Worker 구조체를 사용한다는 구현 세부사항을 알 필요가 없습니다. 그래서 Worker 구조체와 그 new 함수는 비공개로 둡니다. Worker::new 함수는 우리가 넘긴 id 를 사용하고, 빈 클로저로 새 스레드를 생성해 얻은 JoinHandle<()> 인스턴스를 저장합니다.

Note: 운영체제가 시스템 자원이 부족해 스레드를 만들 수 없으면 thread::spawn 은 패닉합니다. 그러면 일부 스레드 생성은 성공했더라도 서버 전체가 패닉하게 됩니다. 단순한 예제로서는 이 동작도 괜찮지만, 실제 서비스용 스레드 풀 구현이라면 아마 std::thread::BuilderResult 를 반환하는 spawn 메서드를 쓰고 싶을 것입니다.

이 코드는 컴파일되고, ThreadPool::new 에 인수로 지정한 수만큼의 Worker 인스턴스를 저장합니다. 하지만 여전히 execute 에서 받은 클로저를 처리하지는 않습니다. 이제 그 방법을 살펴보겠습니다.

채널을 통해 스레드로 요청 보내기

다음으로 해결할 문제는 thread::spawn 에 넘긴 클로저들이 지금은 아무 일도 하지 않는다는 점입니다. 현재는 실행하고 싶은 클로저를 execute 메서드에서 받고 있지만, ThreadPool 생성 중 각 Worker 를 만들 때 thread::spawn 에도 실행할 클로저를 줘야 합니다.

우리는 방금 만든 Worker 구조체들이 ThreadPool 이 들고 있는 큐에서 실행할 코드를 가져와, 자신의 스레드로 보내 실행하게 만들고 싶습니다.

16장에서 배운 채널은 두 스레드 사이를 통신하는 단순한 방법으로, 이 용도에 딱 맞습니다. 채널을 작업 큐처럼 사용하고, executeThreadPool 에서 Worker 인스턴스로 작업을 보내며, Worker 는 그 작업을 자신의 스레드로 넘기게 할 것입니다. 계획은 다음과 같습니다.

  1. The ThreadPool will create a channel and hold on to the sender.
  2. Each Worker will hold on to the receiver.
  3. We’ll create a new Job struct that will hold the closures we want to send down the channel.
  4. The execute method will send the job it wants to execute through the sender.
  5. In its thread, the Worker will loop over its receiver and execute the closures of any jobs it receives.

먼저 목록 21-16처럼 ThreadPool::new 안에서 채널을 만들고, ThreadPool 인스턴스가 송신자를 들고 있도록 합시다. Job 구조체는 지금은 아무것도 담지 않지만, 앞으로 채널로 보낼 항목의 타입이 됩니다.

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-16: Job 인스턴스를 전송하는 채널의 송신자를 저장하도록 ThreadPool 수정하기

ThreadPool::new 안에서 새 채널을 만들고, 풀이 송신자를 들도록 했습니다. 이 코드는 성공적으로 컴파일됩니다.

이제 스레드 풀이 채널을 만들 때, 각 Worker 에 채널의 수신자를 넘겨 보겠습니다. 우리는 Worker 인스턴스가 생성하는 스레드 안에서 수신자를 사용하고 싶으므로, 클로저 안에서 receiver 매개변수를 참조할 것입니다. 목록 21-17의 코드는 아직은 완전히 컴파일되지는 않습니다.

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-17: 각 Worker 에 수신자 전달하기

작고 단순한 변경 몇 개를 했습니다. Worker::new 에 수신자를 넘기고, 그다음 클로저 안에서 그 수신자를 사용합니다.

이 코드를 확인해 보면 다음 오류가 납니다.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

이 코드는 receiver 를 여러 Worker 인스턴스에 전달하려 하고 있습니다. 16장에서 기억하겠지만, 러스트가 제공하는 채널 구현은 다중 생산자, 단일 소비자 입니다. 즉, 이 코드를 고치기 위해 채널의 소비 쪽 끝을 단순히 복제할 수는 없습니다. 또한 우리는 여러 소비자에게 메시지를 여러 번 보내고 싶은 것도 아닙니다. 여러 Worker 인스턴스가 하나의 메시지 목록을 공유하되, 각 메시지는 한 번만 처리되길 원합니다.

추가로, 채널 큐에서 작업을 꺼내는 과정은 receiver 를 변경하는 일이므로 스레드들은 receiver 를 안전하게 공유하고 수정할 수 있는 방법이 필요합니다. 그렇지 않으면 데이터 경쟁이 생길 수 있습니다(16장에서 다뤘습니다).

16장에서 본 스레드 안전 스마트 포인터를 떠올려 보세요. 여러 스레드가 소유권을 공유하고 값을 수정할 수 있게 하려면 Arc<Mutex<T>> 를 사용해야 합니다. Arc 타입은 여러 Worker 인스턴스가 수신자를 소유하게 해주고, Mutex 는 한 번에 단 하나의 Worker 만 수신자에게서 작업을 가져가도록 보장합니다. 목록 21-18은 필요한 변경을 보여 줍니다.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-18: ArcMutex 를 사용해 Worker 인스턴스들이 수신자를 공유하게 만들기

ThreadPool::new 에서 수신자를 ArcMutex 안에 넣습니다. 새 Worker 를 만들 때마다 Arc 를 복제해서 참조 횟수를 늘리고, 그 결과 Worker 인스턴스들이 수신자의 소유권을 공유할 수 있게 됩니다.

이렇게 바꾸면 코드가 컴파일됩니다! 거의 다 왔습니다.

execute 메서드 구현하기

이제 ThreadPoolexecute 메서드를 드디어 구현해 봅시다. Job 도 구조체에서, execute 가 받는 클로저 타입을 담는 트레이트 객체용 타입 별칭으로 바꿀 것입니다. 20장의 “타입 동의어와 타입 별칭” 절에서 논의했듯, 타입 별칭은 긴 타입을 짧게 줄여 쓰기 쉽게 만들어 줍니다. 목록 21-19를 보세요.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-19: 각 클로저를 담는 Box 에 대한 Job 타입 별칭을 만들고, 그 작업을 채널로 보내기

execute 에서 받은 클로저로 새 Job 인스턴스를 만든 뒤, 그 작업을 채널의 송신 쪽으로 보냅니다. send 가 실패할 경우를 대비해 unwrap 을 호출하고 있습니다. 예를 들어 모든 스레드 실행을 멈춰서 수신 쪽이 새 메시지를 더 이상 받지 않는다면 이런 일이 일어날 수 있습니다. 지금은 스레드를 멈출 수 없기 때문에, 풀이 존재하는 한 스레드는 계속 실행됩니다. unwrap 을 쓰는 이유는 실패 사례가 발생하지 않는다는 것을 우리는 알지만 컴파일러는 모르기 때문입니다.

하지만 아직 끝난 것은 아닙니다! Worker 안에서 thread::spawn 에 전달하는 클로저는 여전히 채널의 수신 끝을 단지 참조만 하고 있습니다. 실제로는 그 클로저가 무한히 반복하면서 채널 수신 끝에 작업을 요청하고, 작업을 받으면 실행해야 합니다. 목록 21-20 처럼 Worker::new 를 바꿔 봅시다.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-20: Worker 인스턴스의 스레드에서 작업을 받아 실행하기

여기서는 먼저 receiver 에 대해 lock 을 호출해 뮤텍스를 획득하고, 오류가 있으면 패닉하도록 unwrap 을 호출합니다. 락 획득은 뮤텍스가 poisoned 상태라면 실패할 수 있는데, 이는 다른 스레드가 락을 잡은 채 풀지 못하고 패닉했을 때 발생할 수 있습니다. 이 상황에서는 unwrap 으로 현재 스레드도 패닉하게 두는 것이 올바른 선택입니다. 원한다면 이 unwrap 을 의미 있는 오류 메시지를 담은 expect 로 바꿔도 됩니다.

뮤텍스 락을 얻으면 recv 를 호출해 채널에서 Job 을 받습니다. 마지막 unwrap 도 여기서 발생할 수 있는 오류를 넘기기 위한 것인데, 이는 수신자가 종료되면 sendErr 를 반환하듯, 송신자를 들고 있던 스레드가 종료되면 일어날 수 있습니다.

recv 호출은 블로킹되므로 아직 작업이 없다면 현재 스레드는 작업이 들어올 때까지 기다립니다. Mutex<T> 덕분에 한 번에 하나의 Worker 스레드만 작업을 요청할 수 있습니다.

이제 스레드 풀이 실제로 동작합니다! cargo run 을 실행하고 요청을 몇 개 보내 보세요.

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

성공입니다! 이제 연결을 비동기적으로 실행하는 스레드 풀이 생겼습니다. 최대 네 개의 스레드만 생성되므로, 서버가 많은 요청을 받아도 시스템이 과부하에 빠지지 않습니다. /sleep 요청이 들어와도 다른 스레드가 나머지 요청을 처리할 수 있습니다.

Note: /sleep 을 여러 브라우저 창에서 동시에 열면 5초 간격으로 하나씩 로드될 수도 있습니다. 일부 브라우저는 캐싱 때문에 같은 요청을 여러 번 보낼 때 순차적으로 실행하기도 합니다. 이 제한은 우리의 웹 서버 때문이 아닙니다.

여기서 잠시 멈추고, 21-18, 21-19, 21-20의 코드가 해야 할 일을 클로저가 아니라 future 로 표현했다면 어떻게 달라졌을지 생각해 보기 좋습니다. 어떤 타입이 바뀔까요? 메서드 시그니처는 달라질까요, 아니면 그대로일까요? 코드의 어떤 부분은 그대로 남을까요?

17장과 19장에서 while let 루프를 배운 뒤라면, 왜 Worker 스레드 코드를 목록 21-21처럼 쓰지 않았는지 궁금할 수도 있습니다.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-21: while let 을 사용하는 Worker::new 의 대체 구현

이 코드는 컴파일되고 실행도 되지만, 우리가 원하는 스레딩 동작을 만들지는 못합니다. 느린 요청이 여전히 다른 요청들을 기다리게 만들기 때문입니다. 이유는 다소 미묘합니다. Mutex 구조체에는 공개된 unlock 메서드가 없습니다. 락의 소유권은 lock 메서드가 반환하는 LockResult<MutexGuard<T>> 안의 MutexGuard<T> 라이프타임에 기반하기 때문입니다. 그러면 컴파일 시점에 대여 검사기가 “뮤텍스로 보호되는 자원은 락을 쥐고 있지 않으면 접근할 수 없다”는 규칙을 강제할 수 있습니다. 하지만 우리가 MutexGuard<T> 의 라이프타임을 주의하지 않으면, 이 구현은 의도보다 오래 락을 잡고 있게 만들 수도 있습니다.

목록 21-20의 let job = receiver.lock().unwrap().recv().unwrap(); 코드는 동작합니다. let 을 사용할 때는 등호 오른쪽 표현식에서 쓰인 임시 값들이 let 문이 끝나는 즉시 drop 되기 때문입니다. 하지만 while let 은(if let, match 도 마찬가지로) 연관된 블록이 끝날 때까지 임시 값을 drop 하지 않습니다. 목록 21-21에서는 job() 호출이 끝날 때까지 락이 계속 유지되므로, 다른 Worker 인스턴스들이 작업을 받을 수 없게 됩니다.