diff --git a/app/src/main/java/eu/kanade/core/util/RxJavaExtensions.kt b/app/src/main/java/eu/kanade/core/util/RxJavaExtensions.kt deleted file mode 100644 index b54fa63ab..000000000 --- a/app/src/main/java/eu/kanade/core/util/RxJavaExtensions.kt +++ /dev/null @@ -1,61 +0,0 @@ -package eu.kanade.core.util - -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.callbackFlow -import kotlinx.coroutines.launch -import rx.Emitter -import rx.Observable -import rx.Observer -import kotlin.coroutines.CoroutineContext - -fun Observable.asFlow(): Flow = callbackFlow { - val observer = object : Observer { - override fun onNext(t: T) { - trySend(t) - } - - override fun onError(e: Throwable) { - close(e) - } - - override fun onCompleted() { - close() - } - } - val subscription = subscribe(observer) - awaitClose { subscription.unsubscribe() } -} - -fun Flow.asObservable( - context: CoroutineContext = Dispatchers.Unconfined, - backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE, -): Observable { - return Observable.create( - { emitter -> - /* - * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if - * asObservable is already invoked from unconfined - */ - val job = GlobalScope.launch(context = context, start = CoroutineStart.ATOMIC) { - try { - collect { emitter.onNext(it) } - emitter.onCompleted() - } catch (e: Throwable) { - // Ignore `CancellationException` as error, since it indicates "normal cancellation" - if (e !is CancellationException) { - emitter.onError(e) - } else { - emitter.onCompleted() - } - } - } - emitter.setCancellation { job.cancel() } - }, - backpressureMode, - ) -} diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt index 7d193d615..e5ec430f3 100755 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt @@ -21,7 +21,6 @@ import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.source.online.HttpSource import eu.kanade.tachiyomi.source.online.fetchAllImageUrlsFromPageList import eu.kanade.tachiyomi.util.lang.RetryWithDelay -import eu.kanade.tachiyomi.util.lang.plusAssign import eu.kanade.tachiyomi.util.storage.DiskUtil import eu.kanade.tachiyomi.util.storage.DiskUtil.NOMEDIA_FILE import eu.kanade.tachiyomi.util.storage.saveTo @@ -33,6 +32,7 @@ import logcat.LogPriority import nl.adaptivity.xmlutil.serialization.XML import okhttp3.Response import rx.Observable +import rx.Subscription import rx.android.schedulers.AndroidSchedulers import rx.schedulers.Schedulers import rx.subscriptions.CompositeSubscription @@ -649,6 +649,8 @@ class Downloader( return queue.none { it.status.value <= Download.State.DOWNLOADING.value } } + private operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription) + companion object { const val TMP_DIR_SUFFIX = "_tmp" const val WARNING_NOTIF_TIMEOUT_MS = 30_000L diff --git a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt b/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt deleted file mode 100644 index dc162bf2c..000000000 --- a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt +++ /dev/null @@ -1,6 +0,0 @@ -package eu.kanade.tachiyomi.util.lang - -import rx.Subscription -import rx.subscriptions.CompositeSubscription - -operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription)