Stream과 이벤트 처리

Dart의 비동기 프로그래밍에서 Stream은 데이터의 연속된 흐름을 처리하는 데 중요한 역할을 한다. 비동기적으로 발생하는 데이터 이벤트를 처리하기 위해서는 Stream을 적절히 다루어야 하며, Dart는 이를 효과적으로 관리할 수 있는 다양한 기능을 제공한다. 이 섹션에서는 Stream의 기본 개념부터 이벤트 처리 방법, 그리고 더 나아가서 Stream을 활용한 복잡한 비동기 데이터 흐름 처리까지 설명한다.

Stream의 기본 개념

Stream은 Dart에서 일련의 비동기 이벤트를 전달하는 객체이다. 여기서 이벤트는 데이터를 나타낼 수 있으며, 오류나 작업 완료와 같은 상황도 포함될 수 있다. Stream은 여러 개의 이벤트를 순차적으로 전달하며, 각 이벤트는 비동기적으로 처리된다.

Dart의 Stream은 두 가지 유형으로 나눌 수 있다.

  1. Single subscription Stream: 한 번에 하나의 리스너만 등록할 수 있는 Stream이다. 주로 UI 이벤트나 사용자 상호작용과 같은 상황에서 사용된다.

  2. Broadcast Stream: 여러 리스너가 동시에 동일한 Stream을 청취할 수 있는 Stream이다. 서버의 메시지를 여러 클라이언트에게 동시에 전달해야 하는 상황에서 주로 사용된다.

Stream의 생성과 사용

Dart에서 Stream을 생성하는 방법은 다양한다. 가장 기본적인 방법은 StreamController를 사용하는 것이다. StreamControllerStream과 이를 제어할 수 있는 기능을 제공한다.

StreamController<int> controller = StreamController<int>();
Stream<int> stream = controller.stream;

위 코드에서 controller.stream을 통해 Stream 객체가 생성되었다. 이후 controller를 통해 이벤트를 추가하거나 오류를 전달할 수 있다.

Stream의 주요 메소드

  • listen(): Stream을 구독하고 이벤트를 처리한다.

    stream.listen((event) {
      print('New event: $event');
    });
  • **pause()**와 resume(): Stream을 일시정지하거나 다시 재개한다.

    stream.listen((event) {
      print('New event: $event');
    }).pause(); // 스트림을 일시정지
  • cancel(): Stream의 구독을 취소한다.

Stream과 이벤트 처리의 흐름

Stream은 이벤트 기반의 데이터를 다룰 때 매우 유용하다. 예를 들어, 사용자가 특정 동작을 수행할 때마다 이벤트가 발생하는 경우, 이러한 이벤트를 Stream을 통해 처리할 수 있다. 이벤트는 보통 다음과 같이 세 가지 유형으로 구분된다.

  • 데이터 이벤트: 새로운 데이터가 들어왔을 때 발생한다.

  • 오류 이벤트: 비동기 작업 중 오류가 발생했을 때 발생한다.

  • 완료 이벤트: 스트림이 더 이상 이벤트를 전송하지 않을 때 발생한다.

각 이벤트는 Stream 구독자의 콜백 함수에서 처리된다. Stream의 구독자는 보통 다음과 같은 구조를 갖는다:

이처럼 Stream 구독자는 data를 처리하는 메인 콜백과 오류 및 완료 이벤트를 처리하는 추가적인 콜백을 갖는다.

Stream에서의 비동기 이벤트 흐름

Stream동기적 또는 비동기적으로 이벤트를 전달할 수 있다. Dart에서 비동기 스트림을 사용하는 경우 await for 구문을 통해 간편하게 비동기 데이터를 처리할 수 있다. 이 구문은 Stream에서 데이터를 비동기적으로 가져오고 이를 순차적으로 처리한다.

await for는 비동기 데이터를 순차적으로 처리할 때 매우 유용하며, 이벤트가 발생하는 순서대로 데이터를 처리할 수 있게 한다.

StreamController와 StreamTransformer

Stream을 생성할 때 일반적으로 사용하는 객체가 StreamController이다. StreamControllerStream을 제어할 수 있는 기능을 제공한다. 이를 통해 데이터 이벤트를 추가하거나, 오류를 발생시키거나, 스트림을 종료할 수 있다. StreamController는 단순한 데이터 이벤트 흐름뿐만 아니라 복잡한 비동기 작업을 처리하는 데 중요한 역할을 한다.

StreamController의 주요 메소드

  • add(): 새로운 데이터를 스트림에 추가한다.

  • addError(): 오류 이벤트를 스트림에 추가한다.

  • close(): 스트림을 더 이상 사용할 수 없도록 종료한다.

StreamController는 Dart에서 스트림을 다룰 때 기본적으로 많이 사용되는 컨트롤러로, 데이터를 제어하는 역할을 담당한다. 또한, StreamController를 활용하면 사용자 정의 스트림을 쉽게 생성하고 제어할 수 있다.

StreamTransformer

StreamTransformer는 입력 스트림을 출력 스트림으로 변환하는 기능을 제공한다. 이를 통해 스트림의 데이터를 원하는 형식으로 변환하거나 중간에서 데이터를 필터링하는 등의 작업을 할 수 있다.

위 예제에서는 StreamTransformer를 통해 정수형 데이터를 문자열로 변환하고 있다. 이처럼 StreamTransformer는 입력 스트림에서 데이터를 가져와서 특정 규칙에 따라 변환한 후 새로운 스트림으로 전달할 수 있다.

