Convert Favorites sync to a StateFlow
This commit is contained in:
parent
9a2ed755b7
commit
ee18f94788
@ -51,18 +51,20 @@ import exh.favorites.FavoritesSyncStatus
|
||||
import exh.mangaDexSourceIds
|
||||
import exh.nHentaiSourceIds
|
||||
import exh.ui.LoaderManager
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.drop
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.sample
|
||||
import reactivecircus.flowbinding.android.view.clicks
|
||||
import reactivecircus.flowbinding.appcompat.queryTextChanges
|
||||
import reactivecircus.flowbinding.viewpager.pageSelections
|
||||
import rx.Subscription
|
||||
import rx.android.schedulers.AndroidSchedulers
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.time.ExperimentalTime
|
||||
import kotlin.time.milliseconds
|
||||
|
||||
class LibraryController(
|
||||
bundle: Bundle? = null,
|
||||
@ -145,7 +147,7 @@ class LibraryController(
|
||||
// Old sync status
|
||||
private var oldSyncStatus: FavoritesSyncStatus? = null
|
||||
// Favorites
|
||||
private var favoritesSyncSubscription: Subscription? = null
|
||||
private var favoritesSyncJob: Job? = null
|
||||
val loaderManager = LoaderManager()
|
||||
// <-- EH
|
||||
|
||||
@ -677,18 +679,19 @@ class LibraryController(
|
||||
}
|
||||
|
||||
// SY -->
|
||||
@OptIn(ExperimentalTime::class)
|
||||
override fun onAttach(view: View) {
|
||||
super.onAttach(view)
|
||||
|
||||
// --> EXH
|
||||
cleanupSyncState()
|
||||
favoritesSyncSubscription =
|
||||
favoritesSyncJob =
|
||||
presenter.favoritesSync.status
|
||||
.sample(100, TimeUnit.MILLISECONDS)
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribe {
|
||||
.sample(100.milliseconds)
|
||||
.onEach {
|
||||
updateSyncStatus(it)
|
||||
}
|
||||
.launchIn(scope)
|
||||
// <-- EXH
|
||||
}
|
||||
|
||||
@ -714,8 +717,8 @@ class LibraryController(
|
||||
|
||||
// --> EXH
|
||||
private fun cleanupSyncState() {
|
||||
favoritesSyncSubscription?.unsubscribe()
|
||||
favoritesSyncSubscription = null
|
||||
favoritesSyncJob?.cancel()
|
||||
favoritesSyncJob = null
|
||||
// Close sync status
|
||||
favSyncDialog?.dismiss()
|
||||
favSyncDialog = null
|
||||
@ -733,7 +736,6 @@ class LibraryController(
|
||||
favSyncDialog = buildDialog()
|
||||
?.title(R.string.favorites_syncing)
|
||||
?.cancelable(false)
|
||||
// ?.progress(true, 0)
|
||||
favSyncDialog?.show()
|
||||
}
|
||||
|
||||
@ -763,10 +765,10 @@ class LibraryController(
|
||||
?.cancelable(false)
|
||||
?.positiveButton(R.string.show_gallery) {
|
||||
openManga(status.manga)
|
||||
presenter.favoritesSync.status.onNext(FavoritesSyncStatus.Idle(activity!!))
|
||||
presenter.favoritesSync.status.value = FavoritesSyncStatus.Idle(activity!!)
|
||||
}
|
||||
?.negativeButton(android.R.string.ok) {
|
||||
presenter.favoritesSync.status.onNext(FavoritesSyncStatus.Idle(activity!!))
|
||||
presenter.favoritesSync.status.value = FavoritesSyncStatus.Idle(activity!!)
|
||||
}
|
||||
favSyncDialog?.show()
|
||||
}
|
||||
@ -779,7 +781,7 @@ class LibraryController(
|
||||
?.message(text = activity!!.getString(R.string.favorites_sync_error_string, status.message))
|
||||
?.cancelable(false)
|
||||
?.positiveButton(android.R.string.ok) {
|
||||
presenter.favoritesSync.status.onNext(FavoritesSyncStatus.Idle(activity!!))
|
||||
presenter.favoritesSync.status.value = FavoritesSyncStatus.Idle(activity!!)
|
||||
}
|
||||
favSyncDialog?.show()
|
||||
}
|
||||
@ -792,7 +794,7 @@ class LibraryController(
|
||||
?.message(text = activity!!.getString(R.string.favorites_sync_done_errors_message, status.message))
|
||||
?.cancelable(false)
|
||||
?.positiveButton(android.R.string.ok) {
|
||||
presenter.favoritesSync.status.onNext(FavoritesSyncStatus.Idle(activity!!))
|
||||
presenter.favoritesSync.status.value = FavoritesSyncStatus.Idle(activity!!)
|
||||
}
|
||||
favSyncDialog?.show()
|
||||
}
|
||||
|
@ -30,10 +30,10 @@ import exh.util.wifiManager
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import okhttp3.FormBody
|
||||
import okhttp3.Request
|
||||
import rx.subjects.BehaviorSubject
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
@ -61,7 +61,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
|
||||
private val logger = XLog.tag("EHFavSync").build()
|
||||
|
||||
val status: BehaviorSubject<FavoritesSyncStatus> = BehaviorSubject.create(FavoritesSyncStatus.Idle(context))
|
||||
val status: MutableStateFlow<FavoritesSyncStatus> = MutableStateFlow(FavoritesSyncStatus.Idle(context))
|
||||
|
||||
@Synchronized
|
||||
fun runSync() {
|
||||
@ -69,7 +69,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
return
|
||||
}
|
||||
|
||||
status.onNext(FavoritesSyncStatus.Initializing(context))
|
||||
status.value = FavoritesSyncStatus.Initializing(context)
|
||||
|
||||
scope.launch(Dispatchers.IO) { beginSync() }
|
||||
}
|
||||
@ -77,12 +77,12 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
private suspend fun beginSync() {
|
||||
// Check if logged in
|
||||
if (!prefs.enableExhentai().get()) {
|
||||
status.onNext(FavoritesSyncStatus.Error(context.getString(R.string.please_login)))
|
||||
status.value = FavoritesSyncStatus.Error(context.getString(R.string.please_login))
|
||||
return
|
||||
}
|
||||
|
||||
// Validate library state
|
||||
status.onNext(FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_verifying_library), context = context))
|
||||
status.value = FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_verifying_library), context = context)
|
||||
val libraryManga = db.getLibraryMangas().await()
|
||||
val seenManga = HashSet<Long>(libraryManga.size)
|
||||
libraryManga.forEach {
|
||||
@ -90,10 +90,8 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
|
||||
if (it.id in seenManga) {
|
||||
val inCategories = db.getCategoriesForManga(it).await()
|
||||
status.onNext(
|
||||
FavoritesSyncStatus.BadLibraryState
|
||||
.MangaInMultipleCategories(it, inCategories, context)
|
||||
)
|
||||
status.value = FavoritesSyncStatus.BadLibraryState.MangaInMultipleCategories(it, inCategories, context)
|
||||
|
||||
logger.w(context.getString(R.string.favorites_sync_manga_multiple_categories_error, it.id))
|
||||
return
|
||||
} else {
|
||||
@ -103,10 +101,10 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
|
||||
// Download remote favorites
|
||||
val favorites = try {
|
||||
status.onNext(FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_downloading), context = context))
|
||||
status.value = FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_downloading), context = context)
|
||||
exh.fetchFavorites()
|
||||
} catch (e: Exception) {
|
||||
status.onNext(FavoritesSyncStatus.Error(context.getString(R.string.favorites_sync_failed_to_featch)))
|
||||
status.value = FavoritesSyncStatus.Error(context.getString(R.string.favorites_sync_failed_to_featch))
|
||||
logger.e(context.getString(R.string.favorites_sync_could_not_fetch), e)
|
||||
return
|
||||
}
|
||||
@ -136,17 +134,17 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
storage.getRealm().use { realm ->
|
||||
realm.trans {
|
||||
db.inTransaction {
|
||||
status.onNext(FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_calculating_remote_changes), context = context))
|
||||
status.value = FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_calculating_remote_changes), context = context)
|
||||
val remoteChanges = storage.getChangedRemoteEntries(realm, favorites.first)
|
||||
val localChanges = if (prefs.exhReadOnlySync().get()) {
|
||||
null // Do not build local changes if they are not going to be applied
|
||||
} else {
|
||||
status.onNext(FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_calculating_local_changes), context = context))
|
||||
status.value = FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_calculating_local_changes), context = context)
|
||||
storage.getChangedDbEntries(realm)
|
||||
}
|
||||
|
||||
// Apply remote categories
|
||||
status.onNext(FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_syncing_category_names), context = context))
|
||||
status.value = FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_syncing_category_names), context = context)
|
||||
applyRemoteCategories(favorites.second)
|
||||
|
||||
// Apply change sets
|
||||
@ -155,7 +153,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
applyChangeSetToRemote(errorList, localChanges)
|
||||
}
|
||||
|
||||
status.onNext(FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_cleaning_up), context = context))
|
||||
status.value = FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_cleaning_up), context = context)
|
||||
storage.snapshotEntries(realm)
|
||||
}
|
||||
}
|
||||
@ -169,7 +167,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
logger.w(context.getString(R.string.favorites_sync_ignoring_exception), e)
|
||||
return
|
||||
} catch (e: Exception) {
|
||||
status.onNext(FavoritesSyncStatus.Error(context.getString(R.string.favorites_sync_unknown_error, e.message)))
|
||||
status.value = FavoritesSyncStatus.Error(context.getString(R.string.favorites_sync_unknown_error, e.message))
|
||||
logger.e(context.getString(R.string.favorites_sync_sync_error), e)
|
||||
return
|
||||
} finally {
|
||||
@ -188,9 +186,9 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
}
|
||||
|
||||
if (errorList.isEmpty()) {
|
||||
status.onNext(FavoritesSyncStatus.Idle(context))
|
||||
status.value = FavoritesSyncStatus.Idle(context)
|
||||
} else {
|
||||
status.onNext(FavoritesSyncStatus.CompleteWithErrors(errorList))
|
||||
status.value = FavoritesSyncStatus.CompleteWithErrors(errorList)
|
||||
}
|
||||
}
|
||||
|
||||
@ -258,7 +256,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
if (prefs.exhLenientSync().get()) {
|
||||
errorList += errorString
|
||||
} else {
|
||||
status.onNext(FavoritesSyncStatus.Error(errorString))
|
||||
status.value = FavoritesSyncStatus.Error(errorString)
|
||||
throw IgnoredException()
|
||||
}
|
||||
}
|
||||
@ -286,7 +284,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
private suspend fun applyChangeSetToRemote(errorList: MutableList<String>, changeSet: ChangeSet) {
|
||||
// Apply removals
|
||||
if (changeSet.removed.isNotEmpty()) {
|
||||
status.onNext(FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_removing_galleries, changeSet.removed.size), context = context))
|
||||
status.value = FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_removing_galleries, changeSet.removed.size), context = context)
|
||||
|
||||
val formBody = FormBody.Builder()
|
||||
.add("ddact", "delete")
|
||||
@ -308,7 +306,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
if (prefs.exhLenientSync().get()) {
|
||||
errorList += errorString
|
||||
} else {
|
||||
status.onNext(FavoritesSyncStatus.Error(errorString))
|
||||
status.value = FavoritesSyncStatus.Error(errorString)
|
||||
throw IgnoredException()
|
||||
}
|
||||
}
|
||||
@ -317,13 +315,11 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
// Apply additions
|
||||
throttleManager.resetThrottle()
|
||||
changeSet.added.forEachIndexed { index, it ->
|
||||
status.onNext(
|
||||
FavoritesSyncStatus.Processing(
|
||||
status.value = FavoritesSyncStatus.Processing(
|
||||
context.getString(R.string.favorites_sync_adding_to_remote, index + 1, changeSet.added.size),
|
||||
needWarnThrottle(),
|
||||
context
|
||||
)
|
||||
)
|
||||
|
||||
throttleManager.throttle()
|
||||
|
||||
@ -336,7 +332,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
|
||||
// Apply removals
|
||||
changeSet.removed.forEachIndexed { index, it ->
|
||||
status.onNext(FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_remove_from_local, index + 1, changeSet.removed.size), context = context))
|
||||
status.value = FavoritesSyncStatus.Processing(context.getString(R.string.favorites_sync_remove_from_local, index + 1, changeSet.removed.size), context = context)
|
||||
val url = it.getUrl()
|
||||
|
||||
// Consider both EX and EH sources
|
||||
@ -366,13 +362,11 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
// Apply additions
|
||||
throttleManager.resetThrottle()
|
||||
changeSet.added.forEachIndexed { index, it ->
|
||||
status.onNext(
|
||||
FavoritesSyncStatus.Processing(
|
||||
status.value = FavoritesSyncStatus.Processing(
|
||||
context.getString(R.string.favorites_sync_add_to_local, index + 1, changeSet.added.size),
|
||||
needWarnThrottle(),
|
||||
context
|
||||
)
|
||||
)
|
||||
|
||||
throttleManager.throttle()
|
||||
|
||||
@ -387,7 +381,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
|
||||
if (result is GalleryAddEvent.Fail) {
|
||||
if (result is GalleryAddEvent.Fail.NotFound) {
|
||||
XLog.e(context.getString(R.string.favorites_sync_remote_not_exist, it.getUrl()))
|
||||
XLog.tag("EHFavSync").enableStackTrace(2).e(context.getString(R.string.favorites_sync_remote_not_exist, it.getUrl()))
|
||||
// Skip this gallery, it no longer exists
|
||||
return@forEachIndexed
|
||||
}
|
||||
@ -400,7 +394,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
if (prefs.exhLenientSync().get()) {
|
||||
errorList += errorString
|
||||
} else {
|
||||
status.onNext(FavoritesSyncStatus.Error(errorString))
|
||||
status.value = FavoritesSyncStatus.Error(errorString)
|
||||
throw IgnoredException()
|
||||
}
|
||||
} else if (result is GalleryAddEvent.Success) {
|
||||
|
@ -6,6 +6,7 @@ import eu.kanade.tachiyomi.source.online.all.EHentai
|
||||
import exh.EH_SOURCE_ID
|
||||
import exh.EXH_SOURCE_ID
|
||||
import exh.metadata.metadata.EHentaiSearchMetadata
|
||||
import exh.util.executeOnIO
|
||||
import io.realm.Realm
|
||||
import io.realm.RealmConfiguration
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
@ -20,13 +21,13 @@ class LocalFavoritesStorage {
|
||||
|
||||
fun getRealm(): Realm = Realm.getInstance(realmConfig)
|
||||
|
||||
fun getChangedDbEntries(realm: Realm) =
|
||||
suspend fun getChangedDbEntries(realm: Realm) =
|
||||
getChangedEntries(
|
||||
realm,
|
||||
parseToFavoriteEntries(
|
||||
loadDbCategories(
|
||||
db.getFavoriteMangas()
|
||||
.executeAsBlocking()
|
||||
.executeOnIO()
|
||||
.asSequence()
|
||||
)
|
||||
)
|
||||
@ -37,22 +38,19 @@ class LocalFavoritesStorage {
|
||||
realm,
|
||||
parseToFavoriteEntries(
|
||||
entries.asSequence().map {
|
||||
Pair(
|
||||
it.fav,
|
||||
it.manga.apply {
|
||||
it.fav to it.manga.apply {
|
||||
favorite = true
|
||||
date_added = System.currentTimeMillis()
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
fun snapshotEntries(realm: Realm) {
|
||||
suspend fun snapshotEntries(realm: Realm) {
|
||||
val dbMangas = parseToFavoriteEntries(
|
||||
loadDbCategories(
|
||||
db.getFavoriteMangas()
|
||||
.executeAsBlocking()
|
||||
.executeOnIO()
|
||||
.asSequence()
|
||||
)
|
||||
)
|
||||
@ -100,19 +98,16 @@ class LocalFavoritesStorage {
|
||||
it.category == entry.category
|
||||
}
|
||||
|
||||
private fun loadDbCategories(manga: Sequence<Manga>): Sequence<Pair<Int, Manga>> {
|
||||
val dbCategories = db.getCategories().executeAsBlocking()
|
||||
private suspend fun loadDbCategories(manga: Sequence<Manga>): Sequence<Pair<Int, Manga>> {
|
||||
val dbCategories = db.getCategories().executeOnIO()
|
||||
|
||||
return manga.filter(this::validateDbManga).mapNotNull {
|
||||
val category = db.getCategoriesForManga(it).executeAsBlocking()
|
||||
|
||||
Pair(
|
||||
dbCategories.indexOf(
|
||||
category.firstOrNull()
|
||||
?: return@mapNotNull null
|
||||
),
|
||||
it
|
||||
)
|
||||
) to it
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user