More coroutine tweaks
(cherry picked from commit c9cf9cfff006bc8d87e1d7a7245e213d579bb6d9) # Conflicts: # app/src/main/java/eu/kanade/tachiyomi/App.kt # app/src/main/java/eu/kanade/tachiyomi/AppModule.kt # app/src/main/java/eu/kanade/tachiyomi/data/backup/AbstractBackupRestore.kt # app/src/main/java/eu/kanade/tachiyomi/ui/reader/ReaderPresenter.kt # app/src/main/java/eu/kanade/tachiyomi/util/lang/RxCoroutineBridge.kt
This commit is contained in:
parent
c7bedb96a0
commit
0edff11353
@ -44,9 +44,7 @@ import kotlinx.coroutines.launch
|
||||
import org.conscrypt.Conscrypt
|
||||
import timber.log.Timber
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.InjektScope
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
import uy.kohesive.injekt.registry.default.DefaultRegistrar
|
||||
import java.io.File
|
||||
import java.security.NoSuchAlgorithmException
|
||||
import java.security.Security
|
||||
@ -83,7 +81,6 @@ open class App : Application(), LifecycleObserver {
|
||||
Security.insertProviderAt(Conscrypt.newProvider(), 1)
|
||||
}
|
||||
|
||||
Injekt = InjektScope(DefaultRegistrar())
|
||||
Injekt.importModule(AppModule(this))
|
||||
|
||||
setupNotificationChannels()
|
||||
|
@ -1,6 +1,7 @@
|
||||
package eu.kanade.tachiyomi
|
||||
|
||||
import android.app.Application
|
||||
import android.os.Handler
|
||||
import com.google.gson.Gson
|
||||
import eu.kanade.tachiyomi.data.cache.ChapterCache
|
||||
import eu.kanade.tachiyomi.data.cache.CoverCache
|
||||
@ -13,8 +14,6 @@ import eu.kanade.tachiyomi.extension.ExtensionManager
|
||||
import eu.kanade.tachiyomi.network.NetworkHelper
|
||||
import eu.kanade.tachiyomi.source.SourceManager
|
||||
import exh.eh.EHentaiUpdateHelper
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.serialization.json.Json
|
||||
import uy.kohesive.injekt.api.InjektModule
|
||||
import uy.kohesive.injekt.api.InjektRegistrar
|
||||
@ -56,19 +55,20 @@ class AppModule(val app: Application) : InjektModule {
|
||||
// SY <--
|
||||
|
||||
// Asynchronously init expensive components for a faster cold start
|
||||
Handler().post {
|
||||
get<PreferencesHelper>()
|
||||
|
||||
GlobalScope.launch { get<PreferencesHelper>() }
|
||||
get<NetworkHelper>()
|
||||
|
||||
GlobalScope.launch { get<NetworkHelper>() }
|
||||
get<SourceManager>()
|
||||
|
||||
GlobalScope.launch { get<SourceManager>() }
|
||||
get<DatabaseHelper>()
|
||||
|
||||
GlobalScope.launch { get<DatabaseHelper>() }
|
||||
get<DownloadManager>()
|
||||
|
||||
GlobalScope.launch { get<DownloadManager>() }
|
||||
|
||||
// SY -->
|
||||
GlobalScope.launch { get<CustomMangaManager>() }
|
||||
// SY <--
|
||||
// SY -->
|
||||
get<CustomMangaManager>()
|
||||
// SY <--
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -10,7 +10,6 @@ import eu.kanade.tachiyomi.data.database.models.Track
|
||||
import eu.kanade.tachiyomi.data.track.TrackManager
|
||||
import eu.kanade.tachiyomi.source.Source
|
||||
import eu.kanade.tachiyomi.util.chapter.NoChaptersException
|
||||
import eu.kanade.tachiyomi.util.lang.await
|
||||
import exh.eh.EHentaiThrottleManager
|
||||
import kotlinx.coroutines.Job
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
@ -96,7 +95,7 @@ abstract class AbstractBackupRestore<T : AbstractBackupManager>(protected val co
|
||||
if (service != null && service.isLogged) {
|
||||
try {
|
||||
val updatedTrack = service.refresh(track)
|
||||
db.insertTrack(updatedTrack).await()
|
||||
db.insertTrack(updatedTrack).executeAsBlocking()
|
||||
} catch (e: Exception) {
|
||||
errors.add(Date() to "${manga.title} - ${e.message}")
|
||||
}
|
||||
|
@ -15,7 +15,6 @@ import eu.kanade.tachiyomi.source.model.SManga
|
||||
import eu.kanade.tachiyomi.source.model.toSManga
|
||||
import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter
|
||||
import eu.kanade.tachiyomi.ui.browse.source.browse.BrowseSourcePresenter
|
||||
import eu.kanade.tachiyomi.util.lang.await
|
||||
import eu.kanade.tachiyomi.util.lang.runAsObservable
|
||||
import rx.Observable
|
||||
import rx.Subscription
|
||||
@ -257,7 +256,7 @@ open class GlobalSearchPresenter(
|
||||
val networkManga = source.getMangaDetails(manga.toMangaInfo())
|
||||
manga.copyFrom(networkManga.toSManga())
|
||||
manga.initialized = true
|
||||
db.insertManga(manga).await()
|
||||
db.insertManga(manga).executeAsBlocking()
|
||||
return manga
|
||||
}
|
||||
|
||||
|
@ -264,7 +264,7 @@ class MainActivity : BaseViewBindingActivity<MainActivityBinding>() {
|
||||
return
|
||||
}
|
||||
|
||||
launchIO {
|
||||
lifecycleScope.launchIO {
|
||||
try {
|
||||
val pendingUpdates = ExtensionGithubApi().checkForUpdates(this@MainActivity)
|
||||
preferences.extensionUpdatesCount().set(pendingUpdates.size)
|
||||
|
@ -32,7 +32,6 @@ import eu.kanade.tachiyomi.ui.manga.chapter.ChapterItem
|
||||
import eu.kanade.tachiyomi.util.chapter.ChapterSettingsHelper
|
||||
import eu.kanade.tachiyomi.util.chapter.syncChaptersWithSource
|
||||
import eu.kanade.tachiyomi.util.isLocal
|
||||
import eu.kanade.tachiyomi.util.lang.await
|
||||
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
import eu.kanade.tachiyomi.util.lang.withUIContext
|
||||
import eu.kanade.tachiyomi.util.prepUpdateCover
|
||||
@ -289,7 +288,7 @@ class MangaPresenter(
|
||||
manga.prepUpdateCover(coverCache, sManga, manualFetch)
|
||||
manga.copyFrom(sManga)
|
||||
manga.initialized = true
|
||||
db.insertManga(manga).await()
|
||||
db.insertManga(manga).executeAsBlocking()
|
||||
|
||||
withUIContext { view?.onFetchMangaInfoDone() }
|
||||
} catch (e: Throwable) {
|
||||
@ -752,7 +751,7 @@ class MangaPresenter(
|
||||
hasRequested = true
|
||||
|
||||
if (fetchChaptersJob?.isActive == true) return
|
||||
fetchChaptersJob = launchIO {
|
||||
fetchChaptersJob = presenterScope.launchIO {
|
||||
try {
|
||||
if (source !is MergedSource) {
|
||||
val chapters = source.getChapterList(manga.toMangaInfo())
|
||||
@ -884,7 +883,7 @@ class MangaPresenter(
|
||||
}
|
||||
|
||||
launchIO {
|
||||
db.updateChaptersProgress(chapters).await()
|
||||
db.updateChaptersProgress(chapters).executeAsBlocking()
|
||||
|
||||
if (preferences.removeAfterMarkedAsRead() /* SY --> */ && manga.shouldDeleteChapters(db, preferences) /* SY <-- */) {
|
||||
deleteChapters(chapters)
|
||||
@ -915,7 +914,7 @@ class MangaPresenter(
|
||||
selectedChapters
|
||||
.forEach {
|
||||
it.bookmark = bookmarked
|
||||
db.updateChapterProgress(it).await()
|
||||
db.updateChapterProgress(it).executeAsBlocking()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -8,7 +8,6 @@ import eu.kanade.tachiyomi.data.preference.PreferencesHelper
|
||||
import eu.kanade.tachiyomi.data.track.TrackManager
|
||||
import eu.kanade.tachiyomi.data.track.TrackService
|
||||
import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter
|
||||
import eu.kanade.tachiyomi.util.lang.await
|
||||
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
import eu.kanade.tachiyomi.util.lang.withUIContext
|
||||
import eu.kanade.tachiyomi.util.system.toast
|
||||
@ -97,7 +96,7 @@ class TrackPresenter(
|
||||
.map {
|
||||
async {
|
||||
val track = it.service.refresh(it.track!!)
|
||||
db.insertTrack(track).await()
|
||||
db.insertTrack(track).executeAsBlocking()
|
||||
}
|
||||
}
|
||||
.awaitAll()
|
||||
@ -128,7 +127,7 @@ class TrackPresenter(
|
||||
launchIO {
|
||||
try {
|
||||
service.bind(item)
|
||||
db.insertTrack(item).await()
|
||||
db.insertTrack(item).executeAsBlocking()
|
||||
} catch (e: Throwable) {
|
||||
withUIContext { context.toast(e.message) }
|
||||
}
|
||||
@ -146,7 +145,7 @@ class TrackPresenter(
|
||||
launchIO {
|
||||
try {
|
||||
service.update(track)
|
||||
db.insertTrack(track).await()
|
||||
db.insertTrack(track).executeAsBlocking()
|
||||
withUIContext { view?.onRefreshDone() }
|
||||
} catch (e: Throwable) {
|
||||
withUIContext { view?.onRefreshError(e) }
|
||||
|
@ -26,7 +26,6 @@ import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
|
||||
import eu.kanade.tachiyomi.ui.reader.model.ReaderPage
|
||||
import eu.kanade.tachiyomi.ui.reader.model.ViewerChapters
|
||||
import eu.kanade.tachiyomi.util.isLocal
|
||||
import eu.kanade.tachiyomi.util.lang.await
|
||||
import eu.kanade.tachiyomi.util.lang.awaitSingleOrNull
|
||||
import eu.kanade.tachiyomi.util.lang.byteSize
|
||||
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
@ -778,7 +777,7 @@ class ReaderPresenter(
|
||||
val trackManager = Injekt.get<TrackManager>()
|
||||
|
||||
launchIO {
|
||||
db.getTracks(manga).await()
|
||||
db.getTracks(manga).executeAsBlocking()
|
||||
.mapNotNull { track ->
|
||||
val service = trackManager.getService(track.sync_id)
|
||||
if (service != null && service.isLogged && chapterRead > track.last_chapter_read /* SY --> */ && ((service.id == TrackManager.MDLIST && track.status != FollowStatus.UNFOLLOWED.int) || service.id != TrackManager.MDLIST)/* SY <-- */) {
|
||||
@ -789,7 +788,7 @@ class ReaderPresenter(
|
||||
async {
|
||||
runCatching {
|
||||
service.update(track)
|
||||
db.insertTrack(track).await()
|
||||
db.insertTrack(track).executeAsBlocking()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -797,8 +796,8 @@ class ReaderPresenter(
|
||||
}
|
||||
}
|
||||
.awaitAll()
|
||||
.filter { it.isFailure }
|
||||
.forEach { it.exceptionOrNull()?.let { e -> Timber.w(e) } }
|
||||
.mapNotNull { it.exceptionOrNull() }
|
||||
.forEach { Timber.w(it) }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package eu.kanade.tachiyomi.ui.setting.track
|
||||
|
||||
import android.net.Uri
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
|
||||
class AnilistLoginActivity : BaseOAuthLoginActivity() {
|
||||
@ -9,7 +10,7 @@ class AnilistLoginActivity : BaseOAuthLoginActivity() {
|
||||
val regex = "(?:access_token=)(.*?)(?:&)".toRegex()
|
||||
val matchResult = regex.find(data?.fragment.toString())
|
||||
if (matchResult?.groups?.get(1) != null) {
|
||||
launchIO {
|
||||
lifecycleScope.launchIO {
|
||||
trackManager.aniList.login(matchResult.groups[1]!!.value)
|
||||
returnToSettings()
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package eu.kanade.tachiyomi.ui.setting.track
|
||||
|
||||
import android.net.Uri
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
|
||||
class BangumiLoginActivity : BaseOAuthLoginActivity() {
|
||||
@ -8,7 +9,7 @@ class BangumiLoginActivity : BaseOAuthLoginActivity() {
|
||||
override fun handleResult(data: Uri?) {
|
||||
val code = data?.getQueryParameter("code")
|
||||
if (code != null) {
|
||||
launchIO {
|
||||
lifecycleScope.launchIO {
|
||||
trackManager.bangumi.login(code)
|
||||
returnToSettings()
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package eu.kanade.tachiyomi.ui.setting.track
|
||||
|
||||
import android.net.Uri
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
|
||||
class MyAnimeListLoginActivity : BaseOAuthLoginActivity() {
|
||||
@ -8,7 +9,7 @@ class MyAnimeListLoginActivity : BaseOAuthLoginActivity() {
|
||||
override fun handleResult(data: Uri?) {
|
||||
val code = data?.getQueryParameter("code")
|
||||
if (code != null) {
|
||||
launchIO {
|
||||
lifecycleScope.launchIO {
|
||||
trackManager.myAnimeList.login(code)
|
||||
returnToSettings()
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package eu.kanade.tachiyomi.ui.setting.track
|
||||
|
||||
import android.net.Uri
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
|
||||
class ShikimoriLoginActivity : BaseOAuthLoginActivity() {
|
||||
@ -8,7 +9,7 @@ class ShikimoriLoginActivity : BaseOAuthLoginActivity() {
|
||||
override fun handleResult(data: Uri?) {
|
||||
val code = data?.getQueryParameter("code")
|
||||
if (code != null) {
|
||||
launchIO {
|
||||
lifecycleScope.launchIO {
|
||||
trackManager.shikimori.login(code)
|
||||
returnToSettings()
|
||||
}
|
||||
|
@ -1,7 +1,5 @@
|
||||
package eu.kanade.tachiyomi.util.lang
|
||||
|
||||
import com.pushtorefresh.storio.operations.PreparedOperation
|
||||
import com.pushtorefresh.storio.sqlite.operations.get.PreparedGetObject
|
||||
import kotlinx.coroutines.CancellableContinuation
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.CoroutineStart
|
||||
@ -10,11 +8,8 @@ import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.suspendCancellableCoroutine
|
||||
import rx.Completable
|
||||
import rx.CompletableSubscriber
|
||||
import rx.Emitter
|
||||
import rx.Observable
|
||||
import rx.Observer
|
||||
@ -53,49 +48,46 @@ suspend fun <T> Single<T>.await(subscribeOn: Scheduler? = null): T {
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun <T> PreparedOperation<T>.await(): T = asRxSingle().await()
|
||||
suspend fun <T> PreparedGetObject<T>.await(): T? = asRxSingle().await()
|
||||
|
||||
suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) {
|
||||
return suspendCancellableCoroutine { continuation ->
|
||||
val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
|
||||
lateinit var sub: Subscription
|
||||
sub = self.subscribe(
|
||||
{
|
||||
continuation.resume(Unit) {
|
||||
sub.unsubscribe()
|
||||
}
|
||||
},
|
||||
{
|
||||
if (!continuation.isCancelled) {
|
||||
continuation.resumeWithException(it)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
continuation.invokeOnCancellation {
|
||||
sub.unsubscribe()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
|
||||
subscribe(
|
||||
object : CompletableSubscriber {
|
||||
override fun onSubscribe(s: Subscription) {
|
||||
cont.unsubscribeOnCancellation(s)
|
||||
}
|
||||
|
||||
override fun onCompleted() {
|
||||
cont.resume(Unit)
|
||||
}
|
||||
|
||||
override fun onError(e: Throwable) {
|
||||
cont.resumeWithException(e)
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
// suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) {
|
||||
// return suspendCancellableCoroutine { continuation ->
|
||||
// val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
|
||||
// lateinit var sub: Subscription
|
||||
// sub = self.subscribe(
|
||||
// {
|
||||
// continuation.resume(Unit) {
|
||||
// sub.unsubscribe()
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// if (!continuation.isCancelled) {
|
||||
// continuation.resumeWithException(it)
|
||||
// }
|
||||
// }
|
||||
// )
|
||||
//
|
||||
// continuation.invokeOnCancellation {
|
||||
// sub.unsubscribe()
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
|
||||
// subscribe(
|
||||
// object : CompletableSubscriber {
|
||||
// override fun onSubscribe(s: Subscription) {
|
||||
// cont.unsubscribeOnCancellation(s)
|
||||
// }
|
||||
//
|
||||
// override fun onCompleted() {
|
||||
// cont.resume(Unit)
|
||||
// }
|
||||
//
|
||||
// override fun onError(e: Throwable) {
|
||||
// cont.resumeWithException(e)
|
||||
// }
|
||||
// }
|
||||
// )
|
||||
// }
|
||||
|
||||
suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
|
||||
cont.unsubscribeOnCancellation(
|
||||
@ -113,27 +105,27 @@ suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
|
||||
)
|
||||
}
|
||||
|
||||
suspend fun <T> Observable<T>.awaitFirst(): T = first().awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T =
|
||||
firstOrDefault(default).awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(
|
||||
Observable.fromCallable(
|
||||
defaultValue
|
||||
)
|
||||
).first().awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
|
||||
// suspend fun <T> Observable<T>.awaitFirst(): T = first().awaitOne()
|
||||
//
|
||||
// suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T =
|
||||
// firstOrDefault(default).awaitOne()
|
||||
//
|
||||
// suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
|
||||
//
|
||||
// suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(
|
||||
// Observable.fromCallable(
|
||||
// defaultValue
|
||||
// )
|
||||
// ).first().awaitOne()
|
||||
//
|
||||
// suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitSingleOrDefault(default: T): T =
|
||||
singleOrDefault(default).awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitSingleOrNull(): T? = singleOrDefault(null).awaitOne()
|
||||
// suspend fun <T> Observable<T>.awaitSingleOrDefault(default: T): T =
|
||||
// singleOrDefault(default).awaitOne()
|
||||
//
|
||||
// suspend fun <T> Observable<T>.awaitSingleOrNull(): T? = singleOrDefault(null).awaitOne()
|
||||
|
||||
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
|
||||
cont.unsubscribeOnCancellation(
|
||||
@ -192,31 +184,31 @@ fun <T : Any?> Observable<T>.asFlow(): Flow<T> = callbackFlow {
|
||||
awaitClose { subscription.unsubscribe() }
|
||||
}
|
||||
|
||||
fun <T : Any?> Flow<T>.asObservable(backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE): Observable<T> {
|
||||
return Observable.create(
|
||||
{ emitter ->
|
||||
/*
|
||||
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
|
||||
* asObservable is already invoked from unconfined
|
||||
*/
|
||||
val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
|
||||
try {
|
||||
collect { emitter.onNext(it) }
|
||||
emitter.onCompleted()
|
||||
} catch (e: Throwable) {
|
||||
// Ignore `CancellationException` as error, since it indicates "normal cancellation"
|
||||
if (e !is CancellationException) {
|
||||
emitter.onError(e)
|
||||
} else {
|
||||
emitter.onCompleted()
|
||||
}
|
||||
}
|
||||
}
|
||||
emitter.setCancellation { job.cancel() }
|
||||
},
|
||||
backpressureMode
|
||||
)
|
||||
}
|
||||
// fun <T : Any> Flow<T>.asObservable(backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE): Observable<T> {
|
||||
// return Observable.create(
|
||||
// { emitter ->
|
||||
// /*
|
||||
// * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
|
||||
// * asObservable is already invoked from unconfined
|
||||
// */
|
||||
// val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
|
||||
// try {
|
||||
// collect { emitter.onNext(it) }
|
||||
// emitter.onCompleted()
|
||||
// } catch (e: Throwable) {
|
||||
// // Ignore `CancellationException` as error, since it indicates "normal cancellation"
|
||||
// if (e !is CancellationException) {
|
||||
// emitter.onError(e)
|
||||
// } else {
|
||||
// emitter.onCompleted()
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// emitter.setCancellation { job.cancel() }
|
||||
// },
|
||||
// backpressureMode
|
||||
// )
|
||||
// }
|
||||
|
||||
fun <T> runAsObservable(
|
||||
block: suspend () -> T,
|
||||
|
Loading…
x
Reference in New Issue
Block a user