본문 바로가기
개발관련

Reactive Streams 이해하고 구현해보기

by 부발자 2020. 11. 25.

Reactive Streams 란?


reactive-streams.org 에서는 다음과 같이 정의하고 있다.

 

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.

대충 해석하면 "논블로킹(Non-blocking) 백프레셔(back pressure)를 이용한 비동기 데이터 처리의 표준이다"

 

Reactive Streams API


Reactive Streams 는 뭔가 복잡할것 같지만, 실제로 아주 간단한 API 들의 조합으로 구성되어 있다.

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

 

reactive streams 동작방식


 

reactive stream 동작방식

 

중요하게 봐야하는 점은 Publisher 가 Subscriber 에서 데이터를 Push 하는 방식이 아니라,

Subscriber 가 Publisher 에게 데이터를 요청하는 Pull 방식이라는 점이다. 이런 방식을 백프레셔라고 한다.

 

백프레셔란?


Publisher 에서 발행하고, Subscriber에서 구독할 때, Publisher 에서 데이터를 Subscriber 로 Push 하는 방식이 아니라, Pull 방식으로 Subscriber 가 Publisher 로 처리할 수 있는 양의 크기만큼 데이터를 요청 함으로써 Subscriber의 장애를 방지하기 위함이다.
즉, 다이나믹 풀 방식의 데이터 요청을 통해서 Subscriber가 수용할 수 있는 만큼 데이터를 요청하는 방식이다.

 

 

reactive streams 을 이해할 겸 구현해보자


Reactive Streams 을 직접 구현해보면 좀더 이해하는데 도움이 될것이다.

public class ReactiveStreamTest {

  public static class PublisherImpl implements Publisher<Integer> {
    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {

      Queue<Integer> queue = new LinkedList<>();
      IntStream.range(0, 10).forEach(queue::add);

      subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
          System.out.println("request:" + n);

          for (int i=0; i<=n; i++){
            if(queue.isEmpty()) {
              subscriber.onComplete();
              return;
            }
            subscriber.onNext(queue.poll());
          }
        }

        @Override
        public void cancel() {
          System.out.println("publish cancel");
        }
      });
    }
  }

  public static class SubscriberImpl implements Subscriber<Integer> {

    private Subscription subscription;
    private long requestSize = 2;
    private List<Integer> buffer = new ArrayList<>();

    @Override
    public void onSubscribe(Subscription s) {
      subscription = s;
      subscription.request(requestSize);
    }

    @Override
    public void onNext(Integer integer) {
      System.out.println("  onNext - " + integer);
      buffer.add(integer);
      if(buffer.size() == requestSize) {
        buffer.clear(); //flush
        subscription.request(requestSize);
      }
    }

    @Override
    public void onError(Throwable t) {
      System.out.println("error:" + t.getMessage());
    }

    @Override
    public void onComplete() {
      System.out.println("subscribe complete");
    }
  }

  public static void main(String[] args) {

    Publisher<Integer> publisher = new PublisherImpl();
    publisher.subscribe(new SubscriberImpl());

  }
}
 

실행 결과

request:2
  onNext - 0
  onNext - 1
request:2
  onNext - 2
  onNext - 3
request:2
  onNext - 4
  onNext - 5
request:2
  onNext - 6
  onNext - 7
request:2
  onNext - 8
  onNext - 9
request:2
subscribe complete
반응형