Reactor란?
Reactor란?
Reactor는 RxJava와 함께 Reactive Stream(리액티브 스트림) 사양을 구현한 Reactive 라이브러리입니다.
공식문서: https://projectreactor.io/
Reactive Stream 관련 내용은 아래 블로그에서도 설명이 잘되어 있어서 참고하시면 되겠습니다.
https://sabarada.tistory.com/98
Reactor는 아래의 특징을 가집니다.
- Reactor는 Spring Framework 5부터 리액티브 프로그래밍을 위해 지원되고 있고, 최소 Java8 이상에서 동작합니다. 특히, Java8부터 지원되는 함수형 프로그래밍 API를 통해서 Reactor의 Publisher와 Subscriber 간의 상호작용이 이루어집니다.
- Reactor는 JVM 위에서 실행되는 Non-Blocking 애플리케이션을 제작하기 위해 필요한 핵심 기술입니다.
- Reactor는 Mono와 Flux라는 두 가지 비동기 Sequence API를 제공합니다.
- Mono: Reactor에서 지원하는 Publisher 타입으로, 방출하는 데이터가 0개이거나 1개인 경우에 특화된 Publisher
- Flux: Reactor에서 지원하는 Publisher 타입으로, 방출하는 데이터가 N개인 경우에 특화된 Publisher
- Reactor는 Publisher로부터 전달받은 데이터를 처리하는 데 있어서 과부하가 걸리지 않도록 제어하는 Backpressure를 지원합니다.
Reactor의 핵심은 아래로 점철됩니다.
- 1단계: 데이터를 생성해서 제공
- 2단계: 데이터를 가공
- 3단계: 전달받은 데이터를 처리
아래 예시 코드를 통해 확인해 보겠습니다.
Reactor를 사용하기 위해서는 Spring boot의 Webflux 관련 라이브러리를 추가해야 합니다. 아래 코드는 maven 기준입니다.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
import reactor.core.publisher.Flux;
public class Main {
public static void main(String[] args) {
Flux<String> sequence = Flux.just("Hello", "Reactor"); // 1단계 (데이터 방출)
sequence.map(String::toLowerCase) // 2단계 (데이터 가공)
.subscribe(System.out::println); // 3단계 (가공된 데이터 처리)
}
}
위 코드를 실행하면 아래와 같이 실행됩니다.
이 3가지 단계는 데이터를 가공 처리하는 단계가 얼마나 복잡해지느냐 등의 추가 작업에 상관없이 수행되는 필수 단계라는 것을 기억합시다!
참고 (subscribe() 응용)
subscribe 함수에 커서를 올리면 아래와 다양한 시그니처가 있음을 볼 수 있습니다.
아래는 subscribe()의 여러 구현체 중에, 3번째 메서드를 사용한 코드입니다.
public class Main {
public static void main(String[] args) {
Mono.empty()
.subscribe(none-> System.out.println("# emitted onNext signal"),
error -> {System.out.println(error)},
() -> System.out.println("# emitted onComplete signal"));
}
}
위 코드에서 subscribe()의 첫 번째 인수는 onNext Signal을 전송하면 실행되는 람다 표현식입니다. 즉, Subscriber가 Publisher로부터 데이터를 전달받기 위해 사용됩니다.
두 번째 람다 표현식은 Publisher가 onError Signal을 전송하면 실행됩니다. Publisher가 데이터를 전송하는 도중에 에러가 발생할 경우 error을 Exception 형태로 전달받기 위해 사용됩니다.
세 번째 람다 표현식은 Publisher가 onComplete Signal을 전송하면 실행됩니다. Publisher의 데이터 emit이 종료되었음을 인지하고 이에 따른 후처리를 진행하는 데 사용할 수 있습니다.
위 코드의 경우 방출된 데이터가 하나도 없고, 에러가 발생하지 않기 때문에 마지막 람다 표현식만 수행되어 아래와 같이 출력됩니다.
# emitted onComplete signal
아래 코드를 실행하면 어떻게 될지 예상한 후 결과를 확인해 보세요!
public class Main {
public static void main(String[] args) {
Mono.empty()
.subscribe(none -> System.out.println("# emitted onNext signal"),
error -> {
},
() -> System.out.println("# emitted onComplete signal"));
System.out.println("========================");
Flux.just(1, 2, 3)
.subscribe(data -> System.out.println("# emitted onNext signal: " + data),
error -> {
},
() -> System.out.println("# emitted onComplete signal"));
System.out.println("========================");
Mono.error(new Exception("Exception!!"))
.subscribe(none -> System.out.println("# emitted onNext signal"),
System.out::println,
() -> System.out.println("# emitted onComplete signal"));
}
}
실행결과
[참고]
(도서) 황정식, 『 스프링으로 시작하는 리액티프 프로그래밍 』 , 비제이퍼블릭(2023년)