코틀린/코루틴

Flow 2주차 3차 스터디 Processing the latest value까지

강한 맷돼지 2023. 11. 23. 17:42

Flow context

Flow 생성자가 어떻게 생겨먹었든 간에 이것이 수집되는 블록의 코루틴 Context를 따른다(변경 방법이야 있겠지만)

그래서 이러한 특성을 context preservation(컨텍스트 보존) 이라고 부른다고 한다.

 

이에 대한 예시를 들어주는데

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  
​
fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}            
​
//결과
//[main @coroutine#1] Started simple flow
//[main @coroutine#1] Collected 1
//[main @coroutine#1] Collected 2
//[main @coroutine#1] Collected 3

예시와 같이 collect가 main쓰레드 에서 일어나니 생성자의 코드도 main thread에서 동작한다. -> 사실상 코루틴의 개념으로 가져가면 될것같다. 공식문서의 설명이 더 헷갈림...

 

Since simple().collect is called from the main thread, the body of simple's flow is also called in the main thread. This is the perfect default for fast-running or asynchronous code that does not care about the execution context and does not block the caller.

 

이러한 문구를 통해 설명하는데 이를 멧돼지식으로 해석하자면(정확하지는 않음)

 

같은 코루틴 context내에서 생성자,소비자 모두 돌아감으로 backPressure 같이 막 소비자가 소비하기전 생성을 먼저 쭉 해내고 이럴 일이 없다.(일을 순차적으로 진행할테니), 소비자 또한 blocking 할 필요가 없다 생산되기 전까지 생산자 코드에서 suspending될테니 -> 이를 통해서 다양한 이점을 얻을 수 있다.

A common pitfall when using withContext(flow에서는 withContext 사용을 주의하라)

우리는 용도에 따라 Coroutine Context를 지정해서 사용한다.

예를들어 무거운 작업이라면 Dispatchers.Default 화면 관련 작업이라면 Dispatchers.Main 이렇게 용도에 맞게 context를 바꿔쓰는경우 coroutine 에서는 일반적으로 withContext를 통해서 처리한다.

하지만 flow의 경우 context preservation 특성 때문에 수집을 호출하는 코루틴 context가 아닌 다른 context에서 emit하는 것을 금지함으로 withContext를 사용해 context를 변경하여 실행하면 오류가 난다.

fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}
​
fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}      
​
//결과
//Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
//Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@41791925, //BlockingEventLoop@3828baef],
//but emission happened in [CoroutineId(1), .....

 

flowOn operator

그래서 context를 변경하고싶은 요구사항이 온다면 flowOn이라는 중간연산자를 사용해야한다.

flowOn 중간연산자는 flowOn을 기준으로 upStream 을 해당 컨텍스트로 변경해준다.

사용예시는 다음과 같다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
​
fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}     
​
//결과
//[DefaultDispatcher-worker-2 @coroutine#2] Emitting 1
//[main @coroutine#1] Collected 1
//[DefaultDispatcher-worker-2 @coroutine#2] Emitting 2
//[main @coroutine#1] Collected 2
//[DefaultDispatcher-worker-2 @coroutine#2] Emitting 3
//[main @coroutine#1] Collected 3

 

이렇게 flowOn을 사용하면 수집코드의 코루틴 Context(메인 쓰레드) 발행코드의 코루틴 Context(백그라운드 쓰레드)와 다른 코루틴을 사용하게된다. 이로 인해 서로다른 쓰레드에서 작업하게 된다. 일반적으로 sequential 하게 돌아가는 특성을 변경시킨다.

생산시 소비한다 -> 생산 쭈루룩 한번에 하고 소비도 한번에 할 수도 있다.

그래서 원래는 순차적으로 생산과 소비를 했지만 flowOn을 이용할시 생산과 소비가 동시성을 띈다.

 

 

그래서 예시를 다음과 같이 바꾸면 생산부터 완료하고 소비하는 모습을 볼 수 있다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
​
fun main() = runBlocking<Unit> {
    simple().collect { value ->
        delay(300)
        log("Collected $value") 
    } 
}            
​
//결과
//[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
//[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
//[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
//[main @coroutine#1] Collected 1
//[main @coroutine#1] Collected 2
//[main @coroutine#1] Collected 3
​

 

 

여기서 flowOn을 제거하면

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        log("Emitting $i")
        emit(i) // emit next value
    }
}
​
fun main() = runBlocking<Unit> {
    simple().collect { value ->
        delay(300)
        log("Collected $value") 
    } 
}            
​
//결과
//[main @coroutine#1] Emitting 1
//[main @coroutine#1] Collected 1
//[main @coroutine#1] Emitting 2
//[main @coroutine#1] Collected 2
//[main @coroutine#1] Emitting 3
//[main @coroutine#1] Collected 3

 

Buffering

buffer라는 연산자를 이용하여 생산자와 소비자가 동시에 움직이도록 만들수 있다.

이를 통하여 데이터를 수집 혹은 생산하는 상황에 긴 비동기 연산이 있을때 전체 실행시간을 줄이는 효과를 낼 수 있다.

