스트림: 순차적으로 이어지는 퓨처
이 장 앞쪽 “메시지 전달” 절에서 async 채널의
수신자를 사용했던 것을 떠올려 보세요. async recv 메서드는 시간에 따라 아이템의
시퀀스를 만들어 냅니다. 이것은 스트림(stream) 이라고 불리는 훨씬 더 일반적인
패턴의 한 예입니다. 큐에 항목이 차례로 들어오는 것, 한 번에 메모리에 다 올리기엔
너무 큰 데이터를 파일시스템에서 조금씩 읽어 오는 것, 네트워크를 통해 시간이 지나며
데이터가 도착하는 것 등, 많은 개념이 자연스럽게 스트림으로 표현됩니다. 스트림은
퓨처이기 때문에, 다른 어떤 퓨처와도 함께 사용할 수 있고 흥미로운 방식으로 조합할
수도 있습니다. 예를 들어 너무 많은 네트워크 호출을 피하려고 이벤트를 묶거나, 오래
걸리는 작업 시퀀스에 타임아웃을 걸거나, 불필요한 작업을 줄이기 위해 사용자 인터페이스
이벤트를 조절(throttle)할 수도 있습니다.
13장의 “Iterator 트레이트와 next 메서드”
절에서 우리는 이미 아이템의 시퀀스를 보았습니다. 그러나 반복자와 async 채널 수신자
사이에는 두 가지 차이가 있습니다. 첫 번째 차이는 시간 입니다. 반복자는 동기적이고,
채널 수신자는 비동기적입니다. 두 번째 차이는 API입니다. Iterator 를 직접 다룰 때는
동기식 next 메서드를 호출합니다. 하지만 trpl::Receiver 스트림에서는 비동기
recv 메서드를 호출했습니다. 이 차이를 빼면 API 느낌은 매우 비슷한데, 이는 우연이
아닙니다. 스트림은 반복자의 비동기 버전 같은 존재이기 때문입니다. 다만
trpl::Receiver 는 구체적으로 메시지를 받는 역할을 하고, 일반적인 스트림 API는
그보다 더 넓은 개념입니다. 즉, Iterator 가 다음 항목을 제공하는 방식과 비슷하게,
하지만 비동기적으로 다음 항목을 제공합니다.
러스트에서 반복자와 스트림이 이렇게 비슷하다는 사실은, 사실 어떤 반복자든 스트림으로
바꿀 수 있다는 뜻이기도 합니다. 반복자와 마찬가지로, 스트림도 next 메서드를 호출하고
그 출력을 await 하면서 다룰 수 있습니다. 목록 17-21을 보세요. 이 코드는 아직
컴파일되지 않습니다.
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
우리는 숫자 배열 하나로 시작하고, 이를 반복자로 바꾼 뒤 map 을 호출해 모든 값에
2를 곱합니다. 그 다음 trpl::stream_from_iter 함수를 사용해 이 반복자를 스트림으로
변환합니다. 그리고 while let 루프로, 도착하는 스트림의 각 항목을 순서대로 처리합니다.
하지만 이 코드를 실제로 실행하려 하면, 컴파일되지 않고 next 메서드가 없다는 오류를
보게 됩니다.
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
이 오류가 설명하듯, 문제는 올바른 트레이트를 스코프로 가져오지 않았다는 데 있습니다.
지금까지 이야기한 내용을 떠올리면 그 트레이트가 Stream 이라고 생각할 법하지만,
실제 필요한 것은 StreamExt 입니다. Ext 는 extension 의 줄임말로, 러스트
커뮤니티에서 어떤 트레이트를 다른 트레이트로 확장할 때 자주 붙이는 이름입니다.
Stream 트레이트는 사실상 Iterator 와 Future 트레이트를 결합한 저수준
인터페이스를 정의합니다. StreamExt 는 그 위에 더 고수준 API를 얹어 제공하며,
next 메서드와 그 밖의 반복자 스타일 유틸리티 메서드들이 여기에 들어 있습니다.
Stream 과 StreamExt 는 아직 러스트 표준 라이브러리 일부는 아니지만, 생태계의
대부분의 크레이트가 비슷한 정의를 사용합니다.
이 컴파일 오류를 고치려면 목록 17-22처럼 trpl::StreamExt 에 대한 use 문을
추가하면 됩니다.
extern crate trpl; // required for mdbook test
use trpl::StreamExt;
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// --snip--
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
이제 모든 조각이 맞춰졌으니, 이 코드는 우리가 원한 방식으로 동작합니다! 더 나아가,
이제 StreamExt 가 스코프에 있으므로, 반복자에서 했던 것처럼 그 안의 모든
유틸리티 메서드를 함께 사용할 수도 있습니다.