diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt index 33cc2d52d..042c8ae52 100755 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt @@ -1,10 +1,12 @@ package eu.kanade.tachiyomi.data.download.model import com.jakewharton.rxrelay.PublishRelay +import eu.kanade.core.util.asFlow import eu.kanade.domain.manga.model.Manga import eu.kanade.tachiyomi.data.database.models.Chapter import eu.kanade.tachiyomi.data.download.DownloadStore import eu.kanade.tachiyomi.source.model.Page +import kotlinx.coroutines.flow.Flow import rx.Observable import rx.subjects.PublishSubject import java.util.concurrent.CopyOnWriteArrayList @@ -72,8 +74,11 @@ class DownloadQueue( fun getActiveDownloads(): Observable = Observable.from(this).filter { download -> download.status == Download.State.DOWNLOADING } + @Deprecated("Use getStatusAsFlow instead") fun getStatusObservable(): Observable = statusSubject.onBackpressureBuffer() + fun getStatusAsFlow(): Flow = getStatusObservable().asFlow() + fun getUpdatedObservable(): Observable> = updatedRelay.onBackpressureBuffer() .startWith(Unit) .map { this } @@ -84,6 +89,7 @@ class DownloadQueue( } } + @Deprecated("Use getProgressAsFlow instead") fun getProgressObservable(): Observable { return statusSubject.onBackpressureBuffer() .startWith(getActiveDownloads()) @@ -103,6 +109,10 @@ class DownloadQueue( .filter { it.status == Download.State.DOWNLOADING } } + fun getProgressAsFlow(): Flow { + return getProgressObservable().asFlow() + } + private fun setPagesSubject(pages: List?, subject: PublishSubject?) { pages?.forEach { it.setStatusSubject(subject) } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/base/presenter/BasePresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/base/presenter/BasePresenter.kt index d55d169cc..f1da3b693 100755 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/base/presenter/BasePresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/base/presenter/BasePresenter.kt @@ -57,13 +57,4 @@ open class BasePresenter : RxPresenter() { * @param onError function to execute when the observable throws an error. */ fun Observable.subscribeLatestCache(onNext: (V, T) -> Unit, onError: ((V, Throwable) -> Unit) = { _, _ -> }) = compose(deliverLatestCache()).subscribe(split(onNext, onError)).apply { add(this) } - - /** - * Subscribes an observable with [deliverReplay] and adds it to the presenter's lifecycle - * subscription list. - * - * @param onNext function to execute when the observable emits an item. - * @param onError function to execute when the observable throws an error. - */ - fun Observable.subscribeReplay(onNext: (V, T) -> Unit, onError: ((V, Throwable) -> Unit) = { _, _ -> }) = compose(deliverReplay()).subscribe(split(onNext, onError)).apply { add(this) } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/BrowseSourcePresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/BrowseSourcePresenter.kt index fe18b1e95..c54be2b51 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/BrowseSourcePresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/BrowseSourcePresenter.kt @@ -59,6 +59,7 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.launchIn @@ -70,9 +71,6 @@ import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonArray import logcat.LogPriority -import rx.Subscription -import rx.android.schedulers.AndroidSchedulers -import rx.schedulers.Schedulers import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import xyz.nulldev.ts.api.http.serializer.FilterSerializer @@ -135,7 +133,7 @@ open class BrowseSourcePresenter( /** * Subscription for the pager. */ - private var pagerSubscription: Subscription? = null + private var pagerJob: Job? = null /** * Subscription for one request from the pager. @@ -156,7 +154,6 @@ open class BrowseSourcePresenter( super.onCreate(savedState) source = sourceManager.get(sourceId) as? CatalogueSource ?: return - sourceFilters = source.getFilterList() // SY --> @@ -213,35 +210,46 @@ open class BrowseSourcePresenter( pager = createPager(query, filters) val sourceId = source.id - val sourceDisplayMode = prefs.sourceDisplayMode() - // Prepare the pager. - pagerSubscription?.let { remove(it) } - pagerSubscription = pager.results() - .observeOn(Schedulers.io()) - // SY --> - .map { (page, mangas, metadata) -> - Triple(page, mangas.map { networkToLocalManga(it, sourceId).toDomainManga()!! }, metadata) - } - // SY <-- - .doOnNext { initializeMangas(it.second) } - // SY --> - .map { (page, mangas, metadata) -> - page to mangas.mapIndexed { index, manga -> - SourceItem(manga, sourceDisplayMode, metadata?.getOrNull(index)) + pagerJob?.cancel() + pagerJob = presenterScope.launchIO { + pager.asFlow() + // SY --> + .map { (first, second, third) -> + Triple( + first, + second.map { + networkToLocalManga( + it, + sourceId, + ).toDomainManga()!! + }, + third, + ) } - } - // SY <-- - .observeOn(AndroidSchedulers.mainThread()) - .subscribeReplay( - { view, (page, mangas) -> - view.onAddPage(page, mangas) - }, - { _, error -> + // SY <-- + .onEach { initializeMangas(it.second) } + // SY --> + .map { (first, second, third) -> + first to second.mapIndexed { index, manga -> + SourceItem( + manga, + sourceDisplayMode, + third?.getOrNull(index), + ) + } + } + // SY <-- + .catch { error -> logcat(LogPriority.ERROR, error) - }, - ) + } + .collectLatest { (page, mangas) -> + withUIContext { + view?.onAddPage(page, mangas) + } + } + } // Request first page. requestNext() diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/Pager.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/Pager.kt index 6db4bbbff..da628574d 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/Pager.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/Pager.kt @@ -1,11 +1,12 @@ package eu.kanade.tachiyomi.ui.browse.source.browse import com.jakewharton.rxrelay.PublishRelay +import eu.kanade.core.util.asFlow import eu.kanade.tachiyomi.source.model.MangasPage import eu.kanade.tachiyomi.source.model.MetadataMangasPage import eu.kanade.tachiyomi.source.model.SManga import exh.metadata.metadata.base.RaisedSearchMetadata -import rx.Observable +import kotlinx.coroutines.flow.Flow /** * A general pager for source requests (latest updates, popular, search) @@ -17,8 +18,8 @@ abstract class Pager(var currentPage: Int = 1) { protected val results: PublishRelay */ Triple /* SY <-- */ /* SY --> */, List? /* SY <-- */>> = PublishRelay.create() - fun results(): Observable */ Triple /* SY <-- */ /* SY --> */, List?> /* SY <-- */> { - return results.asObservable() + fun asFlow(): Flow */ Triple /* SY <-- */ /* SY --> */, List?> /* SY <-- */> { + return results.asObservable().asFlow() } abstract suspend fun requestNextPage() diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt index 60034bf9b..190af305f 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt @@ -96,6 +96,7 @@ import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach @@ -104,9 +105,6 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.withContext import logcat.LogPriority -import rx.Subscription -import rx.android.schedulers.AndroidSchedulers -import rx.schedulers.Schedulers import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import uy.kohesive.injekt.injectLazy @@ -176,8 +174,8 @@ class MangaPresenter( /** * Subscription to observe download status changes. */ - private var observeDownloadsStatusSubscription: Subscription? = null - private var observeDownloadsPageSubscription: Subscription? = null + private var observeDownloadsStatusJob: Job? = null + private var observeDownloadsPageJob: Job? = null private var _trackList: List = emptyList() val trackList get() = _trackList @@ -791,29 +789,29 @@ class MangaPresenter( val isMergedSource = source is MergedSource val mergedIds = if (isMergedSource) successState?.mergedData?.manga?.keys.orEmpty() else emptySet() // SY <-- - observeDownloadsStatusSubscription?.let { remove(it) } - observeDownloadsStatusSubscription = downloadManager.queue.getStatusObservable() - .observeOn(Schedulers.io()) - .onBackpressureBuffer() - .filter { download -> /* SY --> */ if (isMergedSource) download.manga.id in mergedIds else /* SY <-- */ download.manga.id == successState?.manga?.id } - .observeOn(AndroidSchedulers.mainThread()) - .subscribeLatestCache( - { _, it -> updateDownloadState(it) }, - { _, error -> - logcat(LogPriority.ERROR, error) - }, - ) + observeDownloadsStatusJob?.cancel() + observeDownloadsStatusJob = presenterScope.launchIO { + downloadManager.queue.getStatusAsFlow() + .filter { /* SY --> */ if (isMergedSource) it.manga.id in mergedIds else /* SY <-- */ it.manga.id == successState?.manga?.id } + .catch { error -> logcat(LogPriority.ERROR, error) } + .collectLatest { + withUIContext { + updateDownloadState(it) + } + } + } - observeDownloadsPageSubscription?.let { remove(it) } - observeDownloadsPageSubscription = downloadManager.queue.getProgressObservable() - .observeOn(Schedulers.io()) - .onBackpressureBuffer() - .filter { download -> /* SY --> */ if (isMergedSource) download.manga.id in mergedIds else /* SY <-- */ download.manga.id == successState?.manga?.id } - .observeOn(AndroidSchedulers.mainThread()) - .subscribeLatestCache( - { _, download -> updateDownloadState(download) }, - { _, error -> logcat(LogPriority.ERROR, error) }, - ) + observeDownloadsPageJob?.cancel() + observeDownloadsPageJob = presenterScope.launchIO { + downloadManager.queue.getProgressAsFlow() + .filter { /* SY --> */ if (isMergedSource) it.manga.id in mergedIds else /* SY <-- */ it.manga.id == successState?.manga?.id } + .catch { error -> logcat(LogPriority.ERROR, error) } + .collectLatest { + withUIContext { + updateDownloadState(it) + } + } + } } private fun updateDownloadState(download: Download) { diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt index ce56e45dc..5f4518183 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt @@ -17,31 +17,30 @@ import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter import eu.kanade.tachiyomi.ui.recent.DateSectionItem import eu.kanade.tachiyomi.util.lang.launchIO import eu.kanade.tachiyomi.util.lang.toDateKey +import eu.kanade.tachiyomi.util.lang.withUIContext import eu.kanade.tachiyomi.util.system.logcat import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.map import logcat.LogPriority -import rx.Observable -import rx.android.schedulers.AndroidSchedulers -import rx.schedulers.Schedulers -import uy.kohesive.injekt.injectLazy +import uy.kohesive.injekt.Injekt +import uy.kohesive.injekt.api.get import java.text.DateFormat import java.util.Calendar import java.util.Date import java.util.TreeMap -class UpdatesPresenter : BasePresenter() { - - val preferences: PreferencesHelper by injectLazy() - private val downloadManager: DownloadManager by injectLazy() - private val sourceManager: SourceManager by injectLazy() - - private val handler: DatabaseHandler by injectLazy() - private val updateChapter: UpdateChapter by injectLazy() - private val setReadStatus: SetReadStatus by injectLazy() +class UpdatesPresenter( + private val preferences: PreferencesHelper = Injekt.get(), + private val downloadManager: DownloadManager = Injekt.get(), + private val sourceManager: SourceManager = Injekt.get(), + private val handler: DatabaseHandler = Injekt.get(), + private val updateChapter: UpdateChapter = Injekt.get(), + private val setReadStatus: SetReadStatus = Injekt.get(), +) : BasePresenter() { private val relativeTime: Int = preferences.relativeTime().get() private val dateFormat: DateFormat = preferences.dateFormat() @@ -52,77 +51,70 @@ class UpdatesPresenter : BasePresenter() { override fun onCreate(savedState: Bundle?) { super.onCreate(savedState) - getUpdatesObservable() + presenterScope.launchIO { + subscribeToUpdates() - downloadManager.queue.getStatusObservable() - .observeOn(Schedulers.io()) - .onBackpressureBuffer() - .observeOn(AndroidSchedulers.mainThread()) - .subscribeLatestCache( - { view, it -> - onDownloadStatusChange(it) - view.onChapterDownloadUpdate(it) - }, - { _, error -> - logcat(LogPriority.ERROR, error) - }, - ) + downloadManager.queue.getStatusAsFlow() + .catch { error -> logcat(LogPriority.ERROR, error) } + .collectLatest { + withUIContext { + onDownloadStatusChange(it) + view?.onChapterDownloadUpdate(it) + } + } - downloadManager.queue.getProgressObservable() - .observeOn(Schedulers.io()) - .onBackpressureBuffer() - .observeOn(AndroidSchedulers.mainThread()) - .subscribeLatestCache(UpdatesController::onChapterDownloadUpdate) { _, error -> - logcat(LogPriority.ERROR, error) - } + downloadManager.queue.getProgressAsFlow() + .catch { error -> logcat(LogPriority.ERROR, error) } + .collectLatest { + withUIContext { + view?.onChapterDownloadUpdate(it) + } + } + } } /** * Get observable containing recent chapters and date - * - * @return observable containing recent chapters and date */ - private fun getUpdatesObservable() { + private suspend fun subscribeToUpdates() { // Set date limit for recent chapters - presenterScope.launchIO { - val cal = Calendar.getInstance().apply { - time = Date() - add(Calendar.MONTH, -3) - } - - handler - .subscribeToList { - mangasQueries.getRecentlyUpdated(after = cal.timeInMillis, mangaChapterMapper) - } - .map { mangaChapter -> - val map = TreeMap>> { d1, d2 -> d2.compareTo(d1) } - val byDate = mangaChapter.groupByTo(map) { it.second.dateFetch.toDateKey() } - byDate.flatMap { entry -> - val dateItem = DateSectionItem(entry.key, relativeTime, dateFormat) - entry.value - .sortedWith(compareBy({ it.second.dateFetch }, { it.second.chapterNumber })).asReversed() - .map { UpdatesItem(it.second, it.first, dateItem) } - } - } - .collectLatest { list -> - list.forEach { item -> - // Find an active download for this chapter. - val download = downloadManager.queue.find { it.chapter.id == item.chapter.id } - - // If there's an active download, assign it, otherwise ask the manager if - // the chapter is downloaded and assign it to the status. - if (download != null) { - item.download = download - } - } - setDownloadedChapters(list) - - _updates.value = list - - // Set unread chapter count for bottom bar badge - preferences.unreadUpdatesCount().set(list.count { !it.chapter.read }) - } + val cal = Calendar.getInstance().apply { + time = Date() + add(Calendar.MONTH, -3) } + + handler + .subscribeToList { + mangasQueries.getRecentlyUpdated(after = cal.timeInMillis, mangaChapterMapper) + } + .map { mangaChapter -> + val map = TreeMap>> { d1, d2 -> d2.compareTo(d1) } + val byDate = mangaChapter.groupByTo(map) { it.second.dateFetch.toDateKey() } + byDate.flatMap { entry -> + val dateItem = DateSectionItem(entry.key, relativeTime, dateFormat) + entry.value + .sortedWith(compareBy({ it.second.dateFetch }, { it.second.chapterNumber })).asReversed() + .map { UpdatesItem(it.second, it.first, dateItem) } + } + } + .collectLatest { list -> + list.forEach { item -> + // Find an active download for this chapter. + val download = downloadManager.queue.find { it.chapter.id == item.chapter.id } + + // If there's an active download, assign it, otherwise ask the manager if + // the chapter is downloaded and assign it to the status. + if (download != null) { + item.download = download + } + } + setDownloadedChapters(list) + + _updates.value = list + + // Set unread chapter count for bottom bar badge + preferences.unreadUpdatesCount().set(list.count { !it.chapter.read }) + } } /** @@ -184,16 +176,14 @@ class UpdatesPresenter : BasePresenter() { * @param chapters list of chapters */ fun deleteChapters(chapters: List) { - Observable.just(chapters) - .doOnNext { deleteChaptersInternal(it) } - .subscribeOn(Schedulers.io()) - .observeOn(AndroidSchedulers.mainThread()) - .subscribeFirst( - { view, _ -> - view.onChaptersDeleted() - }, - UpdatesController::onChaptersDeletedError, - ) + launchIO { + try { + deleteChaptersInternal(chapters) + withUIContext { view?.onChaptersDeleted() } + } catch (e: Throwable) { + withUIContext { view?.onChaptersDeletedError(e) } + } + } } /**