Fan-out
구독자가 여러 어럿 있는 상황을 살펴보고있다. -> 값을 소비하는 코루틴이 여러개임
생산자 코드가 이러하고
-> 1초에 값 10개 생산
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
소비하는 코드가 이러하다.
-> 누가 소비하는지 숫자로 나타내줌
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
이런상황에 코드를 이렇게 작성하면
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
//결과
Processor #0 received 1
Processor #0 received 2
Processor #1 received 3
Processor #2 received 4
Processor #3 received 5
Processor #4 received 6
Processor #0 received 7
Processor #1 received 8
Processor #2 received 9
Processor #3 received 10
1~10까지 소비하는건 항상 갖지만 그걸 누가 소비하는가는 랜덤하게 컴퓨터 상황에 맞춰서 경쟁하며 소비한다.
한마디로 구독을 여러코루틴으로 나눠놓으면 지들끼리 값나오는대로 경쟁중인것이다.
그리고 channel에 cancel을 때리면 그것을 for문으로 소비하고 있던 것들이 다 일괄종료된다.
또한 for문을 통해 소비하는가와 consumeEcah 를 통해서 소비하는가에 따라 다른점이 있는데
예시와 같이 for문을 통해 값을 소비한다면 하나의 코루틴(소비자 하나)가 fail이 나도 나머지 것들이 살아서 계속 소비한다.
하지만 consumeEach 는 consume하든 cancel을 하든 둘중하나가 결론이 나기때문에 만약 여기 안에서 fail이 나면 채널 자체를 캔슬때려버린다.
이를 자세히 살펴보려면 consumeEach 코드를 까보면된다.
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
consume {
for (e in this) action(e)
}
consumeEach 는 다음과같이 consume이라는 함수를 for문을 돌며 실행하도록 만들어주는 함수이다.
그럼 consume는 뭐하는 함수일까?
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
var cause: Throwable? = null
try {
return block()
} catch (e: Throwable) {
cause = e
throw e
} finally {
cancelConsumed(cause)
}
}
consume의 코드는 다음과 같다.
이를 살펴보면 cause라는 오류 내용을 담는 변수가 있는데 기본적으로 null이고 성공상황에는 block 코드를 실행하고 오류가나면 오류사항을 cause 변수에 담는 형태이다.
또한 잘 살펴보면 결론적으로 cancelConsumed에 cause를 넘겨서 실행하는것을 볼 수 있다.
그럼 또 cancslConsumed는 뭘까?
@PublishedApi
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
cancel(cause?.let {
it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)
})
}
cancelConsumed를 살펴보면 cause가 null이라면 단순 cancel을 실행하고
만약 내부적인 오류가있다면 cancel에 CancellationException으로 캐스팅한 exception을 전달하여 실행한다.
이렇게 코드를 살펴보면 consumeEach 함수는 내부적으로 오류가 나서 종료되던 for문이 종료되어 정상종료되던
channel을 취소시키는것을 볼 수있다.
이제 왜 consumeEach를 사용하였을시 fail 상황에 channel이 cancel이 나는지 알수있을것이다.
Fan-in
이번에는 여러코루틴에서 하나의 channel에 값을 동시에 밀어넣는 상황을 살펴본다.
예시를 봐보자
예시는 값을 밀어넣는 생산자 코드가 생산자별 string을 조금씩 delay를 주면서 밀어넣는 형태이다.
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
이 생산자 함수를 여러 코루틴을 통해서 하나의 채널에 밀어넣는것을 봐보자
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
//결과
foo
foo
BAR!
foo
foo
BAR!
그냥 우리가 생각하는대로 잘 순서에맞춰서 들어간다 즉 생산자가 여럿이여도 channel은 잘 동작할 수있다.
'코틀린 > 코루틴' 카테고리의 다른 글
channels 6주차 2차 스터디 Prime numbers with pipeline 까지 (1) | 2023.12.06 |
---|---|
channels 6주차 1차 스터디 Building channel producers 까지 (0) | 2023.12.06 |
flow 5주차 1차 스터디 flow끝까지 (1) | 2023.11.29 |
Flow 4주차 2차 스터디 CatchingDeclaratively까지 (2) | 2023.11.23 |
Flow 4주차 1차 스터디 flatMapLatest 까지 (1) | 2023.11.23 |