본문 바로가기

코틀린/코루틴

channels 6주차 1차 스터디 Building channel producers 까지

Channels

드디어 flow를 넘어 Channel 이다.

팀과 함께 스터디하니 높아보였던 장벽이 깰만했던것 같아 기분이 좋다.

이제 들어가보자!!!

 

Deferred(async를 통해서 내뱉고 하는것) 을 통해 코루틴에서 single value를 전달할 수 있었다.

이제 Channels 는 값들의 stream을 전달할 수 있는 방법이다.

Channel basics

Channel 은 컨셉이 BlockingQueue와 비슷하다고 한다.(BlockingQueue가 뭔데 씹덕아)

BlockingQueue의 put 연산을 send를 통해하고 take연산을 receive를 통해서 한다고한다.

이때 주요 차이점이 BlockingQueue는 대기 해야하는 상황에 쓰레드에 Block을 걸어버리는 반면에 -> Block == 컨텍스트 스위칭

Channel은 이걸 코루틴의 suspending으로 쓰레드 블락을 걸지않고 컨텍스트 스위칭도 줄일 수 있기 때문에 효율적이다.(suspending 한다는 것이 주요 키워드이다.)

 

사용예시가 나와있다.

val channel = Channel<Int>()
launch {
    // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
    for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
​
//결과 
1
4
9
16
25
Done!

BlockingQueue와 비슷하게 여기서 send부분을 1~6까지 바꾸고 receive를 5회로 두면 무한히 기다린다.

반대로 receive를 6회로 늘리고 send부분을 1~5로 냅둬도 무한히 기다린다.

 

사실 사용하는 예시는 그렇구나 싶고 공식문서에서 자세한 설명을 BlockingQueue에다가 델리게이션 때렸으니 BlockingQueue가 뭔지 좀 알아봐야할것이다.

멧돼지 추가자료 Blocking Queue

자바 Blocking Queue 인터페이스 설명글

자바 Blocking Queue 구현체 설명글

BlockingQueueue 안드로이드 공식문서 글

 

이 첨부한 3가지 글에 자세한 설명이 잘 나와있다.

사실상 구현체는 그냥 궁금하면 얕게봐도되고 인터페이스를 설명한것을 보면 Blocking queue에 대한 정책을 이해할 수 있을것이다.

 

그리고 재미있는건 Okhttp Dispatcher에서 쓰레드풀 관리할때 SynchronousQueue를 쓰는데 이 또한 생산자/소비자 패턴이라 볼 수 있을것 같고 여기서 만나니 반가웠다.

Closing and iteration over channels

그냥 queue와는 다르게 Channel은 생산하는 쪽에서 channel을 close()를 통해 종료시켜버릴수 있다.

그래서 수신자 측에서는 그냥 for loop 를 통해 편하게 채널의 값을 받아다 처리할 수 있다.

예시를 살펴보면 한방에 이해할 수 있다.

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
​
//결과
1
4
9
16
25
Done!

이렇게 수신하는측에서 for문을 돌려버리면 편하고 생산하는 쪽에서는 끝내야하는 시점에 channel을 아예 close시켜버리면된다.(close안시키면 거기서 무한대기한다 -> 물론 suspend이지만)

 

close를 한다는것은 채널에 종료를 위한 특별한 토큰을 보내는것이다.

이 특별한 토큰이 전달되면 iteration이 종료된다고 한다.

 

그래서 이뜻은 close를 수신하기 이전에 받은 원소들은 모두다 처리된것을 보장한다고 한다.

Building channel producers

시퀀스의 원소를 코루틴을 통해 생성해내는것은 일반적이라고 한다.

이는 일종의 생산자/소비자 패턴(producer-consumer pattern) 인데 우리가 일반적으로 동시성(concurrent) 코드를 접할때 만날 수 있는 패턴이다.

사실 위에서 Blocking Queue를 공부했다면 생산자 소비자 패턴을 주구장창 봤을것이다. -> 생산자/소비자 패턴을 구현하는데 가장 일반적으로 사용되기 때문이다.

어쩃든 channel을 매개변수로 받아서 사용하는 함수를 통해 producer을 추상화 할 수 있지만, 이는 함수의 결과물을 함수의 반환값으로 반환해야한다는 통념을 어기게 된다.

 

producer관점에서 ReceiveChannel 을 반환하는 produce 코루틴 빌더를 이용하여 channel을 반환하는 형태를 취할 수 있고(produce는 CoroutineScope의 확장함수이다.)

또한 consumer관점에서 consumeEach 라는 확장함수를 이용하여 for문으로 loop를 도는것을 대체할 수 있다.

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}
​
fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

그래서 결론적으로 간단하게 consumeEach와 produce 코루틴 빌더를 사용하자.

멧돼지 추가자료 producer-consumer 패턴

producer -consumer 패턴은 일반적으로 검색하면 자료가 많으니 대략적인 키워드만 살짝 정리하고 넘어가려한다.

  • 멀티쓰레드 환경에서 작업할때 사용하는 가장 유명한 패턴 중 하나
  • 생산자와 소비자 각각을 독립적으로 관리하여 서로가 서로를 몰라도 되는 장점이있음
  • 생산자 소비자의 부하를 각각 조절할 수 있다는 장점이 있다.