본문 바로가기

코틀린/코루틴

flow 5주차 1차 스터디 flow끝까지

Flow completion

flow 종료시점에 작업을 해야할때 취할수있는 방법은 두가지이다. 선언적 방법, 명령형 방법

그냥 딱봐도 선언적 방법 쓸것같다...

Imperative finally block

trt/catch의 finally 블록을 이용하면 collect가 종료되는 시점의 작업을 해줄 수 있다.

fun simple(): Flow<Int> = (1..3).asFlow()
​
fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}       
​
//결과
1
2
3
Done

가능이야 하겠지만 뎁스 깊어지는거 너무 열받지 않는가?

우테코 하고나서 성격이 나빠졌는지 이런것들을 보면 화가난다. -> 예쁘고 좋고 알잘딱한 선언형쓰자 ^^

Declarative handling

onCompletion이라는 중간연산자를 이용하여 위의 구문을 대체 할 수 있다.

근데 순서가 중간에 들어간게 좀 킹받는다.(뭔가 좀 헷갈리는?)

simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }

또한 onCompletion의 장점으로는 exception으로 종료되었는지 정상 종료되었는지 알 수 있는 nullable한 매개변수가 넘어오는데

-> null이라면 정상종료 아니라면 exception에 의한 종료이다. (어떤 에러인지도 넘어온다.)

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}
​
fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally $cause") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}            
​
//결과
1
Flow completed exceptionally java.lang.RuntimeException
Caught exception

이런식으로 사용할 수 있는데

그렇다고 onCompetion이 exception을 캐치하는것은 아니니 이점을 유의하자 -> exception은 catch 로 잡지 않으면 계속 타고 흐른다.

Successful completion

위의 설명과 중복되는 이야기라 생략하려한다.

Imperative versus declarative

명령형 vs 선언형

라이브러리 관점에서 선언형이든 명령형이든 다 취향이니까 알잘딱하게 본인 코딩스타일에 맞춰 쓰라고한다.

그렇다면 난 선언형 ㅎㅎㅎ

Launching flow

flow를 실제적으로 사용할 만한 예시로 비동기 처리의 예시인 addEventListener 같은 상황을 들었다.

이런것들을 사용할때는 onEach를 사용하면 좋은데 onEach는 중간 연산자이기 때문에 terminal연산자가 없다면 아무 의미없다.

그래서 빈 collect를 뒤에 붙여사용하는데

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
​
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}    
​
//결과
Event: 1
Event: 2
Event: 3
Done

이를 LaunchIn을 통해 쉽게 사용할 수 있다.

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}            
​
//결과
Done
Event: 1
Event: 2
Event: 3

이 둘의 다른점을 살펴보면 collect를 사용할 경우 flow가 종료될 시점까지 collect 밑의 코드로 벗어나지 못한다(done가 가장 마지막에 찍힌다.)

하지만 LaunchIn연산자를 이용하면 collect할 스코프를 새로 열고 각각 코루틴이 만들어져 작업하는것이기에 코루틴이 일반적으로 돌아가는 것을 생각 해보면 done가 왜 미리 찍히는지 알 수 있다.

LaunchIn의 내부 코드를 보면 실제로 들어오는 스코프를 통해 새로운 코루틴을 열어서 collect하는 것을 볼 수있다.

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

또한 이런 작업을 취소하고 싶을때 cancel과 structured concurrency을 이용해서 취소할 수 있다.

위의 LaunchIn을 보면 job을 내뱉는다. 이 job을 이용하여 전체스코프가 아닌 해당 플로우 수집 코루틴(lacnchIn을 통해 열은 코루틴)만 취소할 수 있거나 join할 수 있다.

그래서 removeEventListener 같은 취소상황이 왔다고 가정한다면 코루틴 자체를 취소시켜 버리면 된다.

Flow cancellation checks

우리가 많이 사용하는 flow{} 이 빌더를 이용한다면 중간에 cancel 을 확인하는 ensureActive가 붙어있어 중간중간 cancel 을 알아서 확인한다.

그래서 loop가 계속돌아도 emit하는 과정에서 cancel이 확인되고 취소된다.

fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}
​
fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
​
//결과
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c

하지만 다른 빌더들은 퍼포먼스상의 이유로 이런 cancel 체크가 없다고 한다.

그래서 cancel을 내더라도 suspend같은게 없고 while문을 겁나 돌고있으면 cancel을 알길이 없다 -> 일이 다 종료된후 cancel이 난다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
​
//결과
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
 at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1558) 
 at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:287) 
 at kotlinx.coroutines.CoroutineScopeKt.cancel$default 

그래서 cancel을 중간에 체크해줘야하는데 그것을 onEach를 통해서 해줄수도 있겠지만

.onEach { currentCoroutineContext().ensureActive() }

하지만 애초에 cancellable이라는 중간연산자를 cancel을 확인하는 용도로 제공한다고 한다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
​
//결과
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365

이런식으로 중간에 cancellable 을 넣어주면 취소를 확인하게된다.

Flow and Reactive Streams

이 부분은 Rx를 사용하거나 레거시에 Rx가 들어있는 사람이 접해야하는 부분이겠지만

코루틴은 단순하고, structured concurrency 를 보장하도록 설계된 코틀린전용 Reactive Streams 이나 다름없다고한다.

애초에 여기에 구구절절히 자기네들은 Rx에 영감을 받았고 어쩌구저쩌구

그래서 어쩃든 Rx랑 상호변환이 된다고 한다. 그래서 만약 Rx를 다룰일이있다면 변환을 고려해봐도 좋을것 같다.