From 4965da340b981a9e6ca62b79168567d819733e2d Mon Sep 17 00:00:00 2001 From: Two-Ai <81279822+Two-Ai@users.noreply.github.com> Date: Sat, 21 Jan 2023 16:46:16 -0500 Subject: [PATCH] Replace RxJava in HttpPageLoader downloader (#8955) * Convert downloader Observable to flow Uses `runInterruptible` to turn the blocking call to `queue.take()` into a cancellable call. Flow collection is ended by cancelling the scope in `recycle`. This means the `HttpPageLoader` can't be reused after calling `recycle`, but this was true with the `Observable` as well.) * Convert load Observables to suspending function Inlining the Observables allows for some simplification of the error handling. Behavior should be otherwise identical. * Convert cleanup Completable to coroutine Uses global `launchIO`, not ideal but similar to previous behavior. Can't be scheduled on the local `scope` as this runs after `scope` is cancelled. (cherry picked from commit e4bc8990fbe2aa4bc31977f1061bac0c70d7a58f) # Conflicts: # app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt --- .../ui/reader/loader/HttpPageLoader.kt | 152 +++++++----------- 1 file changed, 56 insertions(+), 96 deletions(-) diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt index 39d94eee7..d670fb7b7 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt @@ -8,19 +8,23 @@ import eu.kanade.tachiyomi.source.online.HttpSource import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter import eu.kanade.tachiyomi.ui.reader.model.ReaderPage import eu.kanade.tachiyomi.ui.reader.setting.ReaderPreferences -import eu.kanade.tachiyomi.util.lang.plusAssign -import eu.kanade.tachiyomi.util.system.logcat +import eu.kanade.tachiyomi.util.lang.awaitSingle +import eu.kanade.tachiyomi.util.lang.launchIO import exh.source.isEhBasedSource import exh.util.DataSaver import exh.util.DataSaver.Companion.fetchImage import kotlinx.coroutines.CancellationException -import logcat.LogPriority -import rx.Completable +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.runInterruptible import rx.Observable import rx.schedulers.Schedulers import rx.subjects.PublishSubject import rx.subjects.SerializedSubject -import rx.subscriptions.CompositeSubscription import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import java.util.concurrent.PriorityBlockingQueue @@ -40,16 +44,13 @@ class HttpPageLoader( // SY <-- ) : PageLoader() { + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + /** * A queue used to manage requests one by one while allowing priorities. */ private val queue = PriorityBlockingQueue() - /** - * Current active subscriptions. - */ - private val subscriptions = CompositeSubscription() - private val preloadSize = /* SY --> */ readerPreferences.preloadSize().get() // SY <-- // SY --> @@ -60,20 +61,17 @@ class HttpPageLoader( // EXH --> repeat(readerPreferences.readerThreads().get()) { // EXH <-- - subscriptions += Observable.defer { Observable.just(queue.take().page) } - .filter { it.status == Page.State.QUEUE } - .concatMap { source.fetchImageFromCacheThenNet(it) } - .repeat() - .subscribeOn(Schedulers.io()) - .subscribe( - { - }, - { error -> - if (error !is InterruptedException) { - logcat(LogPriority.ERROR, error) - } - }, - ) + scope.launchIO { + flow { + while (true) { + emit(runInterruptible { queue.take() }.page) + } + } + .filter { it.status == Page.State.QUEUE } + .collect { + loadPage(it) + } + } // EXH --> } // EXH <-- @@ -84,21 +82,23 @@ class HttpPageLoader( */ override fun recycle() { super.recycle() - subscriptions.unsubscribe() + scope.cancel() queue.clear() // Cache current page list progress for online chapters to allow a faster reopen val pages = chapter.pages if (pages != null) { - Completable - .fromAction { + launchIO { + try { // Convert to pages without reader information val pagesToSave = pages.map { Page(it.index, it.url, it.imageUrl) } chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave) + } catch (e: Throwable) { + if (e is CancellationException) { + throw e + } } - .onErrorComplete() - .subscribeOn(Schedulers.io()) - .subscribe() + } } } @@ -233,81 +233,41 @@ class HttpPageLoader( } /** - * Returns an observable of the page with the downloaded image. + * Loads the page, retrieving the image URL and downloading the image if necessary. + * Downloaded images are stored in the chapter cache. * * @param page the page whose source image has to be downloaded. */ - private fun HttpSource.fetchImageFromCacheThenNet(page: ReaderPage): Observable { - return if (page.imageUrl.isNullOrEmpty()) { - getImageUrl(page).flatMap { getCachedImage(it) } - } else { - getCachedImage(page) + private suspend fun loadPage(page: ReaderPage) { + try { + if (page.imageUrl.isNullOrEmpty()) { + page.status = Page.State.LOAD_PAGE + page.imageUrl = source.fetchImageUrl(page).awaitSingle() + } + val imageUrl = page.imageUrl!! + + if (!chapterCache.isImageInCache(imageUrl)) { + page.status = Page.State.DOWNLOAD_IMAGE + val imageResponse = source.fetchImage(page, dataSaver).awaitSingle() + chapterCache.putImageToCache(imageUrl, imageResponse) + } + + page.stream = { chapterCache.getImageFile(imageUrl).inputStream() } + page.status = Page.State.READY + } catch (e: Throwable) { + page.status = Page.State.ERROR + if (e is CancellationException) { + throw e + } } } - private fun HttpSource.getImageUrl(page: ReaderPage): Observable { - page.status = Page.State.LOAD_PAGE - return fetchImageUrl(page) - .doOnError { page.status = Page.State.ERROR } - .onErrorReturn { null } - .doOnNext { page.imageUrl = it } - .map { page } - } - - /** - * Returns an observable of the page that gets the image from the chapter or fallbacks to - * network and copies it to the cache calling [cacheImage]. - * - * @param page the page. - */ - private fun HttpSource.getCachedImage(page: ReaderPage): Observable { - val imageUrl = page.imageUrl ?: return Observable.just(page) - - return Observable.just(page) - .flatMap { - if (!chapterCache.isImageInCache(imageUrl)) { - cacheImage(page) - } else { - Observable.just(page) - } - } - .doOnNext { - page.stream = { chapterCache.getImageFile(imageUrl).inputStream() } - page.status = Page.State.READY - } - .doOnError { page.status = Page.State.ERROR } - .onErrorReturn { page } - } - - /** - * Returns an observable of the page that downloads the image to [ChapterCache]. - * - * @param page the page. - */ - private fun HttpSource.cacheImage(page: ReaderPage): Observable { - page.status = Page.State.DOWNLOAD_IMAGE - return fetchImage(page, dataSaver) - .doOnNext { - chapterCache.putImageToCache(page.imageUrl!!, it) - } - .map { page } - } - // EXH --> fun boostPage(page: ReaderPage) { if (page.status == Page.State.QUEUE) { - subscriptions += Observable.just(page) - .concatMap { source.fetchImageFromCacheThenNet(it) } - .subscribeOn(Schedulers.io()) - .subscribe( - { - }, - { error -> - if (error !is InterruptedException) { - logcat(LogPriority.ERROR, error) - } - }, - ) + scope.launchIO { + loadPage(page) + } } } // EXH <--