본문 바로가기

코틀린/코루틴

channels 7주차 1차 스터디 Fan-in 까지

Fan-out

구독자가 여러 어럿 있는 상황을 살펴보고있다. -> 값을 소비하는 코루틴이 여러개임

 

생산자 코드가 이러하고

-> 1초에 값 10개 생산

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

 

 

소비하는 코드가 이러하다.

-> 누가 소비하는지 숫자로 나타내줌

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}

 

 

이런상황에 코드를 이렇게 작성하면

val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
​
//결과
Processor #0 received 1
Processor #0 received 2
Processor #1 received 3
Processor #2 received 4
Processor #3 received 5
Processor #4 received 6
Processor #0 received 7
Processor #1 received 8
Processor #2 received 9
Processor #3 received 10

1~10까지 소비하는건 항상 갖지만 그걸 누가 소비하는가는 랜덤하게 컴퓨터 상황에 맞춰서 경쟁하며 소비한다.

한마디로 구독을 여러코루틴으로 나눠놓으면 지들끼리 값나오는대로 경쟁중인것이다.

 

그리고 channel에 cancel을 때리면 그것을 for문으로 소비하고 있던 것들이 다 일괄종료된다.

 

 

또한 for문을 통해 소비하는가와 consumeEcah 를 통해서 소비하는가에 따라 다른점이 있는데

예시와 같이 for문을 통해 값을 소비한다면 하나의 코루틴(소비자 하나)가 fail이 나도 나머지 것들이 살아서 계속 소비한다.

하지만 consumeEach 는 consume하든 cancel을 하든 둘중하나가 결론이 나기때문에 만약 여기 안에서 fail이 나면 채널 자체를 캔슬때려버린다.

 

 

이를 자세히 살펴보려면 consumeEach 코드를 까보면된다.

public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
    consume {
        for (e in this) action(e)
    }

consumeEach 는 다음과같이 consume이라는 함수를 for문을 돌며 실행하도록 만들어주는 함수이다.

 

그럼 consume는 뭐하는 함수일까?

public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    var cause: Throwable? = null
    try {
        return block()
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        cancelConsumed(cause)
    }
}

consume의 코드는 다음과 같다.

이를 살펴보면 cause라는 오류 내용을 담는 변수가 있는데 기본적으로 null이고 성공상황에는 block 코드를 실행하고 오류가나면 오류사항을 cause 변수에 담는 형태이다.

또한 잘 살펴보면 결론적으로 cancelConsumed에 cause를 넘겨서 실행하는것을 볼 수 있다.

 

 

그럼 또 cancslConsumed는 뭘까?

@PublishedApi
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
    cancel(cause?.let {
        it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)
    })
}

cancelConsumed를 살펴보면 cause가 null이라면 단순 cancel을 실행하고

만약 내부적인 오류가있다면 cancel에 CancellationException으로 캐스팅한 exception을 전달하여 실행한다.

 

이렇게 코드를 살펴보면 consumeEach 함수는 내부적으로 오류가 나서 종료되던 for문이 종료되어 정상종료되던

channel을 취소시키는것을 볼 수있다.

이제 왜 consumeEach를 사용하였을시 fail 상황에 channel이 cancel이 나는지 알수있을것이다.

Fan-in

이번에는 여러코루틴에서 하나의 channel에 값을 동시에 밀어넣는 상황을 살펴본다.

 

예시를 봐보자

예시는 값을 밀어넣는 생산자 코드가 생산자별 string을 조금씩 delay를 주면서 밀어넣는 형태이다.

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

 

이 생산자 함수를 여러 코루틴을 통해서 하나의 채널에 밀어넣는것을 봐보자

val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
    println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
​
//결과
foo
foo
BAR!
foo
foo
BAR!

 

그냥 우리가 생각하는대로 잘 순서에맞춰서 들어간다 즉 생산자가 여럿이여도 channel은 잘 동작할 수있다.