본문 바로가기

코틀린/코루틴

channels 6주차 2차 스터디 Prime numbers with pipeline 까지

Pipelines

공식문서의 설명이 사실 난해하다고 생각한다.

일단 공식문서에 나와있는 부분을 해석해보고 이에대해 추가 조사한 자료들을 통해 다시 정리해보려한다.

 

파이프 라인은 하나의 코루틴이 값의 스트림을 무한이 만들어낼 수 있는 패턴이라고 한다.

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

예시가 이런데 while문으로 열어놓고 cancel 로 종료하는 형태를 주로 다룬다.

 

그리고 왜인지는 의문이지만 channels에는 딱히 중간연산자가 없다(map,filter 이런거 조차도 없다...)

그래서 이런것들을 channel을 받아서 값을 변형후 새로운 channel 을 반환하는 형태로 가공한다.

-> 이런형태를 파이프 라인 구축이라고 한다.

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

그래서 중간연산자 대신에 이렇게 다음 소비와 동시에 재생산하는 새로운 채널을 통해 값의 변형을 일으킨다.

 

그래서 코드상에서 이 두채널을 연결해서 원하는 바를 이룬다 -> 파이프 라인을 구성한다.

val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
repeat(5) {
    println(squares.receive()) // print first five
}
println("Done!") // we are done
coroutineContext.cancelChildren() // cancel children coroutines
​
//결과
1
4
9
16
25
Done!

 

위에 예시들로는 이게 뭐야 싶어서 일부 자체 해석도 집어넣고 여기저기 내용을 찾아서 보충했다. 그 자료와 구문을 첨부하려한다.

https://kt.academy/article/cc-channel

Pipelines

Sometimes we set two channels such that one produces elements based on those received from another. In such cases, we call it a pipeline.

 

 

https://www.baeldung.com/kotlin/channels#pipelines-using-channels

-> 이거는 길어서 링크로 대체

Prime numbers with pipeline

channel을 통해 파이프라인을 구축해서 효율적으로 작업을 처리하는 예시를 보여주고있다.

예시는 소수를 구하는 예시인데 처음 보면 좀 이해가 안된다.

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

일단 최초 숫자들을 순차적으로 뿌리는 channel을 구성하고

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

다음 이렇게 가공하는채널 가공 함수를 만들어준다 -> 내부적으로 채널을 재생산해서 내보낸다.

var cur = numbersFrom(2)
repeat(10) {
    val prime = cur.receive()
    println(prime)
    cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
​
//결과
2
3
5
7
11
13
17
19
23
29

그래서 호출하는 부분에서 다음과 같이 호출하면 실제적으로 소수를 제외한 숫자들을 다 걸러버리는 파이프라인을 구축할수있다.

 

이거 처음에 보고 도데체 어떻게 돌아가는거지 하고 이해가 안되고 머리가 아팠는데

그냥 계속해서 보고있으니 어떻게 돌아가는지 파악이되었다.

 

말로 설명하자니 어려운데 순차적으로 숫자가 어떻게 처리되는지 따라가다보면 이해할 수 있다.

요약하자면 filter함수는 일회성으로 호출되고 끝나는게 아니라 계속해서 최초생성된 channel을 감싸고있으며 값을 처리해서 내보낸다.

그래서 cur에 receive 연산을 걸면 최외각을 감싸고있는 filter가 조건에 맞는 값을 받아서 send를 1회 실시할때까지channel(numbersFrom 을 통해 생성된 채널)은 수를 연속적으로 send한다.

 

그럼 숫자를 직접 넣어보며 생각하자

예를들어 filter(5)까지 적용한다고 생각해보자

그럼 cur의 상태는 이런식이 될것이다.

filter( filter( filter(numberFrom(), 2), 3), 5) 이런식을 뚫고 값이 나온다고 생각하면된다.

그래서 최초 생성된 channel에서 6부터 숫자를 순차적으로 흘려보내고 각 filter를 거치며 2로 나눠지나 보고,3으로 나눠지나보고 5로 나눠지는 최초의 수가 나올때까지 최초생성 channel이 값을 흘려보낸다.

내부적으로 조건식이 for (x in numbers) if (x % prime != 0) send(x) 이므로 값으나올때까지 계속 값을 받는다.

그래서 10개의 소수를 내보내고 종료되는것이다.

 

근데 이거 무슨 재귀함수처럼 애초에 살짝 스택에 어떻게 함수가 쌓일지 예상가는 사람이 잘쓰는것처럼 머리가 아프다.

애초에 이렇게 설계 잘못할것 같은데 그래도 필요하다면 머리를 써가며 사용해야할것 같다.

 

그리고 여기서 또 한가지 중요키워드가 나오는데

이렇게 pipeline을 구축할때 while 문을 돌려놨으니 종료지점을 내가 명시해야한다.(cancel 해줘야한다.)

근데 방금과 같이 filter로 자식 코루틴을 계속생성했는데 이를 각각 관리해야하나? 싶은 마음이든다.

 

하지만 예시와 같이 cancelChildren()을 사용한다면 한방에 자식 코루틴을 싹 날리며 종료할 수 있다.(그냥 forEach돌면서 자식을 다 취소한다.)

public fun CoroutineContext.cancelChildren(cause: CancellationException? = null) {
    this[Job]?.children?.forEach { it.cancel(cause) }
}

-> 사실 부모 코루틴을 캔슬시키면 자동으로 자식은 다 캔슬되는데 굳이 이걸 왜쓰지라는 생각이 드는데 이런 컨텍스트 자체를 재활용하는 상황이라면(컨텍스트 하나 주입받아서 여기저기서 쓰는경우가 있으니) 이런 함수를 이용하면 되겠다는 생각이든다.

 

 

어쨋든 결론적으로 이 파이프 라인이 가지는 장점을 살펴보면

이렇게 channels 를 통해 pipeline을 구축하여 작업한다면 여러개의 Cpu Cores를 이용할 수 있는 장점이 있다고한다.(병렬성 프로그래밍 쌉가능)

그래서 이런때는 Dispatchers.Default디스패처를 이용하면 무거운 cpu연산에 더욱 어울리게 될것이다.

 

 

channels 를 sequence혹은 iterrator를 통해서 동기방식으로 대체할 수도 있다고는 하는데 그냥 그런가보다 하면될것 같다.