diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/index/IndexPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/index/IndexPresenter.kt index bc8ff9dff..f8aef3787 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/index/IndexPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/index/IndexPresenter.kt @@ -14,23 +14,30 @@ import eu.kanade.tachiyomi.source.model.toSManga import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter import eu.kanade.tachiyomi.ui.browse.source.browse.BrowseSourcePresenter.Companion.toItems import eu.kanade.tachiyomi.util.lang.awaitSingle -import eu.kanade.tachiyomi.util.lang.runAsObservable +import eu.kanade.tachiyomi.util.lang.launchIO import eu.kanade.tachiyomi.util.lang.withUIContext import eu.kanade.tachiyomi.util.system.logcat import exh.log.xLogE import exh.savedsearches.EXHSavedSearch import exh.savedsearches.JsonSavedSearch import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.flatMapConcat +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.serialization.decodeFromString import kotlinx.serialization.json.Json import logcat.LogPriority -import rx.Observable -import rx.Subscription -import rx.android.schedulers.AndroidSchedulers -import rx.schedulers.Schedulers -import rx.subjects.PublishSubject import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import xyz.nulldev.ts.api.http.serializer.FilterSerializer @@ -50,15 +57,10 @@ open class IndexPresenter( val preferences: PreferencesHelper = Injekt.get() ) : BasePresenter() { - /** - * Fetches the different sources by user settings. - */ - private var fetchSourcesSubscription: Subscription? = null - /** * Subject which fetches image of given manga. */ - private val fetchImageSubject = PublishSubject.create>>() + private val fetchImageFlow = MutableSharedFlow, Boolean>>() /** * Modifiable list of filters. @@ -74,7 +76,7 @@ open class IndexPresenter( /** * Subscription for fetching images of manga. */ - private var fetchImageSubscription: Subscription? = null + private var fetchImageJob: Job? = null val latestItems = MutableStateFlow?>(null) @@ -84,12 +86,6 @@ open class IndexPresenter( query = "" } - override fun onDestroy() { - fetchSourcesSubscription?.unsubscribe() - fetchImageSubscription?.unsubscribe() - super.onDestroy() - } - override fun onCreate(savedState: Bundle?) { super.onCreate(savedState) @@ -150,30 +146,35 @@ open class IndexPresenter( * @param manga the list of manga to initialize. */ private fun fetchImage(manga: List, isLatest: Boolean) { - fetchImageSubject.onNext(manga.map { it to isLatest }) + presenterScope.launchIO { + fetchImageFlow.emit(manga to isLatest) + } } /** * Subscribes to the initializer of manga details and updates the view if needed. */ private fun initializeFetchImageSubscription() { - fetchImageSubscription?.unsubscribe() - fetchImageSubscription = fetchImageSubject.observeOn(Schedulers.io()) - .flatMap { pair -> - Observable.from(pair).filter { it.first.thumbnail_url == null && !it.first.initialized } - .concatMap { getMangaDetailsObservable(it.first, source, it.second) } + fetchImageJob?.cancel() + fetchImageFlow + .flatMapConcat { (manga, isLatest) -> + manga.asFlow() + .filter { it.thumbnail_url == null && !it.initialized } + .map { + getMangaDetailsFlow(it, source, isLatest) + } } - .onBackpressureBuffer() - .observeOn(AndroidSchedulers.mainThread()) - .subscribe( - { pair -> - @Suppress("DEPRECATION") - view?.onMangaInitialized(pair.first, pair.second) - }, - { error -> - logcat(LogPriority.ERROR, error) + .buffer(Channel.RENDEZVOUS) + .flowOn(Dispatchers.IO) + .onEach { (manga, isLatest) -> + withUIContext { + view?.onMangaInitialized(manga, isLatest) } - ) + } + .catch { + logcat(LogPriority.ERROR, it) + } + .launchIn(presenterScope) } /** @@ -182,15 +183,12 @@ open class IndexPresenter( * @param manga the manga to initialize. * @return an observable of the manga to initialize */ - private fun getMangaDetailsObservable(manga: Manga, source: Source, isLatest: Boolean): Observable> { - return runAsObservable { - val networkManga = source.getMangaDetails(manga.toMangaInfo()) - manga.copyFrom(networkManga.toSManga()) - manga.initialized = true - db.insertManga(manga).executeAsBlocking() - manga to isLatest - } - .onErrorResumeNext { Observable.just(manga to isLatest) } + private suspend fun getMangaDetailsFlow(manga: Manga, source: Source, isLatest: Boolean): Pair { + val networkManga = source.getMangaDetails(manga.toMangaInfo()) + manga.copyFrom(networkManga.toSManga()) + manga.initialized = true + db.insertManga(manga).executeAsBlocking() + return manga to isLatest } /**