스프링의 웹플럭스에는 Flux와 Mono 라는 Reactive Streams Publisher 의 구현체가 있다.
Flux 와 Mono 는 두 종류의 발행 방식이 있는데, Cold 과 Hot 방식이 존재한다.
Cold sequences
subscribe 할때 마다, 매번 새로운 데이터를 발행하고 동작하는 방식이다. 그리고 subscribe 를 하기 전에는 동작하지 않는다. Webflux 에서는 일반적으로 Cold 방식으로 동작한다.
먼저 Mono 로 예를 들어 보자.
Mono<String> body = httpGet();
body.subscribe(s -> log.info("Subscriber 1 : " + s.length()));
body.subscribe(s -> log.info("Subscriber 2 : " + s.length()));
public Mono<String> httpGet() {
return WebClient
.create()
.get()
.uri("http://google.com")
.retrieve()
.bodyToMono(String.class)
.log();
}
output :
[reactor-http-nio-1] INFO reactor.Mono.FlatMap.1 - | onNext(생략..)
[reactor-http-nio-2] INFO reactor.Mono.FlatMap.1 - | onNext(생략..)
[reactor-http-nio-2] INFO Test - Subscriber 2 : 219
[reactor-http-nio-1] INFO Test - Subscriber 1 : 219
[reactor-http-nio-2] INFO reactor.Mono.FlatMap.1 - | onComplete()
[reactor-http-nio-1] INFO reactor.Mono.FlatMap.1 - | onComplete()
onNext와 onComplete 가 2번씩 호출된 것을 알수 있고 Subscribe 결과도 2번이 찍혔다.
즉, Cold 방식대로 subscribe 할때 마다 매번 WebClient 가 동작을 한 것을 알수 있다.
만약, 위 코드에서 body.subscribe 메소드를 호출하지 않는다면 어떻게 될까? WebClient 가 동작 할 것인가?
Cold 는 subscribe 를 하지 않는다면 데이터를 발행과 동작을 하지 않기 때문에 아무런 동작을 하지 않는다.
다음은 Flux 로 예를 들어 보자.
Flux<Integer> source = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("subscribed to source"));
source.subscribe(System.out::println, e -> {}, () -> {});
source.subscribe(System.out::println, e -> {}, () -> {});
output:
subscribed to source
1
2
3
subscribed to source
1
2
3
Cold 방식대로 subscribe 할때마다 매번 독립적인 데이터를 발행하는 동작을 확인 할 수 있다.
Hot sequences
subscribe 할때 마다, 새로운 데이터를 발행이나 동작하지 않는 방식이다. subscribe를 매번 할때마다 미리 생성해 둔 데이터로 동작을 한다. 그리고 subscribe 와 무관하게 즉시 데이터 발행과 동작도 가능한 방식이다.
위의 Mono 의 예를 가지고 Cold 에서 Hot 으로 변경해보자.
Mono 의 cache() 메소드를 호출해 주면 된다. (참 간단하다.)
Mono<String> body = httpGet().cache();
body.subscribe(s -> log.info("Subscriber 1 : " + s.length()));
body.subscribe(s -> log.info("Subscriber 2 : " + s.length()));
public Mono<String> httpGet() {
return WebClient
.create()
.get()
.uri("http://google.com")
.retrieve()
.bodyToMono(String.class)
.log();
}
output
[reactor-http-nio-1] INFO reactor.Mono.FlatMap.1 - | onNext(생략..)
[reactor-http-nio-1] INFO Test - Subscriber 2 : 219
[reactor-http-nio-1] INFO Test - Subscriber 1 : 219
[reactor-http-nio-1] INFO reactor.Mono.FlatMap.1 - | onComplete()
onNext, onComplate 가 1번만 찍혀 있고, reactor-http-nio 스레드도 1개만 사용하고 있다.
즉, Hot 방식대로 subscribe 할때 마다 매번 독립적인 데이터를 발행하지 않고, 미리 생성해둔 데이터를 발행하고 동작한 것을 알수 있다.
위의 Flux 의 예를 가지고 Cold 에서 Hot 으로 변경해보자.
Flux 는 ConnectableFlux 라는 것이 있다.
Flux 를 publish 메소드를 호출하면 ConnectableFlux 로 변경이 되고, connect 메소드를 호출 하면 구독을 시작하게 된다.
Flux<Integer> source = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("subscribed to source"));
ConnectableFlux<Integer> hotSource = source.publish();
hotSource.subscribe(System.out::println, e -> {}, () -> {});
hotSource.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");
hotSource.connect();
output
done subscribing
will now connect
subscribed to source
1
1
2
2
3
3
또한, ConnectableFlux 에는 autoConnect 라는 메소드도 있다.
아래 코드에서 autoConnect(2) 라는 것은 2개의 구독자가 생기게 된다면 실행한다라는 것이다. 명시적으로 connect 를 호출하지 않아도 된다.
Flux<Integer> source = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("subscribed to source"));
Flux<Integer> autoCo = source.publish().autoConnect(2);
autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});
output
subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3
2번째의 subscribe 를 호출할때 실행되는 것을 확인 할 수 있다.
'개발관련' 카테고리의 다른 글
MSA 분산 트랜잭션 (0) | 2020.04.27 |
---|---|
Optimistic Lock과 Pessimistic Lock (0) | 2020.04.15 |
Webflux vs WebMvc 성능 비교 (0) | 2020.04.06 |
Spring Webflux, 이해하고 사용하자 (0) | 2020.03.21 |
Spring Boot & HikariCP 튜닝 (0) | 2020.03.15 |