Rewrite IndexPresenter in flow

This commit is contained in:
Jobobby04 2022-03-13 19:23:03 -04:00
parent a5e691271b
commit d31d99a416

View File

@ -14,23 +14,30 @@ import eu.kanade.tachiyomi.source.model.toSManga
import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter
import eu.kanade.tachiyomi.ui.browse.source.browse.BrowseSourcePresenter.Companion.toItems
import eu.kanade.tachiyomi.util.lang.awaitSingle
import eu.kanade.tachiyomi.util.lang.runAsObservable
import eu.kanade.tachiyomi.util.lang.launchIO
import eu.kanade.tachiyomi.util.lang.withUIContext
import eu.kanade.tachiyomi.util.system.logcat
import exh.log.xLogE
import exh.savedsearches.EXHSavedSearch
import exh.savedsearches.JsonSavedSearch
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
import logcat.LogPriority
import rx.Observable
import rx.Subscription
import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers
import rx.subjects.PublishSubject
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
import xyz.nulldev.ts.api.http.serializer.FilterSerializer
@ -50,15 +57,10 @@ open class IndexPresenter(
val preferences: PreferencesHelper = Injekt.get()
) : BasePresenter<IndexController>() {
/**
* Fetches the different sources by user settings.
*/
private var fetchSourcesSubscription: Subscription? = null
/**
* Subject which fetches image of given manga.
*/
private val fetchImageSubject = PublishSubject.create<List<Pair<Manga, Boolean>>>()
private val fetchImageFlow = MutableSharedFlow<Pair<List<Manga>, Boolean>>()
/**
* Modifiable list of filters.
@ -74,7 +76,7 @@ open class IndexPresenter(
/**
* Subscription for fetching images of manga.
*/
private var fetchImageSubscription: Subscription? = null
private var fetchImageJob: Job? = null
val latestItems = MutableStateFlow<List<IndexCardItem>?>(null)
@ -84,12 +86,6 @@ open class IndexPresenter(
query = ""
}
override fun onDestroy() {
fetchSourcesSubscription?.unsubscribe()
fetchImageSubscription?.unsubscribe()
super.onDestroy()
}
override fun onCreate(savedState: Bundle?) {
super.onCreate(savedState)
@ -150,30 +146,35 @@ open class IndexPresenter(
* @param manga the list of manga to initialize.
*/
private fun fetchImage(manga: List<Manga>, isLatest: Boolean) {
fetchImageSubject.onNext(manga.map { it to isLatest })
presenterScope.launchIO {
fetchImageFlow.emit(manga to isLatest)
}
}
/**
* Subscribes to the initializer of manga details and updates the view if needed.
*/
private fun initializeFetchImageSubscription() {
fetchImageSubscription?.unsubscribe()
fetchImageSubscription = fetchImageSubject.observeOn(Schedulers.io())
.flatMap { pair ->
Observable.from(pair).filter { it.first.thumbnail_url == null && !it.first.initialized }
.concatMap { getMangaDetailsObservable(it.first, source, it.second) }
fetchImageJob?.cancel()
fetchImageFlow
.flatMapConcat { (manga, isLatest) ->
manga.asFlow()
.filter { it.thumbnail_url == null && !it.initialized }
.map {
getMangaDetailsFlow(it, source, isLatest)
}
}
.onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ pair ->
@Suppress("DEPRECATION")
view?.onMangaInitialized(pair.first, pair.second)
},
{ error ->
logcat(LogPriority.ERROR, error)
.buffer(Channel.RENDEZVOUS)
.flowOn(Dispatchers.IO)
.onEach { (manga, isLatest) ->
withUIContext {
view?.onMangaInitialized(manga, isLatest)
}
)
}
.catch {
logcat(LogPriority.ERROR, it)
}
.launchIn(presenterScope)
}
/**
@ -182,15 +183,12 @@ open class IndexPresenter(
* @param manga the manga to initialize.
* @return an observable of the manga to initialize
*/
private fun getMangaDetailsObservable(manga: Manga, source: Source, isLatest: Boolean): Observable<Pair<Manga, Boolean>> {
return runAsObservable {
val networkManga = source.getMangaDetails(manga.toMangaInfo())
manga.copyFrom(networkManga.toSManga())
manga.initialized = true
db.insertManga(manga).executeAsBlocking()
manga to isLatest
}
.onErrorResumeNext { Observable.just(manga to isLatest) }
private suspend fun getMangaDetailsFlow(manga: Manga, source: Source, isLatest: Boolean): Pair<Manga, Boolean> {
val networkManga = source.getMangaDetails(manga.toMangaInfo())
manga.copyFrom(networkManga.toSManga())
manga.initialized = true
db.insertManga(manga).executeAsBlocking()
return manga to isLatest
}
/**