본문 바로가기

Spring Boot/Webflux

Context

728x90

Context란?

일반적으로 문맥이라고도 알려져 있는 Context는 어떤 상황에서 그 상황을 처리하기 위해 필요한 정보를 의미합니다.

Reactor에서의 Context

Operator 체인에 전파되는 키와 값 형태의 저장소입니다. 

ThreadLocal에서 스레드마다 생성 저장소가 생성되듯이, Context는 구독이 발생할 때마다 생성됩니다. 

즉, Context는 Subscriber 와 매핑되어, 해당 Subscribe(구독)과 연결된 하나의 Context가 생깁니다.

 

아래 예시 코드로 Reactor의 Context를 자세히 알아보겠습니다.

예시


public class Main {
    public static void main(String[] args) throws InterruptedException {

        Mono 
            .deferContextual( // defereContextual: Context에 저장된 데이터와 원본 데이터 소스의 처리를 지연시키는 역할
            ctx ->
         Mono
             .just("Hello" + " " + ctx.get("firstName")))// Context 에서 데이터 읽기
             .doOnNext(data -> log.info("# just doOnNext: {}", data))
             .subscribeOn(Schedulers.boundedElastic())
             .publishOn(Schedulers.parallel())
             .transformDeferredContextual( // Operator 중간에서 Context 데이터 처리
                     (mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
            )
             .contextWrite(context -> context.put("lastName", "Jobs")) // Context 에서 데이터 쓰기
             .contextWrite(context -> context.put("firstName", "Steve"))
             .subscribe(data -> log.info("# subscribe: {}",  data));
             
         Thread.sleep(400L);
       }
 }

 

위 코드에서 가장 두드러지는 Operator는 크게 3가지입니다.

  • defereContextual() : 원본 데이터 소스 레벨에서 Context의 데이터를 읽을 수 있습니다.
  • transformDeferredContextual() : Operator 체인의 중간에서 데이터를 읽을 수 있습니다.
  • contextWrite() : Context에 쓰기 작업을 할 수 있습니다. 

Context에서 데이터를 읽는 방식은 크게 두 가지라고 할 수 있습니다.  deferContextual() 메서드처럼 원본 데이터 소스 레벨에서 읽는 방식이고, transformDeferredContextual() 메소드처럼 Operator 체인의 중간에서 읽는 방식입니다.

 

그리고, deferContextual()의 파라미터로 넘어온 ctx 파라미터는 ContextView 타입의 객체입니다. 즉, Context의 데이터를 쓸 때는 Context를 사용하지만, Context에 저장된 데이터를 읽을 때는 ContextView를 사용합니다!

    public static <T> Mono<T> deferContextual(Function<ContextView, ? extends Mono<? extends T>> contextualMonoFactory) {
        return onAssembly(new MonoDeferContextual(contextualMonoFactory));
    }

 

Context에 데이터를 쓸 때는 Context 타입을 사용 
Context에 저장된 데이터를 읽을 때는 ContextView 타입을 사용

 

 

위 코드 실행결과는 아래와 같습니다.


 

위 코드에서 subscribeOn()과 publishOn()을 사용해 데이터를 emit 하는 스레드와 데이터를 처리하는 스레드를 분리하였습니다. 그래서 context에서 데이터를 읽어 오는 작업을 각각 다른 스레드에서 수행하고 있습니다. 즉, Reactor에서는 Operator 체인상의 서로 다른 스레드들이 Context의 저장된 데이터에 접근할 수 있습니다!


 

Context의 특징

Context는 위에서 설명했듯이 구독이 발생할 때마다 하나의 Context 가 해당 구독에 연결됩니다. 예시를 통해 확인해 보겠습니다.


@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {

        Mono<String> mono = Mono.deferContextual(
                        ctx ->
                                Mono.just("Data: " + ctx.get("data"))
                )
                .publishOn(Schedulers.parallel());
        mono.contextWrite(ctx -> ctx.put("data", "Number1"))
                .subscribe(data -> System.out.println("# subscribe: " + data));

        mono.contextWrite(ctx -> ctx.put("data", "Number2"))
                .subscribe(data -> System.out.println("# subscribe: " + data));

        Thread.sleep(400L);

    }
}

 

위 코드를 실행해 보면 각 구독마다 다른 데이터를 출력하는 것을 확인할 수 있습니다. 


여기에 더해, Context는 Operator 체인의 아래에서 위로 전파됩니다. 따라서, 동일한 키에 대한 값을 중복해서 저장하면 Operator 체인상에서 가장 위쪽에 위치한 contextWrite()가 저장한 값으로 Context에 저장됩니다.


