모던 자바 인 액션을 보고 리액티브 스트림스의 백프레셔는 무엇이고 왜 좋은지 라는 의문이 생겨 정리하였습니다.
관찰자(Observer) 패턴이란?
- 리액티브 프로그래밍의 기초
- 관찰자 패턴에는 이벤트를 발생시키는 역할(주체)와 이벤트를 수신하는 역할(관찰자)가 있다.
- 주체(subject)는 관찰자(observer)에게 상태변경을 알려준다.
관찰자 패턴의 문제점과 결합
- 옵저버 패턴에서는 발행자(publisher)가 구독자(subscriber)에게 밀어 넣는 방식으로 데이터가 전달
- 발행자는 구독자의 상태를 고려하지 않고 데이터를 전달하는 데에만 충실
Push 방식
- Observer가 이벤트를 처리하는 속도보다, Subject 가 이벤트를 발행하는 속도가 빠르다면?
- (빠른 프로듀서와 느린 컨슈머 이슈)
- Subject 는 이벤트를 발행하고 전달하는 데만 집중할 뿐이다.
- Observer 는 유실없이 처리하기 위해 Queue를 따로 두거나(이래도 결국 쌓여서 터짐), 별도 조치가 필요할 것이다.
- 발행자가 1초 동안 100개의 메시지를 보내는데 구독자는 1초에 10개밖에 처리하지 못할 경우
- 큐(queue)를 이용해서 대기 중인 이벤트를 저장해야 함
고정 길이 버퍼 - Push 방식
- 신규로 수신된 메시지를 거절
- 재요청 과정에서 네트워크와 CPU 연산 비용이 추가로 발생
가변 길이 버퍼 - Push 방식
- 이벤트를 저장할 때 'out of memory' 에러가 발생하면서 서버 크래시(crash)가 발생
- 왜? 다량의 GC가 발생하면서 서버가 정상적으로 응답할 수 없는 상태
- 예를 들어 SQL로 많은 양의 데이터를 질의하면 DBMS는 발행자가 되고 여러분의 서버가 구독자가 되어 List 자료 구조형에 데이터를 전부 담으려고 하다가 다량의 GC가 발생하면서 서버가 정상적으로 응답할 수 없는 상태에 이를 수 있습니다.
리액티브 스트림
리액티브 스트림은 관찰자(Observer) 패턴 등 여러 조합으로 정의되는데,
핵심은 아래와 같다
- Publisher = Subject = Observable = Producer
- Subscriber = Observer = Consumer
- Publisher는 Subscriber에게 Event를 Push한다.
Back Pressure (배압)
- 리액티브 스트림은 비동기적으로 처리되므로 역압력 기법이 기본적으로 탑재되어 있음
- 발행자가 구독자보다 빠른 아이템을 발행하므로 발생하는 문제 해결
- 즉, 빠른 Publisher - 느린 Subscriber 문제를 해결하는 원리
- Publisher의 일방적 데이터 Push 가 아니라, Subscriber가 처리할 수 있을 만큼의 데이터만 Subscriber의 요청에 의해서 전달해주는 것
- 중간에 Queue 같은게 필요없어짐
- 이것을 dynamic pull 이라고 부름
구독자가 이미 8개의 일을 처리하고 있다면 추가로 2개만 더 요청하여 자신이 현재 처리 가능한 범위 내에서만 메시지를 받게 할 수 있음
- 풀 방식에선 이렇게 전달되는 모든 데이터의 크기를 구독자가 결정하는데, 이런 다이나믹 풀 방식의 데이터 요청을 통해서 구독자가 수용할 수 있는 만큼만 데이터를 요청하는 방식이 백 프레셔라고 함
- request 메소드를 통해 요청량을 조절
request 를 LONG.MAX 개씩 요청하면 순수 push 모델이 되는 것이고, request 를 onNext 당 1개씩 요청하면 pull 모델이 된다
👀 코드로 적용하기
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
- Publisher(생산자)가 Subscriber(소비자)를 subscribe(등록)한다.
- 동시에 Subscriber(소비자)가 Subscription(전달자)을 onSubscribe(등록)한다
- Subscriber(소비자)는 필요할 때 Subscribe(전달자).request(요청)을 통해 Publisher에게 데이터를 요청한다.
- Publisher(생산자)는 요청을 받으면 생성한 데이터를 보낸다.
- Subscriber는 onNext로 데이터를 받는다.
- 모든 요청이 성공적으로 완료되면 onComplete을 호출하고 흐름을 종료한다.
- 요청이 실패하면 onError를 호출하고 흐름을 종료한다.
예제
-
- SimplePublisher.java
import java.util.concurrent.Flow.*;
class SimplePublisher<T> implements Publisher<T> {
private Subscriber<? super T> subscriber;
@Override
public void subscribe(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
subscriber.onSubscribe(new SimpleSubscription());
}
public void publish(T item) {
subscriber.onNext(item);
}
public void complete() {
subscriber.onComplete();
}
class SimpleSubscription implements Subscription {
private boolean cancelled = false;
@Override
public void request(long n) {
// 역압 처리를 위한 커스텀 로직을 구현
// 및 요청된 수요에 따라 subscriber에게 item 전달
}
@Override
public void cancel() {
cancelled = true;
}
}
}
-
- SimpleSubscriber.java
class SimpleSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
// 초기에 한 개의 아이템만 요청하고,
// 추가적인 아이템은 onNext 메서드를 통해 수신한 후에 요청할 수 있도록 하는 것
}
@Override
public void onNext(T item) {
System.out.println("Received item: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("An error occurred: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Subscription completed");
}
}
-
- SimpleSubscriber.java
public class Main {
public static void main(String[] args) {
SimplePublisher<Integer> publisher = new SimplePublisher<>();
SimpleSubscriber<Integer> subscriber = new SimpleSubscriber<>();
publisher.subscribe(subscriber);
publisher.publish(1);
publisher.publish(2);
publisher.publish(3);
publisher.complete();
}
}
'JAVA 🎻' 카테고리의 다른 글
내가 보려고 정리한 Map의 메서드 (0) | 2023.08.18 |
---|---|
Function Interface과 Lambda Expression (0) | 2023.08.17 |
[JAVA] JDBC란? (0) | 2023.06.04 |
[JAVA] 스트림 활용하기 (0) | 2023.03.04 |
[JAVA] Jsoup을 이용한 크롤링 (0) | 2023.02.21 |