Webflux 란? Webflux를 먼저 이해하려면, reactive streams 를 먼저 이해해야 한다.

reactive streams 란?


reactive-streams.org 에서는 다음과 같이 정의하고 있다.

 

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.

대충 해석하면 "논블로킹(Non-blocking) 백프레셔(back pressure)를 이용한 비동기 데이터 처리의 표준이다"

 

Reactive Streams 배경

2013년에 Netflix와 Pivotal, Lightbend의 엔지니어들이 처음 개발하기 시작하였고,  Netflix는 RxJava, Pivotal은 Reactor, 그리고 Lightbend는 Akka 에서 스트림 API가 필요하여 표준 API를 만들었다고 한다.

2015년 4월에 JVM에서 사용하기 위한 1.0.0 스펙을 릴리스 하였고, 2017년 9월에 Flow API라는 이름으로 java.util.concurrent 패키지 아래 포함시킨 Java 9이 릴리스되었다.

 

Reactive Streams API

그렇다면 reactive streams API 는 어떻게 생겼을까? 비교적 간단하게 생겼다.

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

 

Reactive Streams 구현체

  • Project Reactor
  • RxJava
  • Akka Streams
  • Java9

이제 Ractive Streams 에서 가장 중요한 용어인 스트리밍 처리, 논블록킹(asynchronous), 백프레셔(back pressure)를 이해를 해야 하는데, 스트리밍과 논블록킹은 많이 아는 내용이니, 스킵하고 백 프레셔를 간략하게 설명을 드리면,,

 

백프레셔(back pressure) : Publisher 에서 발행하고, Subscriber에서 구독할 때, Publisher 에서 데이터를 Subscriber 로 Push 하는 방식이 아니라, Pull 방식으로 Subscriber 가 Publisher 로 처리할 수 있는 양의 크기만큼 데이터를 요청 함으로써 Subscriber의 장애를 방지하기 위함이다.
즉, 다이나믹 풀 방식의 데이터 요청을 통해서 구독자가 수용할 수 있는 만큼 데이터를 요청하는 방식이다.

 

Webflux 란?


Project Reactor 의 웹의 스트리밍 처리를 담당하는 역활을 한다.

스프링5는 Spring Boot 2 부터 도입이 되었으니, Spring Boot 2 의 stack 는 아래와 같다.

개발자는 Reactive Stack 를 사용할지, Servlet Stack 를 사용할지 선택을 해야 한다. 두개의 stack 을 동시에 사용할 수 없다.

 

Spring Boot2 Stack

 

Flux 와 Mono

일단 Flux Mono 를 알고 넘어가야 한다. "Reactive Streams" 인터페이스 중에서 Publisher 를 구현을 해 놓은 발행자이다.

Flux 와 Mono 의 차이점은 발행하는 데이터 갯수이다.

 

  • Flux : 0 ~ N 개의 데이터 전달
  • Mono : 0 ~ 1 개의 데이터 전달
Flux<Integer> ints = Flux.range(1, 3);
ints.subscribe(System.out::println);

Mono<String> mono = Mono.just("hello");
mono.subscribe(System.out::println);

 

Webflux + Reactive Redis 예제


Webflux 는 아래 처럼 두개의 프로그래밍 모델을 지원하는데, 이번글에서는 Annotation 방식으로 설명하겠다.

 

 

1. Dependencies 에 webflux와 redis-reactive 를 추가해준다.

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
}

 

2. Redis config

@Configuration
public class RedisConfigurer {

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {

        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
                .commandTimeout(Duration.ofSeconds(2))
                .shutdownTimeout(Duration.ZERO)
                .build();

        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("localhost", 6379), clientConfig);
    }

    @Bean
    public ReactiveRedisTemplate<String, Person> personRedisTemplate(
            ReactiveRedisConnectionFactory connectionFactory) {

        Jackson2JsonRedisSerializer<Person> serializer = new Jackson2JsonRedisSerializer(Person.class);
        RedisSerializationContextBuilder<String, Person> builder = RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<String, Person> serializationContext = builder.value(serializer).build();

        return new ReactiveRedisTemplate<>(connectionFactory, serializationContext);
    }
}

 

3. Controller 작성

@RestController
public class PersonController {
  private ReactiveRedisOperations<String, Person> personOps;

  public PersonController(ReactiveRedisOperations<String, Person> personOps) {
    this.personOps = personOps;
  }

  @GetMapping("/person/{id}")
  public Mono<Person> person(@PathVariable long id) {
    return personOps.opsForValue().get("person:" + id);
  }
  
  @GetMapping("/persons")
  public Flux<Person> allPerson() {
    return personOps.keys("*")
        .flatMap(personOps.opsForValue()::get);
  }
}

 

코드를 보면 느낌이 올수도 있지만, Publisher 의 구현체인 Mono와 Flux 를 활용하여 데이터를 발행하는 객체를 만들고 Controller 로 리턴을 하면 Spring 에서 알아서 subscribe 하도록 되어 있다. 즉, Controller 에서 각 명령을 실행 한다기 보다, 데이터 발행하는 Publisher 를 만드는 코드를 작성하도록 접근을 해야 한다.

 

Webflux 는 Asynchronous Non-blocking I/O 을 방식을 활용하여 성능을 끌어 올릴 수 있는 장점이 있다. 그런데 이 말은 즉, Non Blocking 기반으로 코드를 작성해야 한다. 만약 Blocking 코드가 있다면, 안 하느니만 못하는 상황이 될 것이다

얼마 전까지는 Java 진영에 Non Blocking 을 지원하는 DB Driver가 없었다. 최근에 R2DBC 가 릴리즈 되면서 이제는 Java 진영에서도 Non Blocking 으로 DB 를 접근할 수 있게 되었다. (R2DBC는 곧 글을 써볼 예정이다.)

반응형

'개발관련' 카테고리의 다른 글

MSA 분산 트랜잭션  (0) 2020.04.27
Optimistic Lock과 Pessimistic Lock  (0) 2020.04.15
Webflux vs WebMvc 성능 비교  (0) 2020.04.06
Spring Webflux Cold / Hot 이해하기  (0) 2020.03.29
Spring Boot & HikariCP 튜닝  (0) 2020.03.15
댓글