Remove unused Rx/Coroutines converters

(cherry picked from commit b49280e347d857c7f1dd17805f601ed2766e650e)
This commit is contained in:
arkon 2023-02-18 10:16:05 -05:00 committed by Jobobby04
parent a163223d83
commit 268f542bbb
3 changed files with 3 additions and 68 deletions

View File

@ -1,61 +0,0 @@
package eu.kanade.core.util
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import rx.Emitter
import rx.Observable
import rx.Observer
import kotlin.coroutines.CoroutineContext
fun <T : Any> Observable<T>.asFlow(): Flow<T> = callbackFlow {
val observer = object : Observer<T> {
override fun onNext(t: T) {
trySend(t)
}
override fun onError(e: Throwable) {
close(e)
}
override fun onCompleted() {
close()
}
}
val subscription = subscribe(observer)
awaitClose { subscription.unsubscribe() }
}
fun <T : Any> Flow<T>.asObservable(
context: CoroutineContext = Dispatchers.Unconfined,
backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE,
): Observable<T> {
return Observable.create(
{ emitter ->
/*
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
* asObservable is already invoked from unconfined
*/
val job = GlobalScope.launch(context = context, start = CoroutineStart.ATOMIC) {
try {
collect { emitter.onNext(it) }
emitter.onCompleted()
} catch (e: Throwable) {
// Ignore `CancellationException` as error, since it indicates "normal cancellation"
if (e !is CancellationException) {
emitter.onError(e)
} else {
emitter.onCompleted()
}
}
}
emitter.setCancellation { job.cancel() }
},
backpressureMode,
)
}

View File

@ -21,7 +21,6 @@ import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.source.online.HttpSource
import eu.kanade.tachiyomi.source.online.fetchAllImageUrlsFromPageList
import eu.kanade.tachiyomi.util.lang.RetryWithDelay
import eu.kanade.tachiyomi.util.lang.plusAssign
import eu.kanade.tachiyomi.util.storage.DiskUtil
import eu.kanade.tachiyomi.util.storage.DiskUtil.NOMEDIA_FILE
import eu.kanade.tachiyomi.util.storage.saveTo
@ -33,6 +32,7 @@ import logcat.LogPriority
import nl.adaptivity.xmlutil.serialization.XML
import okhttp3.Response
import rx.Observable
import rx.Subscription
import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers
import rx.subscriptions.CompositeSubscription
@ -649,6 +649,8 @@ class Downloader(
return queue.none { it.status.value <= Download.State.DOWNLOADING.value }
}
private operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription)
companion object {
const val TMP_DIR_SUFFIX = "_tmp"
const val WARNING_NOTIF_TIMEOUT_MS = 30_000L

View File

@ -1,6 +0,0 @@
package eu.kanade.tachiyomi.util.lang
import rx.Subscription
import rx.subscriptions.CompositeSubscription
operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription)