Convert E-Hentai Update helper to Flows
This commit is contained in:
parent
1a0109df1d
commit
c049ce9018
@ -53,7 +53,12 @@ import exh.util.asObservable
|
||||
import exh.util.await
|
||||
import exh.util.shouldDeleteChapters
|
||||
import exh.util.trimOrNull
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.NonCancellable
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.withContext
|
||||
import rx.Observable
|
||||
import rx.Subscription
|
||||
@ -77,6 +82,8 @@ class MangaPresenter(
|
||||
private val sourceManager: SourceManager = Injekt.get()
|
||||
) : BasePresenter<MangaController>() {
|
||||
|
||||
val scope = CoroutineScope(Job() + Dispatchers.IO)
|
||||
|
||||
/**
|
||||
* Subscription to update the manga from the source.
|
||||
*/
|
||||
@ -116,7 +123,7 @@ class MangaPresenter(
|
||||
|
||||
private val updateHelper: EHentaiUpdateHelper by injectLazy()
|
||||
|
||||
val redirectUserRelay = BehaviorRelay.create<EXHRedirect>()
|
||||
val redirectUserRelay: BehaviorRelay<EXHRedirect> = BehaviorRelay.create<EXHRedirect>()
|
||||
|
||||
data class EXHRedirect(val manga: Manga, val update: Boolean)
|
||||
|
||||
@ -185,25 +192,22 @@ class MangaPresenter(
|
||||
if (chapters.isNotEmpty() && (source.isEhBasedSource()) && DebugToggles.ENABLE_EXH_ROOT_REDIRECT.enabled) {
|
||||
// Check for gallery in library and accept manga with lowest id
|
||||
// Find chapters sharing same root
|
||||
add(
|
||||
updateHelper.findAcceptedRootAndDiscardOthers(manga.source, chapters)
|
||||
.subscribeOn(Schedulers.io())
|
||||
.subscribe { (acceptedChain, _) ->
|
||||
// Redirect if we are not the accepted root
|
||||
if (manga.id != acceptedChain.manga.id) {
|
||||
// Update if any of our chapters are not in accepted manga's chapters
|
||||
val ourChapterUrls = chapters.map { it.url }.toSet()
|
||||
val acceptedChapterUrls = acceptedChain.chapters.map { it.url }.toSet()
|
||||
val update = (ourChapterUrls - acceptedChapterUrls).isNotEmpty()
|
||||
redirectUserRelay.call(
|
||||
EXHRedirect(
|
||||
acceptedChain.manga,
|
||||
update
|
||||
)
|
||||
updateHelper.findAcceptedRootAndDiscardOthers(manga.source, chapters)
|
||||
.onEach { (acceptedChain, _) ->
|
||||
// Redirect if we are not the accepted root
|
||||
if (manga.id != acceptedChain.manga.id) {
|
||||
// Update if any of our chapters are not in accepted manga's chapters
|
||||
val ourChapterUrls = chapters.map { it.url }.toSet()
|
||||
val acceptedChapterUrls = acceptedChain.chapters.map { it.url }.toSet()
|
||||
val update = (ourChapterUrls - acceptedChapterUrls).isNotEmpty()
|
||||
redirectUserRelay.call(
|
||||
EXHRedirect(
|
||||
acceptedChain.manga,
|
||||
update
|
||||
)
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
}.launchIn(scope)
|
||||
}
|
||||
// SY <--
|
||||
}
|
||||
|
@ -8,8 +8,10 @@ import eu.kanade.tachiyomi.data.database.models.Manga
|
||||
import eu.kanade.tachiyomi.data.database.models.MangaCategory
|
||||
import exh.metadata.metadata.EHentaiSearchMetadata
|
||||
import exh.metadata.metadata.base.getFlatMetadataForManga
|
||||
import rx.Observable
|
||||
import rx.Single
|
||||
import exh.util.await
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
import java.io.File
|
||||
import java.util.Date
|
||||
@ -29,31 +31,25 @@ class EHentaiUpdateHelper(context: Context) {
|
||||
*
|
||||
* @return Triple<Accepted, Discarded, HasNew>
|
||||
*/
|
||||
fun findAcceptedRootAndDiscardOthers(sourceId: Long, chapters: List<Chapter>): Single<Triple<ChapterChain, List<ChapterChain>, Boolean>> {
|
||||
fun findAcceptedRootAndDiscardOthers(sourceId: Long, chapters: List<Chapter>): Flow<Triple<ChapterChain, List<ChapterChain>, Boolean>> {
|
||||
// Find other chains
|
||||
val chainsObservable = Observable.merge(
|
||||
chapters.map { chapter ->
|
||||
db.getChapters(chapter.url).asRxSingle().toObservable()
|
||||
val chainsFlow = chapters.asFlow()
|
||||
.map { chapter ->
|
||||
db.getChapters(chapter.url).await().mapNotNull { it.manga_id }.distinct()
|
||||
}
|
||||
).toList().map { allChapters ->
|
||||
allChapters.flatMap { innerChapters -> innerChapters.map { it.manga_id!! } }.distinct()
|
||||
}.flatMap { mangaIds ->
|
||||
Observable.merge(
|
||||
mangaIds.map { mangaId ->
|
||||
Single.zip(
|
||||
db.getManga(mangaId).asRxSingle(),
|
||||
db.getChapters(mangaId).asRxSingle()
|
||||
) { manga, chapters ->
|
||||
ChapterChain(manga, chapters)
|
||||
}.toObservable().filter {
|
||||
it.manga.source == sourceId
|
||||
.map { mangaIds ->
|
||||
mangaIds
|
||||
.mapNotNull { mangaId ->
|
||||
(db.getManga(mangaId).await() ?: return@mapNotNull null) to db.getChapters(mangaId).await()
|
||||
}
|
||||
}
|
||||
)
|
||||
}.toList()
|
||||
.map {
|
||||
ChapterChain(it.first, it.second)
|
||||
}
|
||||
.filter { it.manga.source == sourceId }
|
||||
}
|
||||
|
||||
// Accept oldest chain
|
||||
val chainsWithAccepted = chainsObservable.map { chains ->
|
||||
val chainsWithAccepted = chainsFlow.map { chains ->
|
||||
val acceptedChain = chains.minByOrNull { it.manga.id!! }!!
|
||||
|
||||
acceptedChain to chains
|
||||
@ -160,7 +156,7 @@ class EHentaiUpdateHelper(context: Context) {
|
||||
|
||||
Triple(newAccepted, toDiscard, new)
|
||||
} else Triple(accepted, emptyList(), false)
|
||||
}.toSingle()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,12 +29,12 @@ import exh.util.cancellable
|
||||
import exh.util.jobScheduler
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.cancelAndJoin
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.mapNotNull
|
||||
import kotlinx.coroutines.flow.single
|
||||
import kotlinx.coroutines.flow.toList
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
@ -228,7 +228,7 @@ class EHentaiUpdateWorker : JobService(), CoroutineScope {
|
||||
|
||||
// Find accepted root and discard others
|
||||
val (acceptedRoot, discardedRoots, hasNew) =
|
||||
updateHelper.findAcceptedRootAndDiscardOthers(manga.source, chapters).await()
|
||||
updateHelper.findAcceptedRootAndDiscardOthers(manga.source, chapters).single()
|
||||
|
||||
if ((new.isNotEmpty() && manga.id == acceptedRoot.manga.id) ||
|
||||
(hasNew && updatedManga.none { it.first.id == acceptedRoot.manga.id })
|
||||
|
Loading…
x
Reference in New Issue
Block a user