diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt
index 39d94eee7..d670fb7b7 100644
--- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt
+++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt
@@ -8,19 +8,23 @@ import eu.kanade.tachiyomi.source.online.HttpSource
 import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
 import eu.kanade.tachiyomi.ui.reader.model.ReaderPage
 import eu.kanade.tachiyomi.ui.reader.setting.ReaderPreferences
-import eu.kanade.tachiyomi.util.lang.plusAssign
-import eu.kanade.tachiyomi.util.system.logcat
+import eu.kanade.tachiyomi.util.lang.awaitSingle
+import eu.kanade.tachiyomi.util.lang.launchIO
 import exh.source.isEhBasedSource
 import exh.util.DataSaver
 import exh.util.DataSaver.Companion.fetchImage
 import kotlinx.coroutines.CancellationException
-import logcat.LogPriority
-import rx.Completable
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.SupervisorJob
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.flow.filter
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.runInterruptible
 import rx.Observable
 import rx.schedulers.Schedulers
 import rx.subjects.PublishSubject
 import rx.subjects.SerializedSubject
-import rx.subscriptions.CompositeSubscription
 import uy.kohesive.injekt.Injekt
 import uy.kohesive.injekt.api.get
 import java.util.concurrent.PriorityBlockingQueue
@@ -40,16 +44,13 @@ class HttpPageLoader(
     // SY <--
 ) : PageLoader() {
 
+    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
+
     /**
      * A queue used to manage requests one by one while allowing priorities.
      */
     private val queue = PriorityBlockingQueue<PriorityPage>()
 
-    /**
-     * Current active subscriptions.
-     */
-    private val subscriptions = CompositeSubscription()
-
     private val preloadSize = /* SY --> */ readerPreferences.preloadSize().get() // SY <--
 
     // SY -->
@@ -60,20 +61,17 @@ class HttpPageLoader(
         // EXH -->
         repeat(readerPreferences.readerThreads().get()) {
             // EXH <--
-            subscriptions += Observable.defer { Observable.just(queue.take().page) }
-                .filter { it.status == Page.State.QUEUE }
-                .concatMap { source.fetchImageFromCacheThenNet(it) }
-                .repeat()
-                .subscribeOn(Schedulers.io())
-                .subscribe(
-                    {
-                    },
-                    { error ->
-                        if (error !is InterruptedException) {
-                            logcat(LogPriority.ERROR, error)
-                        }
-                    },
-                )
+            scope.launchIO {
+                flow {
+                    while (true) {
+                        emit(runInterruptible { queue.take() }.page)
+                    }
+                }
+                    .filter { it.status == Page.State.QUEUE }
+                    .collect {
+                        loadPage(it)
+                    }
+            }
             // EXH -->
         }
         // EXH <--
@@ -84,21 +82,23 @@ class HttpPageLoader(
      */
     override fun recycle() {
         super.recycle()
-        subscriptions.unsubscribe()
+        scope.cancel()
         queue.clear()
 
         // Cache current page list progress for online chapters to allow a faster reopen
         val pages = chapter.pages
         if (pages != null) {
-            Completable
-                .fromAction {
+            launchIO {
+                try {
                     // Convert to pages without reader information
                     val pagesToSave = pages.map { Page(it.index, it.url, it.imageUrl) }
                     chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave)
+                } catch (e: Throwable) {
+                    if (e is CancellationException) {
+                        throw e
+                    }
                 }
-                .onErrorComplete()
-                .subscribeOn(Schedulers.io())
-                .subscribe()
+            }
         }
     }
 
@@ -233,81 +233,41 @@ class HttpPageLoader(
     }
 
     /**
-     * Returns an observable of the page with the downloaded image.
+     * Loads the page, retrieving the image URL and downloading the image if necessary.
+     * Downloaded images are stored in the chapter cache.
      *
      * @param page the page whose source image has to be downloaded.
      */
-    private fun HttpSource.fetchImageFromCacheThenNet(page: ReaderPage): Observable<ReaderPage> {
-        return if (page.imageUrl.isNullOrEmpty()) {
-            getImageUrl(page).flatMap { getCachedImage(it) }
-        } else {
-            getCachedImage(page)
+    private suspend fun loadPage(page: ReaderPage) {
+        try {
+            if (page.imageUrl.isNullOrEmpty()) {
+                page.status = Page.State.LOAD_PAGE
+                page.imageUrl = source.fetchImageUrl(page).awaitSingle()
+            }
+            val imageUrl = page.imageUrl!!
+
+            if (!chapterCache.isImageInCache(imageUrl)) {
+                page.status = Page.State.DOWNLOAD_IMAGE
+                val imageResponse = source.fetchImage(page, dataSaver).awaitSingle()
+                chapterCache.putImageToCache(imageUrl, imageResponse)
+            }
+
+            page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
+            page.status = Page.State.READY
+        } catch (e: Throwable) {
+            page.status = Page.State.ERROR
+            if (e is CancellationException) {
+                throw e
+            }
         }
     }
 
-    private fun HttpSource.getImageUrl(page: ReaderPage): Observable<ReaderPage> {
-        page.status = Page.State.LOAD_PAGE
-        return fetchImageUrl(page)
-            .doOnError { page.status = Page.State.ERROR }
-            .onErrorReturn { null }
-            .doOnNext { page.imageUrl = it }
-            .map { page }
-    }
-
-    /**
-     * Returns an observable of the page that gets the image from the chapter or fallbacks to
-     * network and copies it to the cache calling [cacheImage].
-     *
-     * @param page the page.
-     */
-    private fun HttpSource.getCachedImage(page: ReaderPage): Observable<ReaderPage> {
-        val imageUrl = page.imageUrl ?: return Observable.just(page)
-
-        return Observable.just(page)
-            .flatMap {
-                if (!chapterCache.isImageInCache(imageUrl)) {
-                    cacheImage(page)
-                } else {
-                    Observable.just(page)
-                }
-            }
-            .doOnNext {
-                page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
-                page.status = Page.State.READY
-            }
-            .doOnError { page.status = Page.State.ERROR }
-            .onErrorReturn { page }
-    }
-
-    /**
-     * Returns an observable of the page that downloads the image to [ChapterCache].
-     *
-     * @param page the page.
-     */
-    private fun HttpSource.cacheImage(page: ReaderPage): Observable<ReaderPage> {
-        page.status = Page.State.DOWNLOAD_IMAGE
-        return fetchImage(page, dataSaver)
-            .doOnNext {
-                chapterCache.putImageToCache(page.imageUrl!!, it)
-            }
-            .map { page }
-    }
-
     // EXH -->
     fun boostPage(page: ReaderPage) {
         if (page.status == Page.State.QUEUE) {
-            subscriptions += Observable.just(page)
-                .concatMap { source.fetchImageFromCacheThenNet(it) }
-                .subscribeOn(Schedulers.io())
-                .subscribe(
-                    {
-                    },
-                    { error ->
-                        if (error !is InterruptedException) {
-                            logcat(LogPriority.ERROR, error)
-                        }
-                    },
-                )
+            scope.launchIO {
+                loadPage(page)
+            }
         }
     }
     // EXH <--