스프링의 웹플럭스에는 Flux와 Mono 라는 Reactive Streams Publisher 의 구현체가 있다.

Flux 와 Mono 는 두 종류의 발행 방식이 있는데, Cold 과 Hot 방식이 존재한다.

 

Cold sequences


subscribe 할때 마다, 매번 새로운 데이터를 발행하고 동작하는 방식이다. 그리고 subscribe 를 하기 전에는 동작하지 않는다. Webflux 에서는 일반적으로 Cold 방식으로 동작한다.

 

먼저 Mono 로 예를 들어 보자.

Mono<String> body = httpGet();
body.subscribe(s -> log.info("Subscriber 1 : " + s.length()));
body.subscribe(s -> log.info("Subscriber 2 : " + s.length()));

public Mono<String> httpGet() {
	return WebClient
		.create()
		.get()
		.uri("http://google.com")
		.retrieve()
		.bodyToMono(String.class)
		.log();
}

output :

[reactor-http-nio-1] INFO reactor.Mono.FlatMap.1 - | onNext(생략..)
[reactor-http-nio-2] INFO reactor.Mono.FlatMap.1 - | onNext(생략..)
[reactor-http-nio-2] INFO Test - Subscriber 2 : 219
[reactor-http-nio-1] INFO Test - Subscriber 1 : 219
[reactor-http-nio-2] INFO reactor.Mono.FlatMap.1 - | onComplete()
[reactor-http-nio-1] INFO reactor.Mono.FlatMap.1 - | onComplete()

onNext와 onComplete 가 2번씩 호출된 것을 알수 있고 Subscribe 결과도 2번이 찍혔다.

즉, Cold 방식대로 subscribe 할때 마다 매번 WebClient 가 동작을 한 것을 알수 있다.

 

만약, 위 코드에서 body.subscribe 메소드를 호출하지 않는다면 어떻게 될까? WebClient 가 동작 할 것인가?

Cold 는 subscribe 를 하지 않는다면 데이터를 발행과 동작을 하지 않기 때문에 아무런 동작을 하지 않는다. 

 

 

다음은 Flux 로 예를 들어 보자.

Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("subscribed to source"));

source.subscribe(System.out::println, e -> {}, () -> {});
source.subscribe(System.out::println, e -> {}, () -> {});

output:

subscribed to source
1
2
3
subscribed to source
1
2
3

Cold 방식대로 subscribe 할때마다 매번 독립적인 데이터를 발행하는 동작을 확인 할 수 있다.

 

Hot sequences


subscribe 할때 마다, 새로운 데이터를 발행이나 동작하지 않는 방식이다. subscribe를 매번 할때마다 미리 생성해 둔 데이터로 동작을 한다. 그리고 subscribe 와 무관하게 즉시 데이터 발행과 동작도 가능한 방식이다.

 

위의 Mono 의 예를 가지고 Cold 에서 Hot 으로 변경해보자.

Mono 의 cache() 메소드를 호출해 주면 된다. (참 간단하다.)

Mono<String> body = httpGet().cache();
body.subscribe(s -> log.info("Subscriber 1 : " + s.length()));
body.subscribe(s -> log.info("Subscriber 2 : " + s.length()));

public Mono<String> httpGet() {
	return WebClient
		.create()
		.get()
		.uri("http://google.com")
		.retrieve()
		.bodyToMono(String.class)
		.log();
}

output

[reactor-http-nio-1] INFO reactor.Mono.FlatMap.1 - | onNext(생략..)
[reactor-http-nio-1] INFO Test - Subscriber 2 : 219
[reactor-http-nio-1] INFO Test - Subscriber 1 : 219
[reactor-http-nio-1] INFO reactor.Mono.FlatMap.1 - | onComplete()

onNext, onComplate 가 1번만 찍혀 있고, reactor-http-nio 스레드도 1개만 사용하고 있다. 

즉, Hot 방식대로 subscribe 할때 마다 매번 독립적인 데이터를 발행하지 않고, 미리 생성해둔 데이터를 발행하고 동작한 것을 알수 있다.

 

 

위의 Flux 의 예를 가지고 Cold 에서 Hot 으로 변경해보자.

Flux 는 ConnectableFlux 라는 것이 있다.

Flux 를 publish 메소드를 호출하면 ConnectableFlux 로 변경이 되고, connect 메소드를 호출 하면 구독을 시작하게 된다.

Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("subscribed to source"));

ConnectableFlux<Integer> hotSource = source.publish();

hotSource.subscribe(System.out::println, e -> {}, () -> {});
hotSource.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");

hotSource.connect();

output

done subscribing
will now connect
subscribed to source
1
1
2
2
3
3

 

또한, ConnectableFlux 에는 autoConnect 라는 메소드도 있다.

아래 코드에서 autoConnect(2) 라는 것은 2개의 구독자가 생기게 된다면 실행한다라는 것이다.  명시적으로 connect 를 호출하지 않아도 된다.

Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("subscribed to source"));

Flux<Integer> autoCo = source.publish().autoConnect(2);

autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});

output

subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3

2번째의 subscribe 를 호출할때 실행되는 것을 확인 할 수 있다.

 

 

 

반응형

'개발관련' 카테고리의 다른 글

MSA 분산 트랜잭션  (0) 2020.04.27
Optimistic Lock과 Pessimistic Lock  (0) 2020.04.15
Webflux vs WebMvc 성능 비교  (0) 2020.04.06
Spring Webflux, 이해하고 사용하자  (0) 2020.03.21
Spring Boot & HikariCP 튜닝  (0) 2020.03.15
댓글