flow인스턴스
        .buffer() // buffer emissions, don't wait
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 

실제적으로 공식문서의 예시를 보면 사용시간이 줄어드는 예시를 볼 수 있다.

 

사실 flowOn에서도 컨텍스트가 달라서 생성자가 우다다다 결과물을 뽑아낼 때 버퍼링되는것을 볼 수 있는데 공식문서에 이에 대한 설명이 나와있다.

flowOn을 통해서 다른 코루틴에서 쭉 결과물을 뽑아낼 때 현재와 같은 버퍼링 메커니즘을 사용한다고 한다.

하지만 buffer 연산자의 경우 실행 컨텍스트를 변경하지 않고도 명시적으로 버퍼링을 요청할수 있는 기능 이라고 보면된다.

 

멧돼지의 추가학습

현재 공식문서에는 나와있지 않지만 buffer 연산자를 까보면 많은 정보를 볼 수 있다.

@Suppress("NAME_SHADOWING")
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
        "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
    }
    require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
        "CONFLATED capacity cannot be used with non-default onBufferOverflow"
    }
    // desugar CONFLATED capacity to (0, DROP_OLDEST)
    var capacity = capacity
    var onBufferOverflow = onBufferOverflow
    if (capacity == CONFLATED) {
        capacity = 0
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    }
    // create a flow
    return when (this) {
        is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
        else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
    }
}

일단 함수의 매개변수를 보면 capacity와 onBufferOverflow 를 받는다.

이를 살펴보면

 

Capacity : 버퍼를 어떤 청책을 유지할것인지, 혹은 버퍼의 사이즈를 얼마나 가져갈 것인지를 결정하는 것이다.

-> 버퍼 사이즈 혹은 미리 정의된 상수를 받는다. (상수 음수 값으로 정의되어있음)

onBufferOverflow: 버퍼가 Overflow 났을때 어떻게 할지 정책이다.

 

capacity의 기본값은 BUFFERED로 밑에 나와있는 코드의 주석에 따르면 기본사이즈가 64인 버퍼를 생성해서 관리한다.

-> 코드 1 참고

onBufferOverflow의 기본값은 BufferOverflow.SUSPEND로 오버플로우가 났을때 기본적으로 suspend정책을 사용한다(대기한다.)

-> 코드 2 참고

 

require문을 통해 조건을 검사하는 식을 살펴보자

첫번째 require 문을 통해 capacity를 확인 버퍼 정책 상수인지 아니라면 양수로 버퍼를 설정하고 있는지 확인하고

두번째 require 문에서 CONFLATED 일때 suspend 정책이여야함을 확인한다.

 

이렇게 조건을 확인하고 난 후 각각의 조건들을 적용하여 CONFALTED라면 capacity = 0, 오버플로우 정책은 DROP_OLDEST로 고정하고 CONFLATED가 아니라면 인자로 들어온 조건을 적용하여 새로운 플로우를 반환한다.

 

 

코드 1

/**
 * Requests a conflated channel in the `Channel(...)` factory function. This is a shortcut to creating
 * a channel with [`onBufferOverflow = DROP_OLDEST`][BufferOverflow.DROP_OLDEST].
 */
public const val CONFLATED: Int = -1
​
/**
 * Requests a buffered channel with the default buffer capacity in the `Channel(...)` factory function.
 * The default capacity for a channel that [suspends][BufferOverflow.SUSPEND] on overflow
 * is 64 and can be overridden by setting [DEFAULT_BUFFER_PROPERTY_NAME] on JVM.
 * For non-suspending channels, a buffer of capacity 1 is used.
 */
public const val BUFFERED: Int = -2

 

 

코드 2

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,

    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,

    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}

 

Conflation

conflation은 애초에 버퍼없이 가장 최근 값 하나만 가지고있는 상태로 처리할 수 있도록 구현한것이다.

즉 최신값이 들어오면 기존값들은 다 날아가고 최신값만 남아있는다.

소비자가 소비하는데 시간이 오래걸릴 경우 사용할 수 있을것이다.

val time = measureTimeMillis {
    simple()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

이런식으로 중간연산자를 통해서 사용할 수 있으며

내부구현은 위에 buffering 설명할때 CONFLATED 상수를 이용해서 버퍼를 만드는 방법을 통해 구현되어있다.

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

 

Processing the latest value

collectLatest를 통해서 값을 받는다면 conflate와 비슷하지만 결과값이 살짝 다른데

conflate의 경우 값을 그냥 버려 버리는 것이지만 collectLatest의 경우 값은 정상적으로 들어와 collect의 로직을 타지만 새로운 값이 들어오는 시점에서 진행되던 collect 로직이 취소되고 새로운 값을 처리하게 된다.

백문불여일견이라고 그냥 예시를 보면 한방에 이해된다.

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // cancel & restart on the latest value
            println("Collecting $value") 
            delay(300) // pretend we are processing it for 300 ms
            println("Done $value") 
        } 
}   
println("Collected in $time ms")

//결과
//Collecting 1
//Collecting 2
//Collecting 3
//Done 3
//Collected in 741 ms