본문 바로가기

Spring Boot/Webflux

Reactor란?

728x90

Reactor란?

Reactor는 RxJava와 함께 Reactive Stream(리액티브 스트림) 사양을 구현한 Reactive 라이브러리입니다. 

공식문서: https://projectreactor.io/

 

 

Reactive Stream 관련 내용은 아래 블로그에서도 설명이 잘되어 있어서 참고하시면 되겠습니다.

 

https://velog.io/@korea3611/%EB%A6%AC%EC%95%A1%ED%8B%B0%EB%B8%8C-%EC%8A%A4%ED%8A%B8%EB%A6%BC%EC%A6%88Reactive-Streams%EB%9E%80

 

리액티브 스트림즈(Reactive Streams)란?

이 글은 인프런의 Kevin의 알기 쉬운 RXJava 1부를 참고합니다.데이터를 생성해서 내보는 쪽 : 생산자생산자가 데이터를 내보는 것을 통지데이터를 소비하는 것을 소비자리액티브 프로그래밍 라이

velog.io

 

https://sabarada.tistory.com/98

 

[Java] Reactive Stream 이란?

reactive stream이란 non-blocking(넌블럭킹) backPressure(역압)을 이용하여 비동기 서비스를 할 때 기본이 되는 스펙입니다. java의 RxJava, Spring5 Webflux의 Core에 있는 ProjectReactor 프로젝트 모두 해당 스펙을 따

sabarada.tistory.com

 


Reactor는 아래의 특징을 가집니다.

  • Reactor는 Spring Framework 5부터 리액티브 프로그래밍을 위해 지원되고 있고, 최소 Java8 이상에서 동작합니다. 특히, Java8부터 지원되는 함수형 프로그래밍 API를 통해서 Reactor의 Publisher와 Subscriber 간의 상호작용이 이루어집니다.
  • Reactor는 JVM 위에서 실행되는 Non-Blocking 애플리케이션을 제작하기 위해 필요한 핵심 기술입니다. 
  • Reactor는 MonoFlux라는 두 가지 비동기 Sequence API를 제공합니다. 
    • Mono: Reactor에서 지원하는 Publisher 타입으로, 방출하는 데이터가 0개이거나 1개인 경우에 특화된 Publisher
    • Flux: Reactor에서 지원하는 Publisher 타입으로, 방출하는 데이터가 N개인 경우에 특화된 Publisher
  • Reactor는 Publisher로부터 전달받은 데이터를 처리하는 데 있어서 과부하가 걸리지 않도록 제어하는 Backpressure를 지원합니다.   

 


Reactor의 핵심은 아래로 점철됩니다.

  • 1단계: 데이터를 생성해서 제공
  • 2단계: 데이터를 가공
  • 3단계: 전달받은 데이터를 처리

 

아래 예시 코드를 통해 확인해 보겠습니다.

 

Reactor를 사용하기 위해서는 Spring boot의 Webflux 관련 라이브러리를 추가해야 합니다. 아래 코드는 maven 기준입니다.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

 

import reactor.core.publisher.Flux; 

public class Main {
    public static void main(String[] args) {
        Flux<String> sequence = Flux.just("Hello", "Reactor"); // 1단계 (데이터 방출)
        sequence.map(String::toLowerCase) // 2단계 (데이터 가공)
                .subscribe(System.out::println); // 3단계 (가공된 데이터 처리)
    }
}

 

 

위 코드를 실행하면 아래와 같이 실행됩니다.

 

이 3가지 단계는 데이터를 가공 처리하는 단계가 얼마나 복잡해지느냐 등의 추가 작업에 상관없이 수행되는 필수 단계라는 것을 기억합시다!

 


참고 (subscribe() 응용)

subscribe 함수에 커서를 올리면 아래와 다양한 시그니처가 있음을 볼 수 있습니다.

 

아래는 subscribe()의 여러 구현체 중에, 3번째 메서드를 사용한 코드입니다.

public class Main {
    public static void main(String[] args) {
        Mono.empty()
                .subscribe(none-> System.out.println("# emitted onNext signal"), 
                        error -> {System.out.println(error)},
                        () -> System.out.println("# emitted onComplete signal"));

    }
}

 

위 코드에서 subscribe()의 첫 번째 인수는 onNext Signal을 전송하면 실행되는 람다 표현식입니다. 즉, Subscriber가 Publisher로부터 데이터를 전달받기 위해 사용됩니다.

두 번째 람다 표현식은 Publisher가 onError Signal을 전송하면 실행됩니다. Publisher가 데이터를 전송하는 도중에 에러가 발생할 경우 error을 Exception 형태로 전달받기 위해 사용됩니다.

세 번째 람다 표현식은 Publisher가 onComplete Signal을 전송하면 실행됩니다. Publisher의 데이터 emit이 종료되었음을 인지하고 이에 따른 후처리를 진행하는 데 사용할 수 있습니다.

 

위 코드의 경우 방출된 데이터가 하나도 없고, 에러가 발생하지 않기 때문에 마지막 람다 표현식만 수행되어 아래와 같이 출력됩니다.

# emitted onComplete signal

 


아래 코드를 실행하면 어떻게 될지 예상한 후 결과를 확인해 보세요!

public class Main {
    public static void main(String[] args) {
        Mono.empty()
                .subscribe(none -> System.out.println("# emitted onNext signal"),
                        error -> {
                        },
                        () -> System.out.println("# emitted onComplete signal"));

        System.out.println("========================");

        Flux.just(1, 2, 3)
                .subscribe(data -> System.out.println("# emitted onNext signal: " + data),
                        error -> {
                        },
                        () -> System.out.println("# emitted onComplete signal"));

        System.out.println("========================");

        Mono.error(new Exception("Exception!!"))
                .subscribe(none -> System.out.println("# emitted onNext signal"),
                        System.out::println,
                        () -> System.out.println("# emitted onComplete signal"));


    }
}

 

실행결과


[참고]

(도서) 황정식, 스프링으로 시작하는 리액티프 프로그래밍 , 비제이퍼블릭(2023년)

728x90
반응형

'Spring Boot > Webflux' 카테고리의 다른 글

Context  (1) 2024.10.15
Scheduler  (1) 2024.10.13
Cold sequence와 Hot sequence  (0) 2024.10.12