우아한 종료와 정리
목록 21-20의 코드는 의도한 대로 스레드 풀을 사용해 요청에 비동기적으로 응답하고
있습니다. 다만 workers, id, thread 필드에 대해 직접 사용하지 않는다는 경고가
나오는데, 이는 우리가 아직 아무 정리 작업도 하지 않고 있다는 신호이기도 합니다.
그다지 우아하지 않은 ctrl-C 방식으로 메인 스레드를 멈추면,
다른 스레드들도 요청을 처리하는 도중이라 해도 즉시 함께 멈춥니다.
다음으로는 Drop 트레이트를 구현해서 풀 안의 각 스레드에 join 을 호출하고, 각
스레드가 작업 중인 요청을 마친 뒤 종료하게 만들겠습니다. 그다음에는 스레드들에게 새
요청 수락을 멈추고 종료하라고 알리는 방법도 구현하겠습니다. 이 코드를 실제로 보려면,
서버가 요청 두 개만 처리한 뒤 스레드 풀을 우아하게 종료하도록 바꿔 보겠습니다.
진행하면서 주목할 점이 하나 있습니다. 여기서 하는 일은 클로저 실행을 담당하는 코드에는 아무 영향도 주지 않습니다. 따라서 async 런타임용 스레드 풀을 쓰고 있었다 해도 이 부분은 모두 동일했을 것입니다.
ThreadPool 에 Drop 트레이트 구현하기
먼저 스레드 풀에 Drop 을 구현하는 것부터 시작합시다. 풀이 drop 될 때는 모든
스레드가 작업을 끝낼 수 있도록 join 해야 합니다. 목록 21-22는 Drop 구현의 첫
번째 시도를 보여 줍니다. 아직은 완전히 동작하지 않습니다.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
먼저 스레드 풀의 각 worker 를 순회합니다. self 가 가변 참조이고 worker 도
변경할 수 있어야 하므로 &mut 를 사용합니다. 각 worker 에 대해, 해당 Worker
인스턴스가 종료 중이라는 메시지를 출력한 다음 그 Worker 의 스레드에 join 을
호출합니다. join 호출이 실패하면 unwrap 으로 러스트가 패닉하게 만들어, 우아하지
않은 종료로 들어가게 합니다.
이 코드를 컴파일하면 다음 오류가 납니다.
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:1921:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
이 오류는 우리가 각 worker 에 대해 가변 대여만 갖고 있고, join 은 인수를
소유해야 하기 때문에 join 을 호출할 수 없다고 알려 줍니다. 이 문제를 해결하려면
thread 를 소유한 Worker 인스턴스에서 스레드를 꺼내어 join 이 그것을 소비할 수
있게 해야 합니다. 이를 해결하는 한 가지 방법은 목록 18-15에서 썼던 접근을 다시 쓰는
것입니다. 만약 Worker 가 Option<thread::JoinHandle<()>> 를 들고 있다면,
Option 의 take 메서드를 호출해 Some 안의 값을 꺼내고 그 자리에 None 을
남길 수 있습니다. 즉, 실행 중인 Worker 는 thread 에 Some 을 갖고 있다가,
정리하고 싶을 때 Some 을 None 으로 바꿔 더 이상 실행할 스레드가 없게 만드는
방식입니다.
하지만 이 상황은 Worker 가 drop 될 때에만 필요합니다. 그 대가로, worker.thread
에 접근하는 곳마다 Option<thread::JoinHandle<()>> 를 다뤄야 합니다. 관용적인
러스트는 Option 을 자주 쓰지만, 이렇게 우회책으로 “항상 값이 있다고 알고 있는 것”을
Option 으로 감싸고 있다면 코드를 더 깔끔하고 오류가 덜 나게 만드는 다른 접근을
찾아보는 편이 좋습니다.
이 경우에는 더 나은 대안이 있습니다. Vec::drain 메서드입니다. 이는 벡터에서 제거할
항목 범위를 나타내는 매개변수를 받고, 제거된 항목들의 반복자를 반환합니다. ..
범위 문법을 넘기면 벡터 안의 모든 값을 제거합니다.
따라서 ThreadPool 의 drop 구현을 다음처럼 바꾸어야 합니다.
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
}
이렇게 하면 컴파일 오류가 해결되고, 다른 코드 변경도 필요 없습니다. 다만 drop 은
패닉 중에도 호출될 수 있으므로, 여기의 unwrap 이 다시 패닉해서 이중 패닉을 만들 수
있고, 그러면 프로그램이 즉시 크래시하면서 진행 중인 정리가 모두 중단됩니다. 예제
프로그램으로는 괜찮지만, 실제 서비스 코드에서는 권장되지 않습니다.
스레드에게 더 이상 작업을 기다리지 말라고 알리기
지금까지의 변경으로 코드는 경고 없이 컴파일됩니다. 하지만 안 좋은 소식은, 아직 우리가
원하는 방식으로 동작하지는 않는다는 점입니다. 핵심은 Worker 인스턴스의 스레드가
실행하는 클로저 로직에 있습니다. 지금은 join 을 호출하지만, 스레드들은 작업을
찾으려고 영원히 loop 를 돌기 때문에 이걸로는 종료되지 않습니다. 현재의 drop
구현으로 ThreadPool 을 drop 하려고 하면, 메인 스레드는 첫 번째 스레드가 끝나기를
영원히 기다리며 막히게 됩니다.
이 문제를 해결하려면 ThreadPool 의 drop 구현을 먼저 바꾸고, 그 다음 Worker
루프도 바꿔야 합니다.
먼저 ThreadPool 의 drop 구현에서 스레드들이 끝나기를 기다리기 전에 sender 를
명시적으로 drop 하도록 바꾸겠습니다. 목록 21-23은 sender 를 명시적으로 drop 하도록
바꾼 ThreadPool 코드를 보여 줍니다. 스레드와 달리 이 경우에는 Option::take 로
sender 를 ThreadPool 밖으로 꺼내려면 정말로 Option 을 써야 합니다.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Worker 스레드들을 join 하기 전에 sender 를 명시적으로 drop 하기sender 를 drop 하면 채널이 닫히고, 더 이상 메시지가 오지 않을 것이라는 뜻이 됩니다.
그러면 Worker 인스턴스가 무한 루프 안에서 호출하던 모든 recv 가 오류를 반환하게
됩니다. 목록 21-24에서는 그런 경우 Worker 루프가 우아하게 빠져나오도록 바꾸는데,
그 결과 ThreadPool 의 drop 구현이 join 을 호출할 때 스레드들이 끝날 수 있게
됩니다.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker { id, thread }
}
}
recv 가 오류를 반환하면 루프를 명시적으로 빠져나오기이 코드가 실제로 어떻게 동작하는지 보려면, 목록 21-25처럼 main 을 바꾸어 요청 두
개만 받은 뒤 서버를 우아하게 종료해 봅시다.
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
실제 웹 서버라면 요청 두 개만 처리하고 종료되길 원하지는 않을 것입니다. 이 코드는 우아한 종료와 정리 동작이 실제로 정상적으로 작동한다는 점만 보여 줍니다.
take 메서드는 Iterator 트레이트에 정의되어 있으며, 반복을 최대 처음 두 항목으로
제한합니다. main 끝에서 ThreadPool 은 스코프를 벗어나고, drop 구현이 실행됩니다.
cargo run 으로 서버를 시작한 뒤 요청을 세 개 보내 보세요. 세 번째 요청은 오류가
나야 하고, 터미널에는 다음과 비슷한 출력이 보여야 합니다.
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
Worker ID와 출력 메시지 순서는 다르게 나타날 수 있습니다. 하지만 메시지를 보면
코드가 어떻게 동작하는지 알 수 있습니다. Worker 0과 3이 처음 두 요청을
받았습니다. 서버는 두 번째 연결 이후 더 이상 연결을 받지 않았고, Worker 3 이
자기 작업을 시작하기도 전에 ThreadPool 의 Drop 구현이 실행되기 시작합니다.
sender 를 drop 하면 모든 Worker 인스턴스와의 연결이 끊기고, 종료하라는 신호가
전달됩니다. 각 Worker 는 연결이 끊겼다는 메시지를 출력하고, 그 뒤 스레드 풀이
각 Worker 스레드가 끝날 때까지 join 으로 기다립니다.
이 실행에서 흥미로운 점이 하나 있습니다. ThreadPool 이 sender 를 drop 한 뒤,
아직 어떤 Worker 도 오류를 받기 전에 우리는 Worker 0 을 join 하려 했습니다.
Worker 0 은 아직 recv 로부터 오류를 받지 못했기 때문에, 메인 스레드는 Worker 0
이 끝나기를 기다리며 막혔습니다. 그 사이 Worker 3 은 작업 하나를 받았고, 그 후
모든 스레드가 오류를 받았습니다. Worker 0 이 작업을 마치자, 메인 스레드는 बाकी
Worker 인스턴스들이 끝나기를 기다렸고, 그 시점에는 모두 루프를 빠져나와 종료한
상태였습니다.
축하합니다! 이제 프로젝트를 완성했습니다. 스레드 풀을 사용해 비동기적으로 응답하는 기본 웹 서버가 생겼습니다. 또한 서버를 우아하게 종료해, 풀 안의 모든 스레드를 정리할 수도 있게 되었습니다.
참고용으로 전체 코드를 다시 적어 두겠습니다.
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
여기서 더 나아갈 수도 있습니다! 이 프로젝트를 계속 확장하고 싶다면 다음과 같은 아이디어를 시도해 보세요.
ThreadPool과 그 공개 메서드에 문서를 더 추가하기- 라이브러리 기능을 테스트하는 코드 추가하기
unwrap호출을 더 튼튼한 오류 처리로 바꾸기- 웹 요청 처리 외의 다른 작업에도
ThreadPool사용하기 - crates.io 에서 스레드 풀 크레이트를 찾아, 그 크레이트로 비슷한 웹 서버를 구현해 보기. 그리고 그 API와 견고함을 우리가 구현한 스레드 풀과 비교하기
마무리
잘 해냈습니다! 드디어 책의 끝에 도달했습니다. 러스트 여행을 함께해 주셔서 감사합니다. 이제 여러분은 자신만의 러스트 프로젝트를 구현하고, 다른 사람들의 프로젝트를 돕기 시작할 준비가 되었습니다. 러스트 여정에서 마주칠 어떤 어려움이든 기꺼이 도와주려는 친근한 러스타시안 공동체가 있다는 점도 기억해 두세요.