@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException{

        String key1 = "Product";
        String key2 = "Name";

        Mono.deferContextual(ctx ->
                        Mono.just(ctx.get(key1)))
                .publishOn(Schedulers.parallel())
                .contextWrite(context -> context.put(key2, "Steve"))
                .transformDeferredContextual(
                 (mono, ctx) -> mono.map(data -> data + " " + ctx.getOrDefault(key2, "DefaultName"))
                )
                .contextWrite(context -> context.put(key1, "Book"))
                .subscribe(data -> log.info("# subscribe: {}", data));
                
        Thread.sleep(400L);
	}
 }

 

 

위 코드에서 key2에 'Steve'라는 값을 Operator 체인 중간에 쓰고 있지만, 실제 실행 결과는 아래와 같이 "DefaultName"이 출력됩니다. 이는 Operator 체인상 아래에서 위로 전파되는 Context의 특징 때문이라고 할 수 있습니다. (key2를 getOrDefault로 읽어올 때, 해당 key 값으로 저장되어 있는 value가 없기 때문에 Default 값으로 지정한 'DefaultName'을 출력하게 되는 것입니다.)


 

 

위와 같은 특징 때문에 일반적으로 모든 Operator에서 Context에 저장된 데이터를 읽을 수 있도록 contextWrite()을 Operator 체인의 맨 마지막에 둡니다!


마지막 특징으로는 Inner Sequence 내부에서는 외부 Context에 저장된 데이터에 접근할 수 있지만, Inner Sequence 외부에서는 Inner Sequence 내부의 Context에 저장된 데이터에 접근할 수 없습니다. 

Inner Sequence라는 것은 특정한 시퀀스 혹은 스트림 내에 포함된 다른 시퀀스를 가리킵니다.
예를 들어, 아래 코드와 같이 flatMap 연산자를 사용할 때, 원래의 스트림의 각 요소에 대해 새로운 스트림을 생성하게 되는데,  이 새로운 스트림이 inner sequence가 됩니다.
Flux<Integer> outerSequence = Flux.just(1, 2, 3);
Flux<Integer> innerSequence = outerSequence.flatMap(i -> Flux.range(i, 2));
// outerSequence: 1, 2, 3
// innerSequence: 1, 2, 2, 3, 3, 4 (각 숫자에 대해 두 개의 숫자 범위를 생성 - (1,2), (2,3), (3,4))
innerSequence.subscribe(System.out::println);

// 위 코드는 아래 코드와 동일한 코드입니다.
Flux.just(1,2,3)
	.flatMap(i->Flux.range(i,2))
	.subscribe(System.out::println);

 

 

관련 코드를 확인해 보겠습니다.


@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {

        String key1 = "Product";
        String key2 = "Name";

        Mono.just("Steve")
                .flatMap( // Inner Sequence
                        name -> Mono.deferContextual(ctx ->
                                        Mono.just(ctx.get(key1) + ", " + name))
                                .transformDeferredContextual(
                                        (mono, ctx) -> mono.map(data -> data + ", " + ctx.getOrDefault(key2, "DefaultName"))
                                )
                                .contextWrite(ctx -> ctx.put(key2, "Tomas"))
                )
                .publishOn(Schedulers.parallel())
                .contextWrite(context -> context.put(key1, "Book")) 
                .subscribe(data -> log.info("# subscribe: {}", data));
                
        Thread.sleep(400L);
	}
}

 

위 코드에서 마지막 contextWrite()에서 key1에 "Book"이라는 값을 저장했습니다. 이는 Inner Sequence 외부에 저장된 값으로 위에서 설명하였듯이 Inner Sequence에서 접근할 수 있는 값입니다. 따라서, Inner Sequence 내부에서 key1의 값을 ctx.get(key1)으로 꺼내올 수 있었습니다.

그리고, key2는 Inner Sequence 안에서 Context에 써주고 있기 때문에 당연히 Inner Sequence 에서 해당 값을 꺼내올 수 있고, 따라서 현재 코드에서 ctx.getOrDefault(key2, "DefaultName")으로 잘 꺼내오고 있습니다.

 

실행 결과는 아래처럼 Outer Sequence에서 저장한 값인 Book 도 잘 꺼내오고 있음을 확인할 수 있습니다.



 

마지막은 실제 Context가 어떻게 활용될 수 있는지 보여주는 코드입니다. Context에 인증 토큰을 담아서 서버에 전송한 후, 해당 정보를 Context를 활용해 접근하고 있습니다.


