본문 바로가기

코틀린/코루틴

Flow 2주차 2차 스터디 부록 Cancellation and timeouts(코루틴 cancel)

Cancellation and timeouts

flow를 공부하다 눈을 떠보니 여기와서 이부분을 학습하고있었다.

flow의 cancel을 알려면 coroutine의 cancel을 알아야했다.-> coroutine의 cancel 에 관한 내용이다

각설하고 내용을 살펴보자


별것 아닌것처럼 보이지만 취소가 가능하다는것 자체가 굉장히 강력한 기능중 하나이다. (코틀린 코루틴 책 초반부에 잘 나와있다.)

일반적으로 비동기 처리를 할때 콜백, 쓰레드 같은 것들을 사용하면 비동기처리 작업을 도중에 취소하기 어렵거나 취소할수 없다.

이제 코루틴의 Cancellation 과 timeouts 에 대해 알아보자 -> 링크 요약본이다.

Cancelling coroutine execution

일반적으로 우리가 job객체를 통해 할 수 있는 cancel관련 이야기를 한다.

코루틴 빌더 함수들은 job 객체를 반환하는데 이를 통해 작업이 필요없어졌을때 cancel메서드를 통해 작업을 취소할 수 있다.

   val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion 
    println("main: Now I can quit.")
    
    
결과:
//job: I'm sleeping 0 ...
//job: I'm sleeping 1 ...
//job: I'm sleeping 2 ...
//main: I'm tired of waiting!
//main: Now I can quit.

 

그냥 코드를 보면 이해가 가고 일반적인 이야기이다.

하지만 이 코드의 의문은 cancel은 하는데 다음에 바로왜 join을 하고있는가? 이다.

이에 대해서 공식문서가 뭐라고 쏼라쏼하 하는데 사실 말자체가 이해가 잘안되고 태환님의 블로그에 이에 대한 설명이 잘나와있다.

 

한마디로 cancel 만 해놨다고 종료가 되는게 아니다 -> 의도치 않게 동작하지 않을수 있다. 코루틴 내부에서 while문 돌려놓으면 영원히 안나가는것 처럼 여기서 코루틴이 비선점형이라는것이 체감된다.

 

그래서 단순 cancel 만 하는게 아니라 join을 바로 불러주는게 일반적인데 이를 편하게 하기위한 cancelAndJoin이라는 확장함수가 있으니 사용하자.

Cancellation is cooperative

직역을 하자면 coroutine의 취소는 협조적이라한다. -> 근데 무슨 뜻인지 정말 와닿지 않는다.

하지만 여기서 나오는 두가지 예시를 보면 코루틴이 구현된 구조상 cancel 을 해도 실제적으로 취소가 안되는 경우의 수를 설명한다.

cancel의 동작 매커니즘

밑의 두가지 예시를 살펴보기전 왜 이런 일들이 일어나는지 원리를 살펴보자

일단 Job 공식문서를 살펴보면 Job이 갖을수 있는 상태를 나열한 표가 나온다.

job이 가질 수 있는 상태는 다음과 같은데 cancel 과 관련된 상태로는 cancelling과 cancelled가 있다.

위의 도표와 같이 cancel 이라는 함수가 실행되는 시점에 job의 상태는 cancelling으로 변경된다.

하지만 상태가 변경되고 cancel 된 여부를 확인하는 프로퍼티인 isActive가 false로 변경되었을 뿐이지 실제적으로 실행중인 코루틴 블록을 직접 가서 꺼버리는것이 아니다.

 

그렇다면 코루틴의 cancel 은 어떻게 일어날까?

다음은 공식문서에 나와있는 글귀이다.

All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled.

 

Suspending functions의 경우 모두 취소가 가능하다.

 

또한 Suspending functions은 실행되는 시점에 cancellation여부를 확인하여(IsActive 프로퍼티 조사) 만약 이미 취소된 상태이라면 Cancellation Exception을 던짐으로서 취소동작을 실행시킨다.

 

한마디로 언제 job에 cancel을 거는 행위를 해도 suspend 함수가 실행되는 시점에 실제적으로 작업의 취소가 일어나는것이다.

이러한 매커니즘 때문에 일어날수 있는 취소했지만 취소가 안되는 경우들이 있다. 이를 밑에서 살펴보며 이런 행위를 하지 않아야함을 인식하자

 

 

1. while문 loop에 susepnd 되는 시점이 없어 cancel 자체를 실행하지 못할때

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // computation loop, just wastes CPU -> i<10 으로 바꾸면 9가찍힐떄까지 계속 나옴
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
​
결과
//job: I'm sleeping 0 ...
//job: I'm sleeping 1 ...
//job: I'm sleeping 2 ...
//main: I'm tired of waiting!
//job: I'm sleeping 3 ...
//job: I'm sleeping 4 ...
//main: Now I can quit.

자 예시를 보면 분명 cancel 을 했는데 계속 코루틴을 통해서 돌고있는 작업이 취소되지 않고 그 작업이 완전히 종료 된 이후에(4가 찍힐때 까지) main: Now I can quit. 가 print 되는 모습을 볼 수 있다.

