Reactive Streams 란?
reactive-streams.org 에서는 다음과 같이 정의하고 있다.
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
대충 해석하면 "논블로킹(Non-blocking) 백프레셔(back pressure)를 이용한 비동기 데이터 처리의 표준이다"
Reactive Streams API
Reactive Streams 는 뭔가 복잡할것 같지만, 실제로 아주 간단한 API 들의 조합으로 구성되어 있다.
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
reactive streams 동작방식
중요하게 봐야하는 점은 Publisher 가 Subscriber 에서 데이터를 Push 하는 방식이 아니라,
Subscriber 가 Publisher 에게 데이터를 요청하는 Pull 방식이라는 점이다. 이런 방식을 백프레셔라고 한다.
백프레셔란?
Publisher 에서 발행하고, Subscriber에서 구독할 때, Publisher 에서 데이터를 Subscriber 로 Push 하는 방식이 아니라, Pull 방식으로 Subscriber 가 Publisher 로 처리할 수 있는 양의 크기만큼 데이터를 요청 함으로써 Subscriber의 장애를 방지하기 위함이다.
즉, 다이나믹 풀 방식의 데이터 요청을 통해서 Subscriber가 수용할 수 있는 만큼 데이터를 요청하는 방식이다.
reactive streams 을 이해할 겸 구현해보자
Reactive Streams 을 직접 구현해보면 좀더 이해하는데 도움이 될것이다.
public class ReactiveStreamTest {
public static class PublisherImpl implements Publisher<Integer> {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
Queue<Integer> queue = new LinkedList<>();
IntStream.range(0, 10).forEach(queue::add);
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
System.out.println("request:" + n);
for (int i=0; i<=n; i++){
if(queue.isEmpty()) {
subscriber.onComplete();
return;
}
subscriber.onNext(queue.poll());
}
}
@Override
public void cancel() {
System.out.println("publish cancel");
}
});
}
}
public static class SubscriberImpl implements Subscriber<Integer> {
private Subscription subscription;
private long requestSize = 2;
private List<Integer> buffer = new ArrayList<>();
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(requestSize);
}
@Override
public void onNext(Integer integer) {
System.out.println(" onNext - " + integer);
buffer.add(integer);
if(buffer.size() == requestSize) {
buffer.clear(); //flush
subscription.request(requestSize);
}
}
@Override
public void onError(Throwable t) {
System.out.println("error:" + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("subscribe complete");
}
}
public static void main(String[] args) {
Publisher<Integer> publisher = new PublisherImpl();
publisher.subscribe(new SubscriberImpl());
}
}
실행 결과
request:2
onNext - 0
onNext - 1
request:2
onNext - 2
onNext - 3
request:2
onNext - 4
onNext - 5
request:2
onNext - 6
onNext - 7
request:2
onNext - 8
onNext - 9
request:2
subscribe complete
반응형
'개발관련' 카테고리의 다른 글
SpringBoot + Prometheus + Grafana 모니터링 (0) | 2020.12.31 |
---|---|
[Mysql] INNER JOIN + ORDER BY 에서 filesort 제거해보기 (0) | 2020.12.11 |
commit 메시지에 자동으로 branch명 추가해보기 (0) | 2020.11.04 |
Webflux Functional Endpoints 시작하기 (0) | 2020.09.30 |
JPA Fetch Join과 페이징 문제 (0) | 2020.09.22 |