diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt index af6fb5d10..6c3cf9ac3 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt @@ -23,6 +23,7 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.withTimeout import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit /** @@ -322,9 +323,9 @@ class DownloadCache( * Returns a new map containing only the key entries of [transform] that are not null. */ private inline fun Map.mapNotNullKeys(transform: (Map.Entry) -> R?): MutableMap { - val destination = LinkedHashMap() - forEach { element -> transform(element)?.let { destination[it] = element.value } } - return destination + val mutableMap = ConcurrentHashMap() + forEach { element -> transform(element)?.let { mutableMap[it] = element.value } } + return mutableMap } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt index 96ce16153..760111e9c 100755 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt @@ -78,13 +78,13 @@ class DownloadQueue( .startWith(getActiveDownloads()) .onBackpressureBuffer() - fun getStatusAsFlow(): Flow = getStatusObservable().asFlow() + fun statusFlow(): Flow = getStatusObservable().asFlow() private fun getUpdatedObservable(): Observable> = updatedRelay.onBackpressureBuffer() .startWith(Unit) .map { this } - fun getUpdatedAsFlow(): Flow> = getUpdatedObservable().asFlow() + fun updatedFlow(): Flow> = getUpdatedObservable().asFlow() private fun setPagesFor(download: Download) { if (download.status == Download.State.DOWNLOADED || download.status == Download.State.ERROR) { @@ -111,7 +111,7 @@ class DownloadQueue( .filter { it.status == Download.State.DOWNLOADING } } - fun getProgressAsFlow(): Flow = getProgressObservable().asFlow() + fun progressFlow(): Flow = getProgressObservable().asFlow() private fun setPagesSubject(pages: List?, subject: PublishSubject?) { pages?.forEach { it.setStatusSubject(subject) } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/SourcesPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/SourcesPresenter.kt index 27962fc2f..5853fec9d 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/SourcesPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/SourcesPresenter.kt @@ -20,7 +20,9 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.launchIn @@ -57,7 +59,7 @@ class SourcesPresenter( fun onCreate() { // SY --> combine( - getEnabledSources.subscribe(), + getEnabledSources.subscribe().debounce(500), // Avoid crashes due to LazyColumn rendering getSourceCategories.subscribe(), getShowLatest.subscribe(controllerMode), flowOf(controllerMode == SourcesController.Mode.CATALOGUE), diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt index 300e884dc..458c666c8 100755 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt @@ -34,7 +34,7 @@ class DownloadPresenter : BasePresenter() { super.onCreate(savedState) presenterScope.launch { - downloadQueue.getUpdatedAsFlow() + downloadQueue.updatedFlow() .catch { error -> logcat(LogPriority.ERROR, error) } .map { downloads -> downloads @@ -49,9 +49,9 @@ class DownloadPresenter : BasePresenter() { } } - fun getDownloadStatusFlow() = downloadQueue.getStatusAsFlow() + fun getDownloadStatusFlow() = downloadQueue.statusFlow() - fun getDownloadProgressFlow() = downloadQueue.getProgressAsFlow() + fun getDownloadProgressFlow() = downloadQueue.progressFlow() /** * Pauses the download queue. diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt index 3ea8051a7..2d8302a9c 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt @@ -50,6 +50,7 @@ import eu.kanade.domain.ui.UiPreferences import eu.kanade.tachiyomi.R import eu.kanade.tachiyomi.data.database.models.Track import eu.kanade.tachiyomi.data.database.models.toDomainManga +import eu.kanade.tachiyomi.data.download.DownloadCache import eu.kanade.tachiyomi.data.download.DownloadManager import eu.kanade.tachiyomi.data.download.model.Download import eu.kanade.tachiyomi.data.library.CustomMangaManager @@ -136,6 +137,7 @@ class MangaPresenter( private val trackManager: TrackManager = Injekt.get(), private val sourceManager: SourceManager = Injekt.get(), private val downloadManager: DownloadManager = Injekt.get(), + private val downloadCache: DownloadCache = Injekt.get(), private val getMangaAndChapters: GetMangaWithChapters = Injekt.get(), // SY --> private val uiPreferences: UiPreferences = Injekt.get(), @@ -174,9 +176,6 @@ class MangaPresenter( private val successState: MangaScreenState.Success? get() = state.value as? MangaScreenState.Success - private var observeDownloadsStatusJob: Job? = null - private var observeDownloadsPageJob: Job? = null - private var _trackList: List = emptyList() val trackList get() = _trackList @@ -264,7 +263,7 @@ class MangaPresenter( } // For UI changes - presenterScope.launch { + presenterScope.launchIO { getMangaAndChapters.subscribe(mangaId) .distinctUntilChanged() // SY --> @@ -332,6 +331,7 @@ class MangaPresenter( ) { state, mergedData -> state.copy(mergedData = mergedData) } + .combine(downloadCache.changes) { state, _ -> state } // SY <-- .collectLatest { (manga, chapters /* SY --> */, flatMetadata, mergedData /* SY <-- */) -> val chapterItems = chapters.toChapterItemsParams(manga /* SY --> */, mergedData /* SY <-- */) @@ -345,20 +345,11 @@ class MangaPresenter( // SY <-- ) } - - observeDownloads() } } - basePreferences.incognitoMode() - .asHotFlow { incognitoMode = it } - .launchIn(presenterScope) + observeDownloads() - basePreferences.downloadedOnly() - .asHotFlow { downloadedOnlyMode = it } - .launchIn(presenterScope) - - // This block runs once on create presenterScope.launchIO { val manga = getMangaAndChapters.awaitManga(mangaId) // SY --> @@ -382,7 +373,7 @@ class MangaPresenter( val needRefreshInfo = !manga.initialized val needRefreshChapter = chapters.isEmpty() - // Show what we have earlier. + // Show what we have earlier _state.update { val source = sourceManager.getOrStub(manga.source) MangaScreenState.Success( @@ -427,6 +418,14 @@ class MangaPresenter( // Initial loading finished updateSuccessState { it.copy(isRefreshingData = false) } } + + basePreferences.incognitoMode() + .asHotFlow { incognitoMode = it } + .launchIn(presenterScope) + + basePreferences.downloadedOnly() + .asHotFlow { downloadedOnlyMode = it } + .launchIn(presenterScope) } fun fetchAllFromSource(manualFetch: Boolean = true) { @@ -901,9 +900,8 @@ class MangaPresenter( val isMergedSource = source is MergedSource val mergedIds = if (isMergedSource) successState?.mergedData?.manga?.keys.orEmpty() else emptySet() // SY <-- - observeDownloadsStatusJob?.cancel() - observeDownloadsStatusJob = presenterScope.launchIO { - downloadManager.queue.getStatusAsFlow() + presenterScope.launchIO { + downloadManager.queue.statusFlow() .filter { /* SY --> */ if (isMergedSource) it.manga.id in mergedIds else /* SY <-- */ it.manga.id == successState?.manga?.id } .catch { error -> logcat(LogPriority.ERROR, error) } .collect { @@ -913,9 +911,8 @@ class MangaPresenter( } } - observeDownloadsPageJob?.cancel() - observeDownloadsPageJob = presenterScope.launchIO { - downloadManager.queue.getProgressAsFlow() + presenterScope.launchIO { + downloadManager.queue.progressFlow() .filter { /* SY --> */ if (isMergedSource) it.manga.id in mergedIds else /* SY <-- */ it.manga.id == successState?.manga?.id } .catch { error -> logcat(LogPriority.ERROR, error) } .collect { diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt index a1dbd8204..992abfa8d 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt @@ -41,7 +41,7 @@ class MorePresenter( presenterScope.launchIO { combine( DownloadService.isRunning, - downloadManager.queue.getUpdatedAsFlow(), + downloadManager.queue.updatedFlow(), ) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) } .collectLatest { (isDownloading, downloadQueueSize) -> val pendingDownloadExists = downloadQueueSize != 0 diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt index 85ab87d18..656620e73 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt @@ -18,6 +18,7 @@ import eu.kanade.domain.updates.model.UpdatesWithRelations import eu.kanade.presentation.components.ChapterDownloadAction import eu.kanade.presentation.updates.UpdatesState import eu.kanade.presentation.updates.UpdatesStateImpl +import eu.kanade.tachiyomi.data.download.DownloadCache import eu.kanade.tachiyomi.data.download.DownloadManager import eu.kanade.tachiyomi.data.download.DownloadService import eu.kanade.tachiyomi.data.download.model.Download @@ -27,11 +28,12 @@ import eu.kanade.tachiyomi.util.lang.launchIO import eu.kanade.tachiyomi.util.lang.launchNonCancellable import eu.kanade.tachiyomi.util.lang.withUIContext import eu.kanade.tachiyomi.util.system.logcat -import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch @@ -50,6 +52,7 @@ class UpdatesPresenter( private val getManga: GetManga = Injekt.get(), private val sourceManager: SourceManager = Injekt.get(), private val downloadManager: DownloadManager = Injekt.get(), + private val downloadCache: DownloadCache = Injekt.get(), private val getChapter: GetChapter = Injekt.get(), basePreferences: BasePreferences = Injekt.get(), uiPreferences: UiPreferences = Injekt.get(), @@ -70,12 +73,6 @@ class UpdatesPresenter( // First and last selected index in list private val selectedPositions: Array = arrayOf(-1, -1) - /** - * Subscription to observe download status changes. - */ - private var observeDownloadsStatusJob: Job? = null - private var observeDownloadsPageJob: Job? = null - override fun onCreate(savedState: Bundle?) { super.onCreate(savedState) @@ -86,10 +83,11 @@ class UpdatesPresenter( add(Calendar.MONTH, -3) } - observeDownloads() - - getUpdates.subscribe(calendar) - .distinctUntilChanged() + combine( + getUpdates.subscribe(calendar).distinctUntilChanged(), + downloadCache.changes, + ) { updates, _ -> updates } + .debounce(500) // Avoid crashes due to LazyColumn rendering .catch { logcat(LogPriority.ERROR, it) _events.send(Event.InternalError) @@ -99,6 +97,26 @@ class UpdatesPresenter( state.isLoading = false } } + + presenterScope.launchIO { + downloadManager.queue.statusFlow() + .catch { error -> logcat(LogPriority.ERROR, error) } + .collect { + withUIContext { + updateDownloadState(it) + } + } + } + + presenterScope.launchIO { + downloadManager.queue.progressFlow() + .catch { error -> logcat(LogPriority.ERROR, error) } + .collect { + withUIContext { + updateDownloadState(it) + } + } + } } private fun List.toUpdateItems(): List { @@ -127,30 +145,6 @@ class UpdatesPresenter( } } - private suspend fun observeDownloads() { - observeDownloadsStatusJob?.cancel() - observeDownloadsStatusJob = presenterScope.launchIO { - downloadManager.queue.getStatusAsFlow() - .catch { error -> logcat(LogPriority.ERROR, error) } - .collect { - withUIContext { - updateDownloadState(it) - } - } - } - - observeDownloadsPageJob?.cancel() - observeDownloadsPageJob = presenterScope.launchIO { - downloadManager.queue.getProgressAsFlow() - .catch { error -> logcat(LogPriority.ERROR, error) } - .collect { - withUIContext { - updateDownloadState(it) - } - } - } - } - /** * Update status of chapters. *