ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring WebFlux (1)
    Backend/spring 2023. 4. 17. 01:45

    최근에 사내에서 Spring WebFlux 관련 교육이 있었습니다. 이번 글과 다음 글을 통해 학습한 내용을 정리하도록 하겠습니다.

    목차는 아래 내용과 같고 글이 길어지기 때문에 이번 글에서는 2번 Reactive Streams 개요 목차 까지만 진행하고 다음 글에서 나머지 부분 모두 정리하도록 하겠습니다.

     

    목차

    1. Reactive Programming
    2. Reactive Streams 개요
      • Reactive Streams 개념
      • Reactive Streams 소개
    3. Project Reactor
      • Project Reactor 소개
      • Flux와 Mono
      • Flux, Mono API 소개
    4. WebFlux와 R2DBC, WebClient
      • Spring WebFlux
      • WebFlux App 구현 - Annotation방식, Fuctional Endpoint 방식
      • R2DBC
      • WebClient

     

    1. Reactive Programming (반응형 프로그래밍)

    • Reactive Programming(반응형 프로그래밍)은 데이터의 흐름과 변화의 전파에 중점을 둔 Declarative(선언적) programming 패러다임 입니다.
    • 작성한 코드의 순서대로 진행 되는 기존의 Imperative(명령형) programming과는 다르게 데이터의 흐름을 먼저 정의하면 데이터의 변화 혹은 작업의 종료에 따라 반응하여 진행되는 프로그래밍 입니다.
    // Imperative Programming
    let array = [1, 2, 3, 4, 5, 6]
    let eventNumbers = [];
    
    for (let i=0; i<6; i++) {
        if (array[i] % 2 == 0) {
            eventNumbers.append(array[i]);
        }
    }
    
    // Declarative Programming
    let eventNumbers2 = array.filter(x -> x % 2 == 0);
    • 선언적 프로그래밍
      • 무엇을 해야 할지 따로 약속(표현)을 만들어 기술하게 하고, 언제 어떻게 동작하는지는 내부에 처리하는 방식의 프로그래밍 기법
      • 일반적으로 선언적 프로그래밍이 명령형 프로그래밍에 비해서 가독성, 재사용성, 독립성, 유지보수성면에서 훨씬 좋다 → Framework를 쓰는것과 jQuery를 쓰는 것을 비교해서 생각해보자.

     

     

    명령형 프로그래밍 선언적 프로그래밍
    어떻게 하는지 알고리즘을 묘사 무엇을 해야 하는지 묘사
    시간 순서대로 작성 언제 어떻게 실행되는지는 내부에서 결정된다.
    가장 일반적인 스크립트 코드 CSS, HTML, SQL, JSON, Template
    컴퓨터가 이해하기 좋은 코드 기획문서에 가까운 코드

     

     

     

    • 변경사항의 전파(Pull → Push)
      • 예전에 DOM 조작을 통해 화면을 구현하는 것이 중심이었으니 Render를 해야 하는 시점에 필요한 데이터를 모두 불러와서(Pull) 화면을 출력하는 관점에서 UI 개발을 하였습니다.
      • 하지만 새로운 반응형 프로그래밍에서는 미리 선언이 되어 있는 구조에서 값이 변화할 때마다 템플릿으로 데이터를 전달(Push)하는 관점으로 설계가 되도록 변화되었습니다.
    즉, 웹 프로그래밍은 데이터를 가져와서 화면을 만드는 방향에서 **무엇을 할지 선언을 하고 변경된 데이터를 감지하고 전파하는 방향**으로 변해왔습니다.

     

     

    Stream 이란

    • 변경사항의 전파와 데이터 흐름을 Event라고 생각할 수 있는데 이러한 Event를 좀 더 제네럴 하게 만든것이 Stream 이다. 즉, 반응형 프로그래밍은 스트림을 선언적으로 작성하는 프로그래밍이라고 생각할 수 있습니다.
    • 스트림을 여러 개 연결하여 아래 사진 처럼 각 노드를 하나의 스트림으로 정의하고 선언적으로 개발 할 수 있습니다.

     

    결국 반응형 프로그래밍이란, 데이터의 흐름과 변경사항의 전파에 중점을 둔 선언적 프로그래밍이며 → 이는 즉, 모든 것을 스트림으로 간주하고 선언적으로 개발하는 것을 의미합니다.

     

    아래 글을 참고 했는데 Reactive Programming에 대해 자세히 설명이 되어 있어서 읽어보시면 좋을 것 같습니다.

     

     

    프로그래밍 패러다임과 반응형 프로그래밍 그리고 Rx | 요즘IT

    설계에 관한 이야기를 먼저 쓰려고 했는데 먼저 설계의 원칙이라고 할 수 있는 프로그래밍 패러다임에 대한 설명이 선행이 되어야 할 것 같아, 현재 제가 쓰고 있는 개발 패러다임인 ’반응형

    yozm.wishket.com

     


     

     

    2. Reactive Streams

    Reactive Streams 개념

    • Reactive Streams 공식 페이지에는 아래와 같이 설명이 되어있습니다. 즉, 논블로킹(non-blocking) 백 프레셔(back pressure)를 이용한 비동기 데이터 처리의 표준입니다.

    Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.

     

     

    스트리밍 처리, 비동기 방식, 백프레셔 각각의 단어가 의미하는 바를 조금 더 자세히 알아보겠습니다.

     

     

    [ 스트리밍 처리 ]

     

     

    • 왼쪽의 전통적인 데이터 처리 방식은 데이터 처리를 작업을 모든 데이터가 애플리케이션 메모리에 저장한 후에 처리를 한다는 점입니다. 즉, 전달된 데이터는 물론 저장소에서 조회한 데이터까지 모두 메모리에 적재되어야만 응답 메시지를 만들 수 있다는 것입니다.
    • 하지만 스트림 처리 방식을 적용한다면 크기가 작은 시스템 메모리로도 많은 양의 데이터를 처리할 수 있습니다. 입력 데이터에 대한 파이프 라인을 만들어 데이터가 들어오는 대로 물 흐르듯이 구독(subscribe)하고, 처리한 뒤, 발행(publish)까지 한 번에 연결하여 처리할 수 있습니다.

     

     

    [ 비동기 방식 ]

     

     

     

    •  Synchronous
      • 클라이언트가 서버에 요청을 보내면 응답을 받기 전까지 블로킹됩니다. 블로킹 된다는 것은 현재 스레드가 다른 일을 하지 못하고 기다린다는 것을 의미합니다. 따라서 두 개의 요청을 A와 B 서버로 보내면 A의 응답이 긑나고 나서야 B로 요청을 보낼 수 있습니다.

     

    • Asynchronous
      • 현재 스레드가 블로킹 되지 않기 때문에 다른 일을 계속할 수 있습니다. A에게 요청을 보낸 뒤 다른 일을 처리 할 수도 있고, 혹은 B에게 또 다른 요청을 보낼 수도 있습니다.
      • 두 개의 요청을 동시에 보내기 때문에 더 빠른 응답 속도를 낼 수 있습니다.
      • 적은 리소스 사용 - 현재 스레드가 블로킹 되지 않고 다른 업무를 처리 할 수 있어서 더 적은 수의 스레드로 더 많은 요청을 처리 할 수 있습니다.

     

    • Blocking / Non-Blocking : 제어권의 이동, 작업 완료의 통지
      • Blocking은 호출된 함수가 자신의 할 일을 모두 마칠 때까지 제어권을 계속 가지면서 호출한 함수에게 바로 돌려주지 않는다 → 즉, 호출한 함수는 제어권이 없어서 다른 작업을 수행 할 수 없습니다.
      • Non-Blocking은 호출된 함수가 자신이 할 일을 마치지 않았더라도 바로 제어권을 건네주어(return) 호출 한 함수가 다른 일을 진행 할 수 있도록 해준다 → 즉, 호출한 함수는 결과를 기다리지 않고, 제어권이 있어서 다른 작업을 수행 할 수 있습니다.

     

     

     

    • Synchronous / Asynchronous : 작업 순서의 보장 여부
      • 동기식 모델은 모든 작업들이 일련의 순서를 따르며 그 순서에 맞게 동작합니다 → 즉, 작업의 순서가 보장됩니다. a, b, c 순서대로 작업이 시작 되었다면 a, b, c 순서로 작업이 끝나야 합니다.
      • 비동기식 모델은 작업의 순서가 보장되지 않습니다. 즉, 비동기 방식은 요청 → 결과가 동시에 이루어지지 않는다는 것을 의미합니다.

     

     

     

    [ 백 프레셔 ]

    • 백 프레셔를 알아보기 전에 push 방식과 pull 방식에 대해 먼저 알아보겠습니다.
    • push 방식
      • 옵저버 패턴에서는 발행자(publisher)가 구독자(subscriber)에게 밀어 넣는 방식으로 데이터가 전달됩니다. 발행자는 구독자의 상태를 고려하지 않고 데이터를 전달하는 데에 만 충실합니다.
      • 만약 구독자의 처리량 보다 발행자가 더 많은 데이터를 보낸다면 큐(Queue)를 이용해서 대기 중인 이벤트를 저장해야 합니다.

     

    • 개선된 push 방식
      • 서버가 가용 할 수 있는 메모리는 한정되어 있습니다. 아무리 버퍼를 사용한다고 하더라도 버퍼는 순식간에 소모되고 말 것입니다. 결국 기존 push 이벤트와 동일한 문제가 발생하게 됩니다.

     

     

     

    • pull 방식
      • 발행자가 데이터를 전달 할 때 구독자가 필요한 만큼만 전달하면 해결할 수 있다는 것이 Back Pressure의 기본 원리 입니다.
      • pull 방식에선 구독자가 10개를 처리할 수 있다면 발행자에게 10개만 요청합니다. 발행자는 요청 받은 만큼만 전달하고 구독자는 더이상 out of memory 에러를 걱정하지 않아도 됩니다.

     

    • Back Pressure
      • pull 방식에선 이렇게 전달되는 모든 데이터의 크기를 구독자가 정합니다.
      • 이런 Dynamic Pool 방식의 데이터 요청을 통해서 구독자가 수용 할 수 있는 만큼만 데이터를 요청하는 방식이 Back Pressure 입니다.

     

     

    [ 표준 ]

    • Reactive Streams는 2013년에 Netflix와 Pivotal, Lightbend의 엔지니어들이 처음 개발하기 시작했습니다. 모두 스트림 API가 꼭 필요한 회였는데, 스트림은 서로 유기적으로 엮여서 흘러야 의미가 있었습니다.
    • 데이터가 지속적으로 흐르기 위해서는 서로 다른 회사가 공통의 스펙을 설정하고 구현해야 하기 때문에 표준화가 필요했고 Reqctive Streams라는 표준화된 API가 개발되었습니다.

     

     

    • Reactive Stream에는 다양한 구현체가 존재합니다. 각각의 구현체는 서로 특성이 조금씩 다르기 때문에 상황에 따라, 필요에 맞게 골라서 사용할 수 있습니다.
      • 순수하게 스트림 연산 처리만 필요하다면 : RxJava, Reactor Core, Akka Streams …
      • 저장소의 데이터를 Reactive Streams로 조회하고 싶다면 : Reactive Mongo, Slick …
      • 웹 프로그래밍과 연결된 Reactive Streams가 필요하다면 : Spring WebFlux, Play Framework, Armeria, Vert.x …

     

    Reactive Streams API 소개

    • Reactive Streams는 4개의 인터페이스 API로 구성되어 있습니다.
    • java.util.concurrent.Flow 에서 확인할 수 있습니다.

     

    public interface Publisher<T> {
       public void subscribe(Subscriber<? super T> s);
    }
    
    public interface Subscriber<T> {
       public void onSubscribe(Subscription s);
       public void onNext(T t);
       public void onError(Throwable t);
       public void onComplete();
    }
    
    public interface Subscription {
       public void request(long n);
       public void cancel();
    }
    
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

     

    • Publisher에는 Subscriber의 구독을 받기 위한 subscribe API 하나만 있습니다.
    • Subscriber에는 받은 데이터를 처리하기 위한 onNext, 에러를 처리하는 onError, 작업 완료 시 사용하는 onComplete, 그리고 매개 변수로 Subscription을 받는 onSubscribe API 가 있습니다.
    • Subscription은 n개의 데이터를 요청하기 위한 request와 구독을 취소하기 위한 cancel API가 있습니다.
    • Processor는 Publichser와 Subscriber 기능이 모두 있습니다.

     

     

    [ Reactive Streams 에서 위 API를 사용하는 흐름 ]

     

     

     

    1. Subscriber가 subscribe 함수를 통해 Publisher에게 구독을 요청합니다.
    2. Publisher는 onSubscribe 함수를 통해 Subscriber에게 subscription을 전달합니다.
    3. 이제 Subscription은 Subscriber와 Publisher 간 통신 매개체가 됩니다. Subscriber는 Publisher에게 직접 데이터를 요청하지 않고 Subscription의 request 함수를 통해 Publisher에게 전달합니다.
    4. Publisher는 Subscription을 통해 Subscriber의 onNext에 데이터를 전달하고, 작업이 완료되면 onComplete, 에러가 발생하면 onError 시그널을 전달합니다.
    5. Subscriber와 Publisher, Subscription이 서로 유기적으로 연결되어 통신을 주고받으면서 subscribe부터 onComplete까지 연결되고, 이를 통해 백 프레셔가 완성됩니다.

     

     

    • 아래는 예제 코드입니다.
    public class ReactiveStreamTest {
    
        public static class PublisherImpl implements Publisher<Integer> {
            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
                LinkedList<Integer> queue = new LinkedList<>();
                IntStream.range(0, 10).forEach(queue::add);
    
                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        System.out.println("request: " + n);
    
                        for (int i = 0; i < n; i++) {
                            if (queue.isEmpty()) {
                                subscriber.onComplete();
                                return;
                            }
                            subscriber.onNext(queue.poll());
                        }
                    }
    
                    @Override
                    public void cancel() {
                        System.out.println("publish cancel");
                    }
                });
            }
        }
    
        public static class SubscriberImpl implements Subscriber<Integer> {
    
            private Subscription subscription;
            private long requestSize = 2;
            private List<Integer> buffer = new ArrayList<>();
    
            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(requestSize);
            }
    
            @Override
            public void onNext(Integer integer) {
                System.out.println("    onNext - " + integer);
                buffer.add(integer);
                if (buffer.size() == requestSize) {
                    buffer.clear();
                    subscription.request(requestSize);
                }
            }
    
            @Override
            public void onError(Throwable throwable) {
                System.out.println("error:" + throwable.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println("subscribe complete");
            }
        }
    
        public static void main(String[] args) {
            PublisherImpl publisher = new PublisherImpl();
            publisher.subscribe(new SubscriberImpl());
        }
    }
    
    // 결과
    request: 2
        onNext - 0
        onNext - 1
    request: 2
        onNext - 2
        onNext - 3
    request: 2
        onNext - 4
        onNext - 5
    request: 2
        onNext - 6
        onNext - 7
    request: 2
        onNext - 8
        onNext - 9
    request: 2
    subscribe complete

     

     

    Armeria로 Reactive Streams와 놀자! - 1

    Reactive Streams란? LINE+에서 오픈소스 Armeria와 Central Dogma를 개발하고 있는 엄익훈입니다. 저는 Reactive Streams의 개념과, Reactive Streams를 오픈 소스 비동기 HTTP/2, RPC,...

    engineering.linecorp.com

     

     

    Reactive Streams 이해하고 구현해보기

    Reactive Streams 란? reactive-streams.org 에서는 다음과 같이 정의하고 있다. Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. 대충 해석하면 "논블로킹(Non-blo

    hyunsoori.tistory.com

     

     

    지금까지가 Reactive Programming 의 개념과 Reactive Streams의 개요에 관한 소개였습니다.

     

    다음 글에서 나머지 부분인 Project Reactor와 WebFlux 관련 내용 이어가도록 하겠습니다.

    'Backend > spring' 카테고리의 다른 글

    Spring Boot 자동 구성  (0) 2023.07.06
    Spring WebFlux (2)  (0) 2023.04.17

    댓글

Designed by Tistory.