Cleanup some merged manga code

This commit is contained in:
Jobobby04 2020-10-24 13:38:25 -04:00
parent 9a45891ed6
commit 01496ab34c
5 changed files with 20 additions and 15 deletions

View File

@ -19,6 +19,8 @@ import exh.MERGED_SOURCE_ID
import exh.merged.sql.models.MergedMangaReference import exh.merged.sql.models.MergedMangaReference
import exh.util.asFlow import exh.util.asFlow
import exh.util.await import exh.util.await
import exh.util.awaitSingle
import exh.util.awaitSingleOrNull
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
@ -28,10 +30,10 @@ import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.single import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.singleOrNull
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import okhttp3.Response import okhttp3.Response
import rx.Observable
import uy.kohesive.injekt.injectLazy import uy.kohesive.injekt.injectLazy
class MergedSource : SuspendHttpSource() { class MergedSource : SuspendHttpSource() {
@ -83,14 +85,13 @@ class MergedSource : SuspendHttpSource() {
} }
} }
fun getChaptersFromDB(manga: Manga, editScanlators: Boolean = false, dedupe: Boolean = true): Flow<List<Chapter>> { fun getChaptersFromDB(manga: Manga, editScanlators: Boolean = false, dedupe: Boolean = true): Observable<List<Chapter>> {
// TODO more chapter dedupe // TODO more chapter dedupe
return db.getChaptersByMergedMangaId(manga.id!!).asRxObservable() return db.getChaptersByMergedMangaId(manga.id!!).asRxObservable()
.asFlow()
.map { chapterList -> .map { chapterList ->
val mangaReferences = withContext(Dispatchers.IO) { db.getMergedMangaReferences(manga.id!!).await() } val mangaReferences = runBlocking(Dispatchers.IO) { db.getMergedMangaReferences(manga.id!!).await() } ?: emptyList()
val sources = mangaReferences.map { sourceManager.getOrStub(it.mangaSourceId) to it.mangaId }
if (editScanlators) { if (editScanlators) {
val sources = mangaReferences.map { sourceManager.getOrStub(it.mangaSourceId) to it.mangaId }
chapterList.onEach { chapter -> chapterList.onEach { chapter ->
val source = sources.firstOrNull { chapter.manga_id == it.second }?.first val source = sources.firstOrNull { chapter.manga_id == it.second }?.first
if (source != null) { if (source != null) {
@ -135,7 +136,7 @@ class MergedSource : SuspendHttpSource() {
fetchChaptersAndSync(manga, downloadChapters).collect() fetchChaptersAndSync(manga, downloadChapters).collect()
} }
emit( emit(
getChaptersFromDB(manga, editScanlators, dedupe).singleOrNull() ?: emptyList<Chapter>() getChaptersFromDB(manga, editScanlators, dedupe).awaitSingleOrNull() ?: emptyList<Chapter>()
) )
} }
} }
@ -171,7 +172,7 @@ class MergedSource : SuspendHttpSource() {
manga = Manga.create(reference.mangaSourceId).apply { manga = Manga.create(reference.mangaSourceId).apply {
url = reference.mangaUrl url = reference.mangaUrl
} }
manga.copyFrom(source.fetchMangaDetails(manga).asFlow().single()) manga.copyFrom(source.fetchMangaDetails(manga).awaitSingle())
try { try {
manga.id = db.insertManga(manga).await().insertedId() manga.id = db.insertManga(manga).await().insertedId()
reference.mangaId = manga.id reference.mangaId = manga.id

View File

@ -31,10 +31,10 @@ import exh.favorites.FavoritesSyncHelper
import exh.md.utils.FollowStatus import exh.md.utils.FollowStatus
import exh.md.utils.MdUtil import exh.md.utils.MdUtil
import exh.util.await import exh.util.await
import exh.util.awaitSingleOrNull
import exh.util.isLewd import exh.util.isLewd
import exh.util.nullIfBlank import exh.util.nullIfBlank
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.singleOrNull
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import rx.Observable import rx.Observable
import rx.Subscription import rx.Subscription
@ -502,7 +502,7 @@ class LibraryPresenter(
val chapter = db.getChapters(manga).await().minByOrNull { it.source_order } val chapter = db.getChapters(manga).await().minByOrNull { it.source_order }
if (chapter != null && !chapter.read) listOf(chapter) else emptyList() if (chapter != null && !chapter.read) listOf(chapter) else emptyList()
} else if (manga.source == MERGED_SOURCE_ID) { } else if (manga.source == MERGED_SOURCE_ID) {
(sourceManager.getOrStub(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.singleOrNull()?.filter { !it.read } ?: emptyList() (sourceManager.getOrStub(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.awaitSingleOrNull()?.filter { !it.read } ?: emptyList()
} else /* SY <-- */ db.getChapters(manga).executeAsBlocking() } else /* SY <-- */ db.getChapters(manga).executeAsBlocking()
.filter { !it.read } .filter { !it.read }
@ -557,7 +557,7 @@ class LibraryPresenter(
fun markReadStatus(mangas: List<Manga>, read: Boolean) { fun markReadStatus(mangas: List<Manga>, read: Boolean) {
mangas.forEach { manga -> mangas.forEach { manga ->
launchIO { launchIO {
val chapters = if (manga.source == MERGED_SOURCE_ID) (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.singleOrNull() ?: emptyList() else db.getChapters(manga).executeAsBlocking() val chapters = if (manga.source == MERGED_SOURCE_ID) (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.awaitSingleOrNull() ?: emptyList() else db.getChapters(manga).executeAsBlocking()
chapters.forEach { chapters.forEach {
it.read = read it.read = read
if (!read) { if (!read) {
@ -646,7 +646,7 @@ class LibraryPresenter(
// SY --> // SY -->
/** Returns first unread chapter of a manga */ /** Returns first unread chapter of a manga */
fun getFirstUnread(manga: Manga): Chapter? { fun getFirstUnread(manga: Manga): Chapter? {
val chapters = (if (manga.source == MERGED_SOURCE_ID) (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource).let { runBlocking { it?.getChaptersFromDB(manga)?.singleOrNull() } ?: emptyList() } else db.getChapters(manga).executeAsBlocking()) val chapters = (if (manga.source == MERGED_SOURCE_ID) (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource).let { runBlocking { it?.getChaptersFromDB(manga)?.awaitSingleOrNull() } ?: emptyList() } else db.getChapters(manga).executeAsBlocking())
return if (manga.source == EH_SOURCE_ID || manga.source == EXH_SOURCE_ID) { return if (manga.source == EH_SOURCE_ID || manga.source == EXH_SOURCE_ID) {
val chapter = chapters.sortedBy { it.source_order }.getOrNull(0) val chapter = chapters.sortedBy { it.source_order }.getOrNull(0)
if (chapter?.read == false) chapter else null if (chapter?.read == false) chapter else null

View File

@ -166,7 +166,7 @@ class MangaPresenter(
// Add the subscription that retrieves the chapters from the database, keeps subscribed to // Add the subscription that retrieves the chapters from the database, keeps subscribed to
// changes, and sends the list of chapters to the relay. // changes, and sends the list of chapters to the relay.
add( add(
(/* SY --> */if (source is MergedSource) source.getChaptersFromDB(manga, true, dedupe).asObservable() else /* SY <-- */ db.getChapters(manga).asRxObservable()) (/* SY --> */if (source is MergedSource) source.getChaptersFromDB(manga, true, dedupe) else /* SY <-- */ db.getChapters(manga).asRxObservable())
.map { chapters -> .map { chapters ->
// Convert every chapter to a model. // Convert every chapter to a model.
chapters.map { it.toModel() } chapters.map { it.toModel() }

View File

@ -35,10 +35,10 @@ import exh.EH_SOURCE_ID
import exh.EXH_SOURCE_ID import exh.EXH_SOURCE_ID
import exh.MERGED_SOURCE_ID import exh.MERGED_SOURCE_ID
import exh.md.utils.FollowStatus import exh.md.utils.FollowStatus
import exh.util.awaitSingleOrNull
import exh.util.defaultReaderType import exh.util.defaultReaderType
import exh.util.shouldDeleteChapters import exh.util.shouldDeleteChapters
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.singleOrNull
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import rx.Completable import rx.Completable
@ -103,7 +103,7 @@ class ReaderPresenter(
*/ */
private val chapterList by lazy { private val chapterList by lazy {
val manga = manga!! val manga = manga!!
val dbChapters = if (manga.source == MERGED_SOURCE_ID) runBlocking { (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.singleOrNull() ?: emptyList() } else db.getChapters(manga).executeAsBlocking() val dbChapters = if (manga.source == MERGED_SOURCE_ID) runBlocking { (sourceManager.get(MERGED_SOURCE_ID) as? MergedSource)?.getChaptersFromDB(manga)?.awaitSingleOrNull() ?: emptyList() } else db.getChapters(manga).executeAsBlocking()
val selectedChapter = dbChapters.find { it.id == chapterId } val selectedChapter = dbChapters.find { it.id == chapterId }
?: error("Requested chapter of id $chapterId not found in chapter list") ?: error("Requested chapter of id $chapterId not found in chapter list")

View File

@ -158,6 +158,10 @@ suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne() suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
suspend fun <T> Observable<T>.awaitSingleOrDefault(default: T): T = singleOrDefault(default).awaitOne()
suspend fun <T> Observable<T>.awaitSingleOrNull(): T? = singleOrDefault(null).awaitOne()
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont -> private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
cont.unsubscribeOnCancellation( cont.unsubscribeOnCancellation(