JAVA 🎻

[JAVA] Reactive Streams, Back Pressure란?

minl741 2023. 8. 5. 22:26
모던 자바 인 액션을 보고 리액티브 스트림스의 백프레셔는 무엇이고 왜 좋은지 라는 의문이 생겨 정리하였습니다. 

참고한 글
Armeria로 Reactive Streams와 놀자! - 1

[리액티브 프로그래밍] Publisher, Subscriber 그리고 Subscription - 2

관찰자(Observer) 패턴이란?

  • 리액티브 프로그래밍의 기초
  • 관찰자 패턴에는 이벤트를 발생시키는 역할(주체)와 이벤트를 수신하는 역할(관찰자)가 있다.
    • 주체(subject)는 관찰자(observer)에게 상태변경을 알려준다.

관찰자 패턴의 문제점과 결합

  • 옵저버 패턴에서는 발행자(publisher)가 구독자(subscriber)에게 밀어 넣는 방식으로 데이터가 전달
    • 발행자는 구독자의 상태를 고려하지 않고 데이터를 전달하는 데에만 충실

Push 방식

  • Observer가 이벤트를 처리하는 속도보다, Subject 가 이벤트를 발행하는 속도가 빠르다면?
    • (빠른 프로듀서와 느린 컨슈머 이슈)
    • Subject 는 이벤트를 발행하고 전달하는 데만 집중할 뿐이다.
    • Observer 는 유실없이 처리하기 위해 Queue를 따로 두거나(이래도 결국 쌓여서 터짐), 별도 조치가 필요할 것이다.
  • 발행자가 1초 동안 100개의 메시지를 보내는데 구독자는 1초에 10개밖에 처리하지 못할 경우
    • 큐(queue)를 이용해서 대기 중인 이벤트를 저장해야 함

고정 길이 버퍼 - Push 방식

  • 신규로 수신된 메시지를 거절
  • 재요청 과정에서 네트워크와 CPU 연산 비용이 추가로 발생

가변 길이 버퍼 - Push 방식

  • 이벤트를 저장할 때 'out of memory' 에러가 발생하면서 서버 크래시(crash)가 발생
    • 왜? 다량의 GC가 발생하면서 서버가 정상적으로 응답할 수 없는 상태
    • 예를 들어 SQL로 많은 양의 데이터를 질의하면 DBMS는 발행자가 되고 여러분의 서버가 구독자가 되어 List 자료 구조형에 데이터를 전부 담으려고 하다가 다량의 GC가 발생하면서 서버가 정상적으로 응답할 수 없는 상태에 이를 수 있습니다.

리액티브 스트림

리액티브 스트림은 관찰자(Observer) 패턴 등 여러 조합으로 정의되는데, 

핵심은 아래와 같다

  • Publisher = Subject = Observable = Producer
  • Subscriber = Observer = Consumer
  • Publisher는 Subscriber에게 Event를 Push한다.

Back Pressure (배압)

  • 리액티브 스트림은 비동기적으로 처리되므로 역압력 기법이 기본적으로 탑재되어 있음
    • 발행자가 구독자보다 빠른 아이템을 발행하므로 발생하는 문제 해결
    • 즉, 빠른 Publisher - 느린 Subscriber 문제를 해결하는 원리
  • Publisher의 일방적 데이터 Push 가 아니라, Subscriber가 처리할 수 있을 만큼의 데이터만 Subscriber의 요청에 의해서 전달해주는 것 
    • 중간에 Queue 같은게 필요없어짐
    • 이것을 dynamic pull 이라고 부름

구독자가 이미 8개의 일을 처리하고 있다면 추가로 2개만 더 요청하여 자신이 현재 처리 가능한 범위 내에서만 메시지를 받게 할 수 있음

  • 풀 방식에선 이렇게 전달되는 모든 데이터의 크기를 구독자가 결정하는데, 이런 다이나믹 풀 방식의 데이터 요청을 통해서 구독자가 수용할 수 있는 만큼만 데이터를 요청하는 방식이 백 프레셔라고 함
    • request 메소드를 통해 요청량을 조절

request 를 LONG.MAX 개씩 요청하면 순수 push 모델이 되는 것이고, request 를 onNext 당 1개씩 요청하면 pull 모델이 된다

👀 코드로 적용하기

public interface Publisher<T> {
   public void subscribe(Subscriber<? super T> s);
}
 
public interface Subscriber<T> {
   public void onSubscribe(Subscription s);
   public void onNext(T t);
   public void onError(Throwable t);
   public void onComplete();
}
 
public interface Subscription {
   public void request(long n);
   public void cancel();
}
  1. Publisher(생산자)가 Subscriber(소비자)를 subscribe(등록)한다.
  2. 동시에 Subscriber(소비자)가 Subscription(전달자)을 onSubscribe(등록)한다
  3. Subscriber(소비자)는 필요할 때 Subscribe(전달자).request(요청)을 통해 Publisher에게 데이터를 요청한다.
  4. Publisher(생산자)는 요청을 받으면 생성한 데이터를 보낸다.
  5. Subscriber는 onNext로 데이터를 받는다.
  6. 모든 요청이 성공적으로 완료되면 onComplete을 호출하고 흐름을 종료한다.
  7. 요청이 실패하면 onError를 호출하고 흐름을 종료한다.

예제

    • SimplePublisher.java
import java.util.concurrent.Flow.*;

class SimplePublisher<T> implements Publisher<T> {
    private Subscriber<? super T> subscriber;
    
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        this.subscriber = subscriber;
        subscriber.onSubscribe(new SimpleSubscription());
    }
    
    public void publish(T item) {
        subscriber.onNext(item);
    }
    
    public void complete() {
        subscriber.onComplete();
    }
    
    class SimpleSubscription implements Subscription {
        private boolean cancelled = false;
        
        @Override
        public void request(long n) {
            // 역압 처리를 위한 커스텀 로직을 구현
            // 및 요청된 수요에 따라 subscriber에게 item 전달
        }
        
        @Override
        public void cancel() {
            cancelled = true;
        }
    }
}
    • SimpleSubscriber.java
class SimpleSubscriber<T> implements Subscriber<T> {
    private Subscription subscription;
    
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
			// 초기에 한 개의 아이템만 요청하고, 
			// 추가적인 아이템은 onNext 메서드를 통해 수신한 후에 요청할 수 있도록 하는 것
    }
    
    @Override
    public void onNext(T item) {
        System.out.println("Received item: " + item);
        subscription.request(1);
    }
    
    @Override
    public void onError(Throwable throwable) {
        System.err.println("An error occurred: " + throwable.getMessage());
    }
    
    @Override
    public void onComplete() {
        System.out.println("Subscription completed");
    }
}
    • SimpleSubscriber.java
public class Main {
    public static void main(String[] args) {
        SimplePublisher<Integer> publisher = new SimplePublisher<>();
        SimpleSubscriber<Integer> subscriber = new SimpleSubscriber<>();
        
        publisher.subscribe(subscriber);
        
        publisher.publish(1);
        publisher.publish(2);
        publisher.publish(3);
        
        publisher.complete();
    }
}