분명 취소했는데 왜 취소가 안되고 계속 작업하는 것일까?

 

cancel의 동작을 실행하는 시점이 suspend 함수가 실행되는 시점일때 이기 때문이다.

현재 코루틴 블록 내부에는 suspend 되는 함수가 없다. 즉 cancel 이 나도 cancel 되었는지 확인조차 안한다.

그러므로 실제적으로 작업을 취소 시킬 수 없다.

 

만약 delay(1)가 while문 어딘가에 존재한다면 결과물은 달라진다.

 

 

2. CancellationException을 susepnd 함수에서 체크해서 cancel 하는 구조인데 이를 rethrow하지않고 catch 해버릴때

val job = launch(Dispatchers.Default) {
    repeat(5) { i ->
        try {
            // print a message twice a second
            println("job: I'm sleeping $i ...")
            delay(500)
        } catch (e: Exception) {
            // log the exception
            println(e)
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
​
​
결과
//job: I'm sleeping 0 ...
//job: I'm sleeping 1 ...
//job: I'm sleeping 2 ...
//main: I'm tired of waiting!
//kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancell...
//job: I'm sleeping 3 ...
//kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancell...
//job: I'm sleeping 4 ...
//kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancell...
//main: Now I can quit.

여기서는 왜 또 취소가 안될까?

위에서 말했듯이 cancel이 동작하는 매커니즘은 suspend 함수가 cancel 여부를 확인후 cancel 되었다면 CancellationException를 던지는 방식이다.

위의 예시에서는 delay(500) 에서 CancellationException을 던지고 있을것이다. 하지만 이를 try catch 문으로 잡아놨으므로 당연스럽게도 종료가 되지않는다.

 

이를 해결하기 위한 방법으로 간단하게는 exception 을 rethrow해서 멈추는 방법이있다.

While catching Exception is an anti-pattern, this issue may surface in more subtle ways, like when using the runCatching function, which does not rethrow CancellationException.

근데 일단 Exception을 catching 하는 행위자체가 anit pattern 이라고 한다.

 

만약 이 설명들을 보고도 이해하지 못했다면 이글을 참고해보자

 

결론적으로 cancel의 정상동작을 보장하려면

Long running Loop(while문 같은것) 혹은 Exception을 catch 하는 부분에 있어서 주의를 기울어야 할것이다.

 

Making computation code cancellable

위와 같은 경우에 빠지지 않도록 하는 방법을 두가지 제시한다.

 

1. 주기적으로 suspending function 호출하기

이용도로 yield 함수가 적절한데 yield함수는 현재 코루틴이 들고있는Dispatcher 의 thread(혹은 thread pool)의 소유권을 Dispatcher내의 다른 코루틴에게 넘기는 함수이다. 즉 yield 함수를 주기적으로 실행해준다면 cancel도 동작하고 무한히 한 코루틴이 독점하는 일 또한 사라진다.

 

2. isActive를 통한 검사

위에서도 설명했지만 CoroutineScope의 확장 프로퍼티로 isActive 가 존재한다.

이는 cancel 관련 값이고 이를 검사해서 현재 cancel에 들어갔는지 아닌지 판단할 수 있다.

 

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (isActive) { // cancellable computation loop
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
​
결과
//job: I'm sleeping 0 ...
//job: I'm sleeping 1 ...
//job: I'm sleeping 2 ...
//main: I'm tired of waiting!
//main: Now I can quit.

 

코드의 예시를 보면 정확한 시점에 cancel 되는것을 볼 수 있다.

만약 while 등 긴 주기로 쓰레드 제어권을 갖을 일이 있다면 isActive 를 검사하는 로직을 끼워넣거나 yield함수를 사용해야 할 것이다.

Closing resources with finally

cancel 났을때 CancellationException을 통해서 종료하기 떄문에 finally를 통해서 리소스 같은것들을 해제해주는 용도로 사용할 수 있다고 한다. join을 사용하던 cancelAndjoin을 사용하든 finally블록이 종료될때까지 기다리기 때문에 사용에 적절하다한다.

 

근데 이글에서도 나와있지만 어짜피 리소스를 사용하는 것들은 Closeable의 확장함수로 붙어있는 use, useLines 를 사용하는것이 편하다.  그리고 use를 사용할때 코루틴 상황에서도 정상 종료된다고 공식문서에 나와있으니 그냥 use 쓰는게 편할것이다.

 

Use가 뭔지 모른다면 이펙티브 코틀린 item 9(use를 사용하여 리소스를 닫아라)를 봐보자

Run non-cancellable block

cancel이 나서 finally 블록을 사용할 때 주의점에 대한 이야기이다.

일반적으로 finally 블록에서는 파일을 닫거나, 리소스를 닫는등 non-blocking 인 경우가 대다수이다(suspend 함수를 포함하지 않는다.)

 

하지만 드물게 suspend function을 사용하는 경우가 있는데 이럴경우 withContext에 NonCancellable 컨텍스트를 넣어서 그 안에서 동작하게 코드를 구성하라고 한다.

 

우선 NonCancellable는 항상 활성화 상태를 유지 하도록 설계된 context이다.

 

현재 사용하려는 용도 cancel 되었지만 cancel 되지않고 실행되어야하는 블록을 위해 사용되도록 설계되었다.(withContext와 함께 사용하라고 명시되어있음)

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        withContext(NonCancellable) {
            println("job: I'm running finally")
            delay(1000L)
            println("job: And I've just delayed for 1 sec because I'm non-cancellable")
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
​
​
결과
//job: I'm sleeping 0 ...
//job: I'm sleeping 1 ...
//job: I'm sleeping 2 ...
//main: I'm tired of waiting!
//job: I'm running finally
//job: And I've just delayed for 1 sec because I'm non-cancellable
//main: Now I can quit.
​

 

여기까지는 사실상 공식문서에서 이렇게 사용하라에 대한 내용이고 이거 왜 이런지 궁금하지 않는가? 실험해봤다.

val job = launch(CoroutineName("pig")) {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        //withContext(NonCancellable) {
          println("Currently in coroutine: ${coroutineContext[CoroutineName]}")
​
            println("job: I'm running finally")
            try{
              delay(1L)
            }catch(e: Exception){
                println(e)
            }
            println("job: And I've just delayed for 1 sec because I'm non-cancellable")
        }
    //}
}
​
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
​
결과
//job: I'm sleeping 0 ...
//job: I'm sleeping 1 ...
//job: I'm sleeping 2 ...
//main: I'm tired of waiting!
//Currently in coroutine: CoroutineName(pig)
//job: I'm running finally
//kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelle...
//job: And I've just delayed for 1 sec because I'm non-cancellable
//main: Now I can quit.

내가 궁금한 의문점은 이러했다.

 

1. cancel 났는데 finally 블록에서는 어떤 코루틴을 끌어다 쓰고있는거지?

위의 코드예시를 보자 CoroutineName을 통해서 finally블록에서 어떤 코루틴에서 돌아가는지 보면 그냥 launch로 실행한 Pig 코루틴을 사용하고있다.

 

2. 그럼 withContext(NonCancellable) 없으면 어떻게 동작하지?

코드가 길어서 첨부를 안했지만 delay이후의 코드는 실행되지 않는다.

 

3. 왜 withContext(NonCancellable) 없으면 동작을 안하는거지?

이미 Pig코루틴이 cancel 이 되었기 때문에 그위에서 suspend함수를 실행하면 CancellationException이 일어난다.

-> 예시에서는 tryCatch 로 잡았기 때문에 그 후 코드도 정상 실행되는 것이다.

 

4.  CancellationException은 왜 일어나도 일반적인 Exception 처럼 전파되어 프로그램을 종료시키지 않는거지?

이 부분은 많은 키워드로 찾아보고 코틀린 코드 자체를 까봤는데 찾기 힘들었다 JobSupport.kt파일을 까봤는데 대충 1000자가 훨씬 넘고

난리도 아니다 결론적으로 CancellationException은 취소를 전파하는 용도일뿐 코루틴 밖을 빠져나와 프로그램을 멈추고 하는등의 행위는 일으키지 않는다.

 

추후 계속 의문을 갖다보면 언젠간 이에 대한 자료를 찾을테니 그때 업데이트 해야겠다.

Timeout

우리가 Job 인스턴스를 명시적으로 들고다니면서 취소해주는 방법도 있지만 시간을 기준으로 취소시키는 기준을 잡을 수도 있다.

withTimeout(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
}
​
//결과
//I'm sleeping 0 ...
//I'm sleeping 1 ...
//I'm sleeping 2 ...
//Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
// at _COROUTINE._BOUNDARY._ (CoroutineDebugging.kt:46) 
// at FileKt$main$1$1.invokeSuspend (File.kt:-1) 
// at FileKt$main$1.invokeSuspend (File.kt:-1) 
//Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
//at kotlinx.coroutines.TimeoutKt .TimeoutCancellationException(Timeout.kt:191)
//at kotlinx.coroutines.TimeoutCoroutine .run(Timeout.kt:159)
//at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask .run(EventLoop.common.kt:501)
//-> 실제 오류난 상황

 

withTimeOut함수를 이용하면 설정한 시간안에 일이 안끝날씨 TimeoutCancellationException 이 일어난다.

 

이 Exception은 CancellationException을 상속받았지만 CancellationException처럼 터지지않고 넘어가는것이 아닌 실제적으로 프로그램의 동작을 멈추는 Exception이다.

 

그래서 이를 사용하기위해 Try Catch(runcatching) 을 사용하거나

withTimeoutOrNull 을 이용하여 Null 을 대신 돌려받아 프로그램이 멈추지 않도록 할 수 있다.

val result = withTimeoutOrNull(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
    "Done" // will get cancelled before it produces this result
}
println("Result is $result")
​
//결과 
//I'm sleeping 0 ...
//I'm sleeping 1 ...
//I'm sleeping 2 ...
//Result is null