Stream과 이벤트 루프

Dart에서 Stream은 이벤트 루프를 기반으로 작동한다. 이벤트 루프는 Dart의 비동기 작업을 처리하는 기본 메커니즘으로, Stream의 이벤트 또한 이 이벤트 루프에 의해 관리된다. Dart의 이벤트 루프는 각 이벤트를 큐(queue)에 저장하고, 비동기적으로 이를 처리한다.

마이크로태스크 큐와 이벤트 큐

이벤트 루프에는 두 가지 큐가 존재한다:

  1. 마이크로태스크 큐(Microtask Queue): 우선적으로 처리되어야 하는 짧은 작업이 저장된다. 보통 Future.microtask()로 생성된 작업이 여기에 저장된다.

  2. 이벤트 큐(Event Queue): 비동기 작업이 이 큐에 저장되며, 일반적인 FutureStream 이벤트가 이 큐에 저장된다.

Stream의 이벤트는 보통 이벤트 큐에서 처리되며, 각 이벤트는 순차적으로 처리된다. 이벤트가 처리되는 동안 새로운 이벤트가 큐에 추가되면, 이벤트 루프는 이를 다음 차례로 처리한다.

이러한 Dart의 이벤트 기반 아키텍처 덕분에 Stream은 비동기 데이터 흐름을 효율적으로 처리할 수 있다.

Broadcast Stream

기본적으로 Stream은 한 번에 하나의 구독자만 허용하는 single subscription 방식이다. 하지만 여러 구독자가 동일한 Stream을 청취해야 하는 경우에는 Broadcast Stream을 사용해야 한다.

Broadcast Stream은 여러 구독자를 동시에 허용하며, 주로 이벤트가 여러 대상에게 동시에 전달되어야 할 때 사용된다. 예를 들어, 서버에서 클라이언트로 메시지를 전송할 때 동일한 이벤트가 여러 클라이언트에게 동시에 전달되어야 한다면 Broadcast Stream이 유용하다.

asBroadcastStream() 메소드를 사용하여 일반 스트림을 Broadcast Stream으로 변환할 수 있으며, 여러 구독자를 동시에 처리할 수 있다.

Stream의 지연 평가와 lazy loading

Stream은 Dart에서 **지연 평가(lazy evaluation)**를 사용하여 비동기 데이터를 처리할 수 있다. 지연 평가는 필요할 때만 데이터를 처리하여 자원을 절약하는 방식이다. Dart의 Stream은 데이터를 즉시 소비하지 않고, 구독자가 Stream을 구독할 때 이벤트를 처리한다.

이 방식은 특히 대용량 데이터를 다루거나 시간이 지남에 따라 점진적으로 데이터를 처리할 때 유용하다. Stream이 지연 로딩 방식으로 동작하기 때문에 구독이 이루어질 때까지는 아무런 데이터도 처리하지 않는다.

위의 예제에서는 StreamfromIterable로부터 데이터를 제공하지만, 실제로 구독자가 데이터를 처리할 때까지 지연되며, await for를 통해 구독이 시작되면 데이터가 순차적으로 처리된다.

Stream에서 데이터 필터링

Stream에서 데이터를 필터링할 수 있는 다양한 메소드가 제공된다. 이를 통해 스트림에서 특정 조건을 만족하는 데이터만 처리할 수 있다.

  • where(): 특정 조건에 맞는 데이터만 필터링한다.

  • map(): 데이터를 변환한다.

  • skip(): 처음의 n개의 데이터를 건너뛰고 그 이후의 데이터를 처리한다.

이러한 필터링 기능을 통해 Stream에서 불필요한 데이터를 제외하고 필요한 데이터만 처리할 수 있다.

Stream을 통한 데이터 병합과 분리

여러 개의 Stream을 하나로 병합하거나, 하나의 Stream을 여러 개로 분리하는 작업도 가능한다. 이 작업은 복잡한 비동기 작업을 처리할 때 유용하다.

Stream 병합

Dart에서 제공하는 Stream 관련 유틸리티 함수는 여러 개의 Stream을 하나로 병합할 수 있는 기능을 제공한다. 예를 들어, Stream 두 개를 하나로 결합하여 동시에 두 Stream에서 발생하는 데이터를 모두 처리할 수 있다.

위 코드에서 Stream.concat()을 사용하여 두 Stream의 데이터를 병합하여 하나의 Stream으로 처리하고 있다.

Stream 분리

하나의 Stream을 여러 개로 분리하는 것도 가능한다. Dart에서는 Stream을 특정 조건에 따라 여러 하위 Stream으로 나누어 처리할 수 있다.

이처럼 Stream을 원하는 조건에 따라 분리하여 각각의 데이터 흐름을 별도로 처리할 수 있다.

Stream의 Buffering

Stream의 **버퍼링(buffering)**은 일정 시간 동안 발생한 이벤트를 모아서 한꺼번에 처리하는 방식이다. 버퍼링은 네트워크 요청이나 대량의 데이터를 처리할 때 유용하다.

Dart에서는 RxDart와 같은 라이브러리를 사용하여 Stream의 버퍼링 기능을 구현할 수 있다.

위 코드는 2초 동안 발생한 모든 이벤트를 모아 한꺼번에 처리하는 예제이다. 이 방식은 네트워크나 I/O 작업에서 대량의 이벤트를 처리할 때 매우 유용하다.

Last updated