Composing multiple flows
플로우를 합쳐내는 방법을 다룬다.
Zip
zip 사실 잘 안써서 생소하지만 두 플로우를 섞어서 쓸 수 있는 방법으로 zip이 있다.
예전에 학습했던 컬렉션 api를 뒤져보면 관련된 사용법을 겁나 많이 찾을 수 있을듯 하다
val nums = flow<Int>{
repeat(3){
emit(it)
delay(100)
}
} // numbers 1..3
val strs = flow<String>{
emit("???")
} // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
//결과
//0 -> ???
이런저런 실험을 해봤는데
두 플로우가 모두 값을 내뱉어야 처리하고 하나가 트리거 된다고 값을 처리하지 않는다.
예를 들어 A플로우가 3개 값을 뱉고 B플로우가 1개 값을 뱉으면 1개값만 처리되며(A플로우의 2개 값은 사라짐)
각각 모두 값을 하나씩 내뱉었을때만 비로소 처리된다.(플로우의 값이 하나씩 나올때까지 기다린다.)
예를들어 A플로우가 값이 나오는데 딜레이가 100이며 1,2,3을 내뱉고 B플로우가 값이나오는데 딜레이가 200 걸리며 값을 a,b,c를 내뱉는다면 collect에서 값을 받는건 1,a 세트 2,b 세트 이런식으로 하나씩 묶어서 받게된다.
Combine
Combine 연산자는 zip이랑 비슷한데 zip에서는 각 flow당 방출하는 값을 세트로 맞췄다면 combine는 각 flow가 방출할때마다 두 flow의 값을 합쳐서 내뱉는다.
즉 어느 하나라도 값을 방출하면 방출하지않는 flow는 가장 마지막 값을 갖고있다가 내뱉는데 공식문서에서 conflation을 생각하면 된다고 나와있다. 즉 가장 마지막 값을 갖고있고 flow를 방출하지 않는 flow는 마지막 값을 줘서 처리한다는 뜻이다.
그냥 사용 예시를 보면 이해가된다.
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//결과값
//1 -> one at 437 ms from start
//2 -> one at 635 ms from start
//2 -> two at 839 ms from start
//3 -> two at 936 ms from start
//3 -> three at 1239 ms from start
여기서 어떻게 보면 당연하지만 헷갈릴수도 있는게 만약 두 flow가 모두 값을 뱉기 이전상태 즉 첫번째 emit을 하나의 flow만 하고 다른 flow는 emit하지 않았을때는 제 아무리 한 flow가 값을 많이 뱉어도 다씹힌다.
val nums = (1..3).asFlow().onEach { delay(100) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//결과
//3 -> one at 438 ms from start
//3 -> two at 840 ms from start
//3 -> three at 1240 ms from start
다음 예시를 보면 strs가 첫값을 내뱉기 전에 nums가 값을 다 뱉어버리는데 이러면 이때 값을 뱉는것은 flow가 collect하지 않는다.
이런 부분만 주의한다면 될것 같다.
Flattening flows
Flattening이 직역하면 평탄화인데
종종 flow를 쓰다보면 flow안에 flow가 들어가는 상황(Flow<Flow<String>>)이 나온다고 한다. -> 물론 이렇게 잘 안짜지 않나 싶은데 근데 또 생각해보면 이런 경우가 종종있었는데 flatMap이 생각이 안나서 안쓴것 같기도 하다.
이런 상황이 나오는 예시로
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
(1..3).asFlow().map { requestFlow(it) }
이런 상황을 제시했다.
어쨋든 이럴때 하나의 flow로 합쳐버리기 위해 Flattening을 쓴다고 한다.
근데 flow는 비동기적인 환경 때문에 Flattening의 방식이 3가지가 있고 이를 적절히 사용해야 한다고 한다.
이제 3가지 방법을 살펴보자
1. flatMapConcat
flatMapConcat 와 flattenConcat을 이용하여 사용한다.
공식문서에는 가장 아날로그적으로 자연스러운 방식이라고 설명한다.
한마디로 정리하자면 flatMapConcat연산자 안에있는 함수가 다 끝날때까지 해당 중간 연산자가 적용되는 flow(예시의 (1..3).asFlow())가 대기하는 형태이다.
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//결과
//1: First at 121 ms from start
//1: Second at 622 ms from start
//2: First at 727 ms from start
//2: Second at 1227 ms from start
//3: First at 1328 ms from start
//3: Second at 1829 ms from start
결과적으로 사실상 대기가 키워드라고 생각하면된다.
2.flatMapMerge
flatMapMerge 와 flattenMerge을 이용하여 사용한다.
이제 슬슬 머리가 아파오기 시작하는데 일단 emit를 하는순간 죄다 동시성으로 collect를 타는거라고 생각하면된다.
그래서 concurrency 매개변수를 받아서 이를 관리하는데 기본값은 DEFAULT_CONCURRENCY으로 16개의 실행흐름이 생긴다고 한다. 그래서 동시적으로 처리해서 결과값을 뽑고 그것을 병합하는 형태이다.
일단 예시를 보면서 살펴보면 좋은데
공식문서에 나와있는걸 보면
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//결과
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
이렇게 동시적으로 collect를 탄다고 보면된다 (실행흐름이 빠바박 생기는것이다.)
근데 이런 실행흐름이 기본적으로 16개라고했으니 한계지점을 봐야할것이다, 그래서 예제를 변형시켜보면
fun myRequestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(2000) // wait 500 ms
emit("$i: Second")
}
val startTime = System.currentTimeMillis() // remember the start time
(1..20).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { myRequestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//결과
1: First at 169 ms from start
2: First at 265 ms from start
3: First at 366 ms from start
4: First at 466 ms from start
5: First at 567 ms from start
6: First at 667 ms from start
7: First at 768 ms from start
8: First at 868 ms from start
9: First at 969 ms from start
10: First at 1069 ms from start
11: First at 1169 ms from start
12: First at 1270 ms from start
13: First at 1370 ms from start
14: First at 1471 ms from start
15: First at 1571 ms from start
16: First at 1671 ms from start
1: Second at 2171 ms from start
17: First at 2171 ms from start
2: Second at 2271 ms from start
18: First at 2271 ms from start
3: Second at 2366 ms from start
19: First at 2372 ms from start
4: Second at 2467 ms from start
20: First at 2473 ms from start
5: Second at 2567 ms from start
6: Second at 2668 ms from start
7: Second at 2768 ms from start
8: Second at 2868 ms from start
9: Second at 2969 ms from start
10: Second at 3069 ms from start
11: Second at 3171 ms from start
12: Second at 3270 ms from start
13: Second at 3371 ms from start
14: Second at 3471 ms from start
15: Second at 3571 ms from start
16: Second at 3672 ms from start
17: Second at 4172 ms from start
18: Second at 4272 ms from start
19: Second at 4372 ms from start
20: Second at 4475 ms from start
이렇게 나온다 보면 16까지는 실행흐름이 빠바박 나오다가 17은 1번 실행흐름이 종료되고난후 시작된다.
자 그럼 당연히 다음 수순은 concurrency값을 조정해 봐야할것이다.
fun myRequestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(2000) // wait 500 ms
emit("$i: Second")
}
val startTime = System.currentTimeMillis() // remember the start time
(1..20).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge(20) { myRequestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//결과
1: First at 174 ms from start
2: First at 270 ms from start
3: First at 370 ms from start
4: First at 471 ms from start
5: First at 571 ms from start
6: First at 672 ms from start
7: First at 772 ms from start
8: First at 873 ms from start
9: First at 973 ms from start
10: First at 1074 ms from start
11: First at 1174 ms from start
12: First at 1274 ms from start
13: First at 1375 ms from start
14: First at 1475 ms from start
15: First at 1576 ms from start
16: First at 1676 ms from start
17: First at 1777 ms from start
18: First at 1877 ms from start
19: First at 1977 ms from start
20: First at 2079 ms from start
1: Second at 2174 ms from start
2: Second at 2270 ms from start
3: Second at 2371 ms from start
4: Second at 2471 ms from start
5: Second at 2572 ms from start
6: Second at 2672 ms from start
7: Second at 2773 ms from start
8: Second at 2873 ms from start
9: Second at 2973 ms from start
10: Second at 3074 ms from start
11: Second at 3174 ms from start
12: Second at 3275 ms from start
13: Second at 3375 ms from start
14: Second at 3476 ms from start
15: Second at 3576 ms from start
16: Second at 3677 ms from start
17: Second at 3777 ms from start
18: Second at 3877 ms from start
19: Second at 3978 ms from start
20: Second at 4080 ms from start
당연하게 20개의 실행흐름이 동시적으로 처리하는 형태를 띈다.
3. flatMapLatest
flatMapLatest을 이용하여 사용한다.
collectLatest를 생각하면되는데 중간연산자 flatMapLatest로 넘겨준 처리함수에서 suspend중인데 새로운 값이 emit되면 그냥 중간연산 작업을 취소 때려버린다(cancel해버림).
그래서 결론적으로 예시를 보면 이것도 감이온다.
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//결과
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
당연히 이거 보면 감이올것이다.
주의점!!!
중간연산자 함수가 중간에 suspend되는 지점이 없다면 취소되지 않는다. 이거 비선점형 + cancel 방식의 환장 콜라보를 따르는 것이다.
어쩃든 중간자 함수로 넘기는 처리함수가 suspend가 되어야함으로 suspned 지점이 없다면 중간에 조치를 취해야할것이다,(yield 호출하기, isActive 확인하기)
'코틀린 > 코루틴' 카테고리의 다른 글
flow 5주차 1차 스터디 flow끝까지 (1) | 2023.11.29 |
---|---|
Flow 4주차 2차 스터디 CatchingDeclaratively까지 (2) | 2023.11.23 |
Flow 2주차 3차 스터디 Processing the latest value까지 (1) | 2023.11.23 |
Flow 2주차 2차 스터디 Flows are sequential 까지 (0) | 2023.11.23 |
Flow 2주차 2차 스터디 부록 Cancellation and timeouts(코루틴 cancel) (1) | 2023.11.23 |