Spring Boot/Webflux

Cold sequence와 Hot sequence

작은별._. 2024. 10. 12. 14:31
728x90

Cold와 Hot

Cold sequence와 Hot sequence를 알기 전에 Cold와 Hot의 의미를 아래와 같이 정리하고 시작합시다.

 

Cold는 무언가를 새로 시작하고, Hot은 무언가를 새로 시작하지 않는다.

 

Sequence

Sequence란, Publisher가 emit 하는 데이터의 연속적인 흐름을 정의해 놓은 것으로, 코드로 표현하면 Operator 체인 형태로 정의됩니다.

즉, Cold를 Sequence에 적용해 보면 Sequence가 새로 시작된다 로 생각해 볼 수 있고, Hot을 Sequence에 적용해 보면 Sequence가 새로 시작되지 않는다라고 생각할 수 있겠습니다.

 

Cold sequence란?

Cold sequence는 Subscriber 가 구독할 때마다 데이터 흐름이 처음부터 다시 시작되는 Sequence입니다.

 

아래 마블 다이어그램을 보면 타임라인이 Subscriber의 개수만큼 있습니다. 즉, Subscriber의 구독 시점이 달라도, 구독을 할 때마다 Publisher가 데이터를 emit 하는 과정을 처음부터 다시 시작하는 데이터의 흐름이 Cold sequence입니다. 그리고 이러한 Publisher를 Cold Publisher라고 부릅니다.


cold sequence

 

Hot sequence란?

Hot sequence는 Cold sequence와 반대의 의미를 가집니다. Cold sequence의 경우 구독이 발생할 때마다 Sequence의 타임라인이 처음부터 새로 시작하기 때문에 Subscriber는 구독 시점과 상관없이 데이터를 처음부터 다시 전달받습니다. 하지만 Hot sequence는 구독이 발생한 시점 이전에 Publisher로부터 emit 된 데이터는 전달받지 못하고, 구독이 발생한 시점 후부터 emit 한 데이터만 받을 수 있습니다.


 

Hot sequence

 


Cold sequence 예제

@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {

        Flux<Integer> coldFlux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5})
                .map(x -> x * 2);
        coldFlux.subscribe(number -> log.info("# Subscriber 1: {}", number));
        Thread.sleep(1000L);
        coldFlux.subscribe(number -> log.info("# Subscriber 2: {}", number));
    }
}

 

실행 결과

Subscriber 2가 늦게 구독을 시작했지만 Publisher의 데이터를 처음부터 받고 있음을 확인할 수 있습니다.


 

Hot sequence 예제

@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {

        Flux<Integer> hotFlux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5})
                .delayElements(Duration.ofSeconds(1))
                .map(x -> x * 2)
                .share();
                
        hotFlux.subscribe(number -> log.info("# Subscriber 1: {}", number));
        Thread.sleep(2500); // 3번째 데이터부터 받도록 시간 설정
        hotFlux.subscribe(number -> log.info("# Subscriber 2: {}", number));


        Thread.sleep(5500);
    }
}

 

  • delayElements() Operator를 통해 각 데이터의 emit을 1초씩 지연시키도록 하였습니다.
  • 위 코드에서 share() Operator는 Cold sequence를 Hot sequence로 동작하게 해주는 Operator입니다. 즉, share() Operator는 원본 Flux(fromArray()에서 처음으로 리턴된 Flux)를 공유해서 Subscriber가 사용하도록 해 주는 Operator 입니다.

실행 결과

아래 실행결과를 통해 Subscriber 2가 구독을 시작한 시점부터 Publisher로부터 emit 된 데이터를 받고 있음을 확인할 수 있습니다.


 

Http 요청/응답에서의 Cold/Hot sequence

아래 코드는 Http 요청과 응답에서 Cold와 Hot sequence 동작을 구현한 코드입니다. 세계 시각을 알려주는 API를 사용하여 Seoul의 현재 시각을 받아오도록 하였습니다.


import com.jayway.jsonpath.JsonPath;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

import java.net.URI;

@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();
                
                
	Mono<String> mono = getWorldTime(worldTimeUri); // Cold sequence
       // Mono<String> mono = getWorldTime(worldTimeUri).cache(); // cache() Operator -> Hot sequence
        mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
        Thread.sleep(2000);
        mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));

        Thread.sleep(2000);
    }

    private static Mono<String> getWorldTime(URI worldTimeUri){
        return WebClient.create()
                .get()
                .uri(worldTimeUri)
                .retrieve()
                .bodyToMono(String.class)
                .map(resp->
                    JsonPath.parse(resp)
                            .read("$.datetime")
                );
    }
}

 

 

위 코드에서 주석 처리된 부분이 Hot sequence를 구현한 코드입니다. cache() Operator 또한 Cold sequence를 Hot sequence로 동작하게 해 줍니다. 이 Operator는 emit 된 데이터를 캐시한 뒤, 구독이 발생할 때마다 캐시된 데이터를 전달하기 때문에 Subscriber는 동일한 데이터를 전달받게 됩니다.

 public final Mono<T> cache() {
        return onAssembly(new MonoCacheTime(this));
 }

 

실행 결과

위 코드를 실행해 보면, Cold sequence의 경우 2초 정도 차이나는 다른 시각이 출력되지만, Hot sequence는 동일한 시각이 출력됩니다.


cold sequence


Hot sequence

 

참고로, cache() Operator는 REST API에서 요청을 위해 인증 토큰이 필요한 경우 유용하게 사용할 수 있습니다. 인증 토큰을 한 번 받고 난 뒤에 해당 토큰이 만료될 때까지는 해당 토큰을 사용해서 인증이 필요한 API 요청에 사용할 수 있습니다. 이를 위해 인증 토큰을 API 요청마다 생성하여 전달받는 것이 아니라 cache() Operator를 사용해서 캐시된 인증 토큰을 사용하여 효율적인 동작 과정을 구성할 수 있을 것입니다.


[참고]

(도서) 황정식, 스프링으로 시작하는 리액티프 프로그래밍 , 비제이퍼블릭(2023년)

728x90
반응형