public class Main {
    public static void main(String[] args) {

        Mono<String> mono1 = postBook(Mono.just(
                new Book("ISBN-1111", "MyBook", "Tom")
        ))
                .contextWrite(Context.of("authToken", "abcdefg"));

        Mono<String> mono2 = postBook(Mono.just(
                new Book("ISBN-2222", "MyBook2", "Kevin")
        ))
                .contextWrite(Context.of("authToken", "lalalala"));

        mono1.subscribe(data -> System.out.println("# subscribe: " + data));
        mono2.subscribe(data -> System.out.println("# subscribe: " + data));

    }

    private static Mono<String> postBook(Mono<Book> book) {
        return Mono
                .zip(book, Mono
                        .deferContextual(ctx ->
                                Mono.just(ctx.get("authToken"))))
                .flatMap(tuple -> {
                    String response = "POST the book(" + tuple.getT1() + "," + tuple.getT1().getBookName()
                            + "," + tuple.getT1().getAuthor() + ") with token: " +
                            tuple.getT2();
                    return Mono.just(response);
                });
    }
}

 

실행 결과는 아래와 같습니다.



 

번외

이 외에도 어떤 상황에서 활용할 수 있는지 chatGpt 한테 물어봤습니다. ㅎㅎ 답변이 꽤나 자세하네요.

더보기

1. 상태 관리

Context를 사용하여 흐름 내에서 상태를 관리할 수 있습니다. 예를 들어, 사용자 세션 정보나 인증 토큰 등을 저장할 수 있습니다.

Mono.just("Hello")
    .contextWrite(Context.of("userId", 123))
    .flatMap(value -> {
        Integer userId = value.get("userId");
        // ...
    });​

 

2. 전역 설정

애플리케이션의 설정이나 구성 정보를 Context에 저장하여, 모든 리액티브 연산에서 접근할 수 있도록 할 수 있습니다.

 

3. 테스트

단위 테스트에서 Context를 사용하여 테스트 중에 필요한 데이터를 주입할 수 있습니다. 이를 통해 테스트의 독립성을 유지할 수 있습니다.

Flux<String> flux = Flux.just("A", "B", "C")
    .contextWrite(Context.of("testKey", "testValue"));
// 테스트에서 Context의 값을 검증

 

4. 데이터 전파

Context는 연산자 간에 데이터를 전파하는 데 사용될 수 있습니다. 예를 들어, API 호출 시 필요한 메타데이터를 전달할 수 있습니다.

 

5. 종속성 주입

서비스나 리포지토리와 같은 종속성을 Context에 저장하여, 필요한 곳에서 쉽게 사용할 수 있습니다.

Flux<String> flux = Flux.deferContextual(context -> {
    MyService myService = context.get(MyService.class);
    return myService.getData();
});

6. 데이터 전처리

데이터의 전처리 과정에서 필요한 정보나 설정을 Context에 저장하고, 이후의 처리 단계에서 활용할 수 있습니다.

 

그리고 요즘 회사에서 로깅 관련해서 코드를 작성하고 있는데, 로깅을 추적할 때도 Context를 사용할 수 있어 보여서 chatGpt 한테 물어보니 가능하다고 하네요!

더보기

네, Reactor에서 Context는 로깅을 추적할 때 매우 유용하게 사용할 수 있습니다. 다음과 같은 방식으로 활용할 수 있습니다.

1. 추적 정보 저장

각 리액티브 흐름에서 필요한 로깅 정보를 Context에 저장할 수 있습니다. 예를 들어, 요청 ID나 사용자 ID와 같은 정보를 저장하면, 이를 통해 로그를 추적할 수 있습니다.

Flux<String> flux = Flux.deferContextual(context -> {
    String requestId = context.get("requestId");
    return Flux.just("Processing request: " + requestId);
}).contextWrite(Context.of("requestId", "12345"));​

 

2. 로깅 연산자 사용

doOnNext, doOnError, doOnSubscribe, doOnComplete 등의 연산자를 사용하여 각 단계에서 Context에 저장된 정보를 로그에 기록할 수 있습니다.

 
flux
    .doOnNext(value -> {
        String requestId = value.get("requestId");
        System.out.println("Processing value: " + value + " for request: " + requestId);
    })
    .doOnError(error -> {
        String requestId = error.get("requestId");
        System.err.println("Error processing request: " + requestId + ", error: " + error);
    });

 

이처럼 Context는 다양한 상황에서 유용하게 사용되며, 리액티브 프로그래밍에서 상태를 관리하고 전파하는 강력한 도구로서 사용할 수 있어 보입니다!


[참고]

(도서) 황정식, 스프링으로 시작하는 리액티프 프로그래밍 , 비제이퍼블릭(2023년)

728x90
반응형

'Spring Boot > Webflux' 카테고리의 다른 글

Scheduler  (1) 2024.10.13
Cold sequence와 Hot sequence  (0) 2024.10.12
Reactor란?  (1) 2024.10.11