Replace some usages of RxJava
(cherry picked from commit 788583e66f6ad355f65f5bd2db303faa85163c5c) # Conflicts: # app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/BrowseSourcePresenter.kt # app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/Pager.kt # app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt
This commit is contained in:
parent
dfa559d30f
commit
bc208bac30
@ -1,10 +1,12 @@
|
||||
package eu.kanade.tachiyomi.data.download.model
|
||||
|
||||
import com.jakewharton.rxrelay.PublishRelay
|
||||
import eu.kanade.core.util.asFlow
|
||||
import eu.kanade.domain.manga.model.Manga
|
||||
import eu.kanade.tachiyomi.data.database.models.Chapter
|
||||
import eu.kanade.tachiyomi.data.download.DownloadStore
|
||||
import eu.kanade.tachiyomi.source.model.Page
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
@ -72,8 +74,11 @@ class DownloadQueue(
|
||||
fun getActiveDownloads(): Observable<Download> =
|
||||
Observable.from(this).filter { download -> download.status == Download.State.DOWNLOADING }
|
||||
|
||||
@Deprecated("Use getStatusAsFlow instead")
|
||||
fun getStatusObservable(): Observable<Download> = statusSubject.onBackpressureBuffer()
|
||||
|
||||
fun getStatusAsFlow(): Flow<Download> = getStatusObservable().asFlow()
|
||||
|
||||
fun getUpdatedObservable(): Observable<List<Download>> = updatedRelay.onBackpressureBuffer()
|
||||
.startWith(Unit)
|
||||
.map { this }
|
||||
@ -84,6 +89,7 @@ class DownloadQueue(
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated("Use getProgressAsFlow instead")
|
||||
fun getProgressObservable(): Observable<Download> {
|
||||
return statusSubject.onBackpressureBuffer()
|
||||
.startWith(getActiveDownloads())
|
||||
@ -103,6 +109,10 @@ class DownloadQueue(
|
||||
.filter { it.status == Download.State.DOWNLOADING }
|
||||
}
|
||||
|
||||
fun getProgressAsFlow(): Flow<Download> {
|
||||
return getProgressObservable().asFlow()
|
||||
}
|
||||
|
||||
private fun setPagesSubject(pages: List<Page>?, subject: PublishSubject<Int>?) {
|
||||
pages?.forEach { it.setStatusSubject(subject) }
|
||||
}
|
||||
|
@ -57,13 +57,4 @@ open class BasePresenter<V> : RxPresenter<V>() {
|
||||
* @param onError function to execute when the observable throws an error.
|
||||
*/
|
||||
fun <T> Observable<T>.subscribeLatestCache(onNext: (V, T) -> Unit, onError: ((V, Throwable) -> Unit) = { _, _ -> }) = compose(deliverLatestCache<T>()).subscribe(split(onNext, onError)).apply { add(this) }
|
||||
|
||||
/**
|
||||
* Subscribes an observable with [deliverReplay] and adds it to the presenter's lifecycle
|
||||
* subscription list.
|
||||
*
|
||||
* @param onNext function to execute when the observable emits an item.
|
||||
* @param onError function to execute when the observable throws an error.
|
||||
*/
|
||||
fun <T> Observable<T>.subscribeReplay(onNext: (V, T) -> Unit, onError: ((V, Throwable) -> Unit) = { _, _ -> }) = compose(deliverReplay<T>()).subscribe(split(onNext, onError)).apply { add(this) }
|
||||
}
|
||||
|
@ -59,6 +59,7 @@ import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.firstOrNull
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
@ -70,9 +71,6 @@ import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.JsonArray
|
||||
import logcat.LogPriority
|
||||
import rx.Subscription
|
||||
import rx.android.schedulers.AndroidSchedulers
|
||||
import rx.schedulers.Schedulers
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import xyz.nulldev.ts.api.http.serializer.FilterSerializer
|
||||
@ -135,7 +133,7 @@ open class BrowseSourcePresenter(
|
||||
/**
|
||||
* Subscription for the pager.
|
||||
*/
|
||||
private var pagerSubscription: Subscription? = null
|
||||
private var pagerJob: Job? = null
|
||||
|
||||
/**
|
||||
* Subscription for one request from the pager.
|
||||
@ -156,7 +154,6 @@ open class BrowseSourcePresenter(
|
||||
super.onCreate(savedState)
|
||||
|
||||
source = sourceManager.get(sourceId) as? CatalogueSource ?: return
|
||||
|
||||
sourceFilters = source.getFilterList()
|
||||
|
||||
// SY -->
|
||||
@ -213,35 +210,46 @@ open class BrowseSourcePresenter(
|
||||
pager = createPager(query, filters)
|
||||
|
||||
val sourceId = source.id
|
||||
|
||||
val sourceDisplayMode = prefs.sourceDisplayMode()
|
||||
|
||||
// Prepare the pager.
|
||||
pagerSubscription?.let { remove(it) }
|
||||
pagerSubscription = pager.results()
|
||||
.observeOn(Schedulers.io())
|
||||
pagerJob?.cancel()
|
||||
pagerJob = presenterScope.launchIO {
|
||||
pager.asFlow()
|
||||
// SY -->
|
||||
.map { (page, mangas, metadata) ->
|
||||
Triple(page, mangas.map { networkToLocalManga(it, sourceId).toDomainManga()!! }, metadata)
|
||||
}
|
||||
// SY <--
|
||||
.doOnNext { initializeMangas(it.second) }
|
||||
// SY -->
|
||||
.map { (page, mangas, metadata) ->
|
||||
page to mangas.mapIndexed { index, manga ->
|
||||
SourceItem(manga, sourceDisplayMode, metadata?.getOrNull(index))
|
||||
}
|
||||
}
|
||||
// SY <--
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribeReplay(
|
||||
{ view, (page, mangas) ->
|
||||
view.onAddPage(page, mangas)
|
||||
},
|
||||
{ _, error ->
|
||||
logcat(LogPriority.ERROR, error)
|
||||
.map { (first, second, third) ->
|
||||
Triple(
|
||||
first,
|
||||
second.map {
|
||||
networkToLocalManga(
|
||||
it,
|
||||
sourceId,
|
||||
).toDomainManga()!!
|
||||
},
|
||||
third,
|
||||
)
|
||||
}
|
||||
// SY <--
|
||||
.onEach { initializeMangas(it.second) }
|
||||
// SY -->
|
||||
.map { (first, second, third) ->
|
||||
first to second.mapIndexed { index, manga ->
|
||||
SourceItem(
|
||||
manga,
|
||||
sourceDisplayMode,
|
||||
third?.getOrNull(index),
|
||||
)
|
||||
}
|
||||
}
|
||||
// SY <--
|
||||
.catch { error ->
|
||||
logcat(LogPriority.ERROR, error)
|
||||
}
|
||||
.collectLatest { (page, mangas) ->
|
||||
withUIContext {
|
||||
view?.onAddPage(page, mangas)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Request first page.
|
||||
requestNext()
|
||||
|
@ -1,11 +1,12 @@
|
||||
package eu.kanade.tachiyomi.ui.browse.source.browse
|
||||
|
||||
import com.jakewharton.rxrelay.PublishRelay
|
||||
import eu.kanade.core.util.asFlow
|
||||
import eu.kanade.tachiyomi.source.model.MangasPage
|
||||
import eu.kanade.tachiyomi.source.model.MetadataMangasPage
|
||||
import eu.kanade.tachiyomi.source.model.SManga
|
||||
import exh.metadata.metadata.base.RaisedSearchMetadata
|
||||
import rx.Observable
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
|
||||
/**
|
||||
* A general pager for source requests (latest updates, popular, search)
|
||||
@ -17,8 +18,8 @@ abstract class Pager(var currentPage: Int = 1) {
|
||||
|
||||
protected val results: PublishRelay</* SY --> */ Triple /* SY <-- */<Int, List<SManga> /* SY --> */, List<RaisedSearchMetadata>? /* SY <-- */>> = PublishRelay.create()
|
||||
|
||||
fun results(): Observable</* SY --> */ Triple /* SY <-- */<Int, List<SManga> /* SY --> */, List<RaisedSearchMetadata>?> /* SY <-- */> {
|
||||
return results.asObservable()
|
||||
fun asFlow(): Flow</* SY --> */ Triple /* SY <-- */<Int, List<SManga> /* SY --> */, List<RaisedSearchMetadata>?> /* SY <-- */> {
|
||||
return results.asObservable().asFlow()
|
||||
}
|
||||
|
||||
abstract suspend fun requestNextPage()
|
||||
|
@ -96,6 +96,7 @@ import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
@ -104,9 +105,6 @@ import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.supervisorScope
|
||||
import kotlinx.coroutines.withContext
|
||||
import logcat.LogPriority
|
||||
import rx.Subscription
|
||||
import rx.android.schedulers.AndroidSchedulers
|
||||
import rx.schedulers.Schedulers
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
@ -176,8 +174,8 @@ class MangaPresenter(
|
||||
/**
|
||||
* Subscription to observe download status changes.
|
||||
*/
|
||||
private var observeDownloadsStatusSubscription: Subscription? = null
|
||||
private var observeDownloadsPageSubscription: Subscription? = null
|
||||
private var observeDownloadsStatusJob: Job? = null
|
||||
private var observeDownloadsPageJob: Job? = null
|
||||
|
||||
private var _trackList: List<TrackItem> = emptyList()
|
||||
val trackList get() = _trackList
|
||||
@ -791,29 +789,29 @@ class MangaPresenter(
|
||||
val isMergedSource = source is MergedSource
|
||||
val mergedIds = if (isMergedSource) successState?.mergedData?.manga?.keys.orEmpty() else emptySet()
|
||||
// SY <--
|
||||
observeDownloadsStatusSubscription?.let { remove(it) }
|
||||
observeDownloadsStatusSubscription = downloadManager.queue.getStatusObservable()
|
||||
.observeOn(Schedulers.io())
|
||||
.onBackpressureBuffer()
|
||||
.filter { download -> /* SY --> */ if (isMergedSource) download.manga.id in mergedIds else /* SY <-- */ download.manga.id == successState?.manga?.id }
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribeLatestCache(
|
||||
{ _, it -> updateDownloadState(it) },
|
||||
{ _, error ->
|
||||
logcat(LogPriority.ERROR, error)
|
||||
},
|
||||
)
|
||||
observeDownloadsStatusJob?.cancel()
|
||||
observeDownloadsStatusJob = presenterScope.launchIO {
|
||||
downloadManager.queue.getStatusAsFlow()
|
||||
.filter { /* SY --> */ if (isMergedSource) it.manga.id in mergedIds else /* SY <-- */ it.manga.id == successState?.manga?.id }
|
||||
.catch { error -> logcat(LogPriority.ERROR, error) }
|
||||
.collectLatest {
|
||||
withUIContext {
|
||||
updateDownloadState(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
observeDownloadsPageSubscription?.let { remove(it) }
|
||||
observeDownloadsPageSubscription = downloadManager.queue.getProgressObservable()
|
||||
.observeOn(Schedulers.io())
|
||||
.onBackpressureBuffer()
|
||||
.filter { download -> /* SY --> */ if (isMergedSource) download.manga.id in mergedIds else /* SY <-- */ download.manga.id == successState?.manga?.id }
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribeLatestCache(
|
||||
{ _, download -> updateDownloadState(download) },
|
||||
{ _, error -> logcat(LogPriority.ERROR, error) },
|
||||
)
|
||||
observeDownloadsPageJob?.cancel()
|
||||
observeDownloadsPageJob = presenterScope.launchIO {
|
||||
downloadManager.queue.getProgressAsFlow()
|
||||
.filter { /* SY --> */ if (isMergedSource) it.manga.id in mergedIds else /* SY <-- */ it.manga.id == successState?.manga?.id }
|
||||
.catch { error -> logcat(LogPriority.ERROR, error) }
|
||||
.collectLatest {
|
||||
withUIContext {
|
||||
updateDownloadState(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateDownloadState(download: Download) {
|
||||
|
@ -17,31 +17,30 @@ import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter
|
||||
import eu.kanade.tachiyomi.ui.recent.DateSectionItem
|
||||
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
import eu.kanade.tachiyomi.util.lang.toDateKey
|
||||
import eu.kanade.tachiyomi.util.lang.withUIContext
|
||||
import eu.kanade.tachiyomi.util.system.logcat
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.flow.map
|
||||
import logcat.LogPriority
|
||||
import rx.Observable
|
||||
import rx.android.schedulers.AndroidSchedulers
|
||||
import rx.schedulers.Schedulers
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import java.text.DateFormat
|
||||
import java.util.Calendar
|
||||
import java.util.Date
|
||||
import java.util.TreeMap
|
||||
|
||||
class UpdatesPresenter : BasePresenter<UpdatesController>() {
|
||||
|
||||
val preferences: PreferencesHelper by injectLazy()
|
||||
private val downloadManager: DownloadManager by injectLazy()
|
||||
private val sourceManager: SourceManager by injectLazy()
|
||||
|
||||
private val handler: DatabaseHandler by injectLazy()
|
||||
private val updateChapter: UpdateChapter by injectLazy()
|
||||
private val setReadStatus: SetReadStatus by injectLazy()
|
||||
class UpdatesPresenter(
|
||||
private val preferences: PreferencesHelper = Injekt.get(),
|
||||
private val downloadManager: DownloadManager = Injekt.get(),
|
||||
private val sourceManager: SourceManager = Injekt.get(),
|
||||
private val handler: DatabaseHandler = Injekt.get(),
|
||||
private val updateChapter: UpdateChapter = Injekt.get(),
|
||||
private val setReadStatus: SetReadStatus = Injekt.get(),
|
||||
) : BasePresenter<UpdatesController>() {
|
||||
|
||||
private val relativeTime: Int = preferences.relativeTime().get()
|
||||
private val dateFormat: DateFormat = preferences.dateFormat()
|
||||
@ -52,39 +51,33 @@ class UpdatesPresenter : BasePresenter<UpdatesController>() {
|
||||
override fun onCreate(savedState: Bundle?) {
|
||||
super.onCreate(savedState)
|
||||
|
||||
getUpdatesObservable()
|
||||
presenterScope.launchIO {
|
||||
subscribeToUpdates()
|
||||
|
||||
downloadManager.queue.getStatusObservable()
|
||||
.observeOn(Schedulers.io())
|
||||
.onBackpressureBuffer()
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribeLatestCache(
|
||||
{ view, it ->
|
||||
downloadManager.queue.getStatusAsFlow()
|
||||
.catch { error -> logcat(LogPriority.ERROR, error) }
|
||||
.collectLatest {
|
||||
withUIContext {
|
||||
onDownloadStatusChange(it)
|
||||
view.onChapterDownloadUpdate(it)
|
||||
},
|
||||
{ _, error ->
|
||||
logcat(LogPriority.ERROR, error)
|
||||
},
|
||||
)
|
||||
view?.onChapterDownloadUpdate(it)
|
||||
}
|
||||
}
|
||||
|
||||
downloadManager.queue.getProgressObservable()
|
||||
.observeOn(Schedulers.io())
|
||||
.onBackpressureBuffer()
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribeLatestCache(UpdatesController::onChapterDownloadUpdate) { _, error ->
|
||||
logcat(LogPriority.ERROR, error)
|
||||
downloadManager.queue.getProgressAsFlow()
|
||||
.catch { error -> logcat(LogPriority.ERROR, error) }
|
||||
.collectLatest {
|
||||
withUIContext {
|
||||
view?.onChapterDownloadUpdate(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get observable containing recent chapters and date
|
||||
*
|
||||
* @return observable containing recent chapters and date
|
||||
*/
|
||||
private fun getUpdatesObservable() {
|
||||
private suspend fun subscribeToUpdates() {
|
||||
// Set date limit for recent chapters
|
||||
presenterScope.launchIO {
|
||||
val cal = Calendar.getInstance().apply {
|
||||
time = Date()
|
||||
add(Calendar.MONTH, -3)
|
||||
@ -123,7 +116,6 @@ class UpdatesPresenter : BasePresenter<UpdatesController>() {
|
||||
preferences.unreadUpdatesCount().set(list.count { !it.chapter.read })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds and assigns the list of downloaded chapters.
|
||||
@ -184,16 +176,14 @@ class UpdatesPresenter : BasePresenter<UpdatesController>() {
|
||||
* @param chapters list of chapters
|
||||
*/
|
||||
fun deleteChapters(chapters: List<UpdatesItem>) {
|
||||
Observable.just(chapters)
|
||||
.doOnNext { deleteChaptersInternal(it) }
|
||||
.subscribeOn(Schedulers.io())
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribeFirst(
|
||||
{ view, _ ->
|
||||
view.onChaptersDeleted()
|
||||
},
|
||||
UpdatesController::onChaptersDeletedError,
|
||||
)
|
||||
launchIO {
|
||||
try {
|
||||
deleteChaptersInternal(chapters)
|
||||
withUIContext { view?.onChaptersDeleted() }
|
||||
} catch (e: Throwable) {
|
||||
withUIContext { view?.onChaptersDeletedError(e) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user