diff --git a/app/src/main/java/eu/kanade/tachiyomi/source/online/all/MergedSource.kt b/app/src/main/java/eu/kanade/tachiyomi/source/online/all/MergedSource.kt index 804b37e2f..2a5017680 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/source/online/all/MergedSource.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/source/online/all/MergedSource.kt @@ -19,6 +19,8 @@ import exh.MERGED_SOURCE_ID import exh.merged.sql.models.MergedMangaReference import exh.util.asFlow import exh.util.await +import exh.util.awaitSingle +import exh.util.awaitSingleOrNull import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow @@ -28,10 +30,10 @@ import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.single -import kotlinx.coroutines.flow.singleOrNull +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import okhttp3.Response +import rx.Observable import uy.kohesive.injekt.injectLazy class MergedSource : SuspendHttpSource() { @@ -83,14 +85,13 @@ class MergedSource : SuspendHttpSource() { } } - fun getChaptersFromDB(manga: Manga, editScanlators: Boolean = false, dedupe: Boolean = true): Flow> { + fun getChaptersFromDB(manga: Manga, editScanlators: Boolean = false, dedupe: Boolean = true): Observable> { // TODO more chapter dedupe return db.getChaptersByMergedMangaId(manga.id!!).asRxObservable() - .asFlow() .map { chapterList -> - val mangaReferences = withContext(Dispatchers.IO) { db.getMergedMangaReferences(manga.id!!).await() } - val sources = mangaReferences.map { sourceManager.getOrStub(it.mangaSourceId) to it.mangaId } + val mangaReferences = runBlocking(Dispatchers.IO) { db.getMergedMangaReferences(manga.id!!).await() } ?: emptyList() if (editScanlators) { + val sources = mangaReferences.map { sourceManager.getOrStub(it.mangaSourceId) to it.mangaId } chapterList.onEach { chapter -> val source = sources.firstOrNull { chapter.manga_id == it.second }?.first if (source != null) { @@ -135,7 +136,7 @@ class MergedSource : SuspendHttpSource() { fetchChaptersAndSync(manga, downloadChapters).collect() } emit( - getChaptersFromDB(manga, editScanlators, dedupe).singleOrNull() ?: emptyList() + getChaptersFromDB(manga, editScanlators, dedupe).awaitSingleOrNull() ?: emptyList() ) } } @@ -171,7 +172,7 @@ class MergedSource : SuspendHttpSource() { manga = Manga.create(reference.mangaSourceId).apply { url = reference.mangaUrl } - manga.copyFrom(source.fetchMangaDetails(manga).asFlow().single()) + manga.copyFrom(source.fetchMangaDetails(manga).awaitSingle()) try { manga.id = db.insertManga(manga).await().insertedId() reference.mangaId = manga.id diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/library/LibraryPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/library/LibraryPresenter.kt index ed809b548..f7d251be0 100755 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/library/LibraryPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/library/LibraryPresenter.kt @@ -31,10 +31,10 @@ import exh.favorites.FavoritesSyncHelper import exh.md.utils.FollowStatus import exh.md.utils.MdUtil import exh.util.await +import exh.util.awaitSingleOrNull import exh.util.isLewd import exh.util.nullIfBlank import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.singleOrNull import kotlinx.coroutines.runBlocking import rx.Observable import rx.Subscription @@ -502,7 +502,7 @@ class LibraryPresenter( val chapter = db.getChapters(manga).await().minByOrNull { it.source_order } if (chapter != null && !chapter.read) listOf(chapter) else emptyList() } else if (manga.source == MERGED_SOURCE_ID) { - (sourceManager.getOrStub(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.singleOrNull()?.filter { !it.read } ?: emptyList() + (sourceManager.getOrStub(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.awaitSingleOrNull()?.filter { !it.read } ?: emptyList() } else /* SY <-- */ db.getChapters(manga).executeAsBlocking() .filter { !it.read } @@ -557,7 +557,7 @@ class LibraryPresenter( fun markReadStatus(mangas: List, read: Boolean) { mangas.forEach { manga -> launchIO { - val chapters = if (manga.source == MERGED_SOURCE_ID) (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.singleOrNull() ?: emptyList() else db.getChapters(manga).executeAsBlocking() + val chapters = if (manga.source == MERGED_SOURCE_ID) (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.awaitSingleOrNull() ?: emptyList() else db.getChapters(manga).executeAsBlocking() chapters.forEach { it.read = read if (!read) { @@ -646,7 +646,7 @@ class LibraryPresenter( // SY --> /** Returns first unread chapter of a manga */ fun getFirstUnread(manga: Manga): Chapter? { - val chapters = (if (manga.source == MERGED_SOURCE_ID) (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource).let { runBlocking { it?.getChaptersFromDB(manga)?.singleOrNull() } ?: emptyList() } else db.getChapters(manga).executeAsBlocking()) + val chapters = (if (manga.source == MERGED_SOURCE_ID) (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource).let { runBlocking { it?.getChaptersFromDB(manga)?.awaitSingleOrNull() } ?: emptyList() } else db.getChapters(manga).executeAsBlocking()) return if (manga.source == EH_SOURCE_ID || manga.source == EXH_SOURCE_ID) { val chapter = chapters.sortedBy { it.source_order }.getOrNull(0) if (chapter?.read == false) chapter else null diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt index 265c79113..9c8ff89b8 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt @@ -166,7 +166,7 @@ class MangaPresenter( // Add the subscription that retrieves the chapters from the database, keeps subscribed to // changes, and sends the list of chapters to the relay. add( - (/* SY --> */if (source is MergedSource) source.getChaptersFromDB(manga, true, dedupe).asObservable() else /* SY <-- */ db.getChapters(manga).asRxObservable()) + (/* SY --> */if (source is MergedSource) source.getChaptersFromDB(manga, true, dedupe) else /* SY <-- */ db.getChapters(manga).asRxObservable()) .map { chapters -> // Convert every chapter to a model. chapters.map { it.toModel() } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/ReaderPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/ReaderPresenter.kt index 14da640cb..e853bcada 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/ReaderPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/ReaderPresenter.kt @@ -35,10 +35,10 @@ import exh.EH_SOURCE_ID import exh.EXH_SOURCE_ID import exh.MERGED_SOURCE_ID import exh.md.utils.FollowStatus +import exh.util.awaitSingleOrNull import exh.util.defaultReaderType import exh.util.shouldDeleteChapters import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.singleOrNull import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import rx.Completable @@ -103,7 +103,7 @@ class ReaderPresenter( */ private val chapterList by lazy { val manga = manga!! - val dbChapters = if (manga.source == MERGED_SOURCE_ID) runBlocking { (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.singleOrNull() ?: emptyList() } else db.getChapters(manga).executeAsBlocking() + val dbChapters = if (manga.source == MERGED_SOURCE_ID) runBlocking { (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.awaitSingleOrNull() ?: emptyList() } else db.getChapters(manga).executeAsBlocking() val selectedChapter = dbChapters.find { it.id == chapterId } ?: error("Requested chapter of id $chapterId not found in chapter list") diff --git a/app/src/main/java/exh/util/RxUtil.kt b/app/src/main/java/exh/util/RxUtil.kt index 40903211d..b1b37a8fa 100644 --- a/app/src/main/java/exh/util/RxUtil.kt +++ b/app/src/main/java/exh/util/RxUtil.kt @@ -158,6 +158,10 @@ suspend fun Observable.awaitLast(): T = last().awaitOne() @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) suspend fun Observable.awaitSingle(): T = single().awaitOne() +suspend fun Observable.awaitSingleOrDefault(default: T): T = singleOrDefault(default).awaitOne() + +suspend fun Observable.awaitSingleOrNull(): T? = singleOrDefault(null).awaitOne() + @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) private suspend fun Observable.awaitOne(): T = suspendCancellableCoroutine { cont -> cont.unsubscribeOnCancellation(