Inline DownloadQueue into Downloader (#9159)
* Move statusFlow and progressFlow to DownloadManager * Inline DownloadQueue into Downloader * Move reorderQueue implementation to Downloader (cherry picked from commit b41565f8794e3865ceae914bd00b500dbd2abacf) # Conflicts: # app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadManager.kt # app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt # app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaScreenModel.kt
This commit is contained in:
parent
16a23f89a5
commit
0d3bbe4271
@ -5,12 +5,19 @@ import com.hippo.unifile.UniFile
|
||||
import eu.kanade.domain.download.service.DownloadPreferences
|
||||
import eu.kanade.tachiyomi.R
|
||||
import eu.kanade.tachiyomi.data.download.model.Download
|
||||
import eu.kanade.tachiyomi.data.download.model.DownloadQueue
|
||||
import eu.kanade.tachiyomi.source.Source
|
||||
import eu.kanade.tachiyomi.source.SourceManager
|
||||
import eu.kanade.tachiyomi.source.model.Page
|
||||
import eu.kanade.tachiyomi.util.storage.DiskUtil
|
||||
import exh.log.xLogE
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.drop
|
||||
import kotlinx.coroutines.flow.emitAll
|
||||
import kotlinx.coroutines.flow.flatMapLatest
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.merge
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import logcat.LogPriority
|
||||
import tachiyomi.core.util.lang.launchIO
|
||||
@ -45,11 +52,8 @@ class DownloadManager(
|
||||
*/
|
||||
private val pendingDeleter = DownloadPendingDeleter(context)
|
||||
|
||||
/**
|
||||
* Downloads queue, where the pending chapters are stored.
|
||||
*/
|
||||
val queue: DownloadQueue
|
||||
get() = downloader.queue
|
||||
val queueState
|
||||
get() = downloader.queueState
|
||||
|
||||
// For use by DownloadService only
|
||||
fun downloaderStart() = downloader.start()
|
||||
@ -88,7 +92,7 @@ class DownloadManager(
|
||||
* @param chapterId the chapter to check.
|
||||
*/
|
||||
fun getQueuedDownloadOrNull(chapterId: Long): Download? {
|
||||
return queue.find { it.chapter.id == chapterId }
|
||||
return queueState.value.find { it: Download -> it.chapter.id == chapterId }
|
||||
}
|
||||
|
||||
fun startDownloadNow(chapterId: Long?) {
|
||||
@ -96,7 +100,7 @@ class DownloadManager(
|
||||
val download = getQueuedDownloadOrNull(chapterId)
|
||||
// If not in queue try to start a new download
|
||||
val toAdd = download ?: runBlocking { Download.fromChapterId(chapterId) } ?: return
|
||||
val queue = queue.toMutableList()
|
||||
val queue = queueState.value.toMutableList()
|
||||
download?.let { queue.remove(it) }
|
||||
queue.add(0, toAdd)
|
||||
reorderQueue(queue)
|
||||
@ -115,21 +119,7 @@ class DownloadManager(
|
||||
* @param downloads value to set the download queue to
|
||||
*/
|
||||
fun reorderQueue(downloads: List<Download>) {
|
||||
val wasRunning = downloader.isRunning
|
||||
|
||||
if (downloads.isEmpty()) {
|
||||
downloader.clearQueue()
|
||||
downloader.stop()
|
||||
return
|
||||
}
|
||||
|
||||
downloader.pause()
|
||||
queue.clear()
|
||||
queue.addAll(downloads)
|
||||
|
||||
if (wasRunning) {
|
||||
downloader.start()
|
||||
}
|
||||
downloader.updateQueue(downloads)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -150,7 +140,7 @@ class DownloadManager(
|
||||
*/
|
||||
fun addDownloadsToStartOfQueue(downloads: List<Download>) {
|
||||
if (downloads.isEmpty()) return
|
||||
queue.toMutableList().apply {
|
||||
queueState.value.toMutableList().apply {
|
||||
addAll(0, downloads)
|
||||
reorderQueue(this)
|
||||
}
|
||||
@ -254,7 +244,7 @@ class DownloadManager(
|
||||
fun deleteManga(manga: Manga, source: Source, removeQueued: Boolean = true) {
|
||||
launchIO {
|
||||
if (removeQueued) {
|
||||
queue.remove(manga)
|
||||
downloader.removeFromQueue(manga)
|
||||
}
|
||||
provider.findMangaDir(/* SY --> */ manga.ogTitle /* SY <-- */, source)?.delete()
|
||||
cache.removeManga(manga)
|
||||
@ -274,12 +264,12 @@ class DownloadManager(
|
||||
downloader.pause()
|
||||
}
|
||||
|
||||
queue.remove(chapters)
|
||||
downloader.removeFromQueue(chapters)
|
||||
|
||||
if (wasRunning) {
|
||||
if (queue.isEmpty()) {
|
||||
if (queueState.value.isEmpty()) {
|
||||
downloader.stop()
|
||||
} else if (queue.isNotEmpty()) {
|
||||
} else if (queueState.value.isNotEmpty()) {
|
||||
downloader.start()
|
||||
}
|
||||
}
|
||||
@ -430,6 +420,35 @@ class DownloadManager(
|
||||
}
|
||||
}
|
||||
|
||||
fun statusFlow(): Flow<Download> = queueState
|
||||
.flatMapLatest { downloads ->
|
||||
downloads
|
||||
.map { download ->
|
||||
download.statusFlow.drop(1).map { download }
|
||||
}
|
||||
.merge()
|
||||
}
|
||||
.onStart {
|
||||
emitAll(
|
||||
queueState.value.filter { download -> download.status == Download.State.DOWNLOADING }.asFlow(),
|
||||
)
|
||||
}
|
||||
|
||||
fun progressFlow(): Flow<Download> = queueState
|
||||
.flatMapLatest { downloads ->
|
||||
downloads
|
||||
.map { download ->
|
||||
download.progressFlow.drop(1).map { download }
|
||||
}
|
||||
.merge()
|
||||
}
|
||||
.onStart {
|
||||
emitAll(
|
||||
queueState.value.filter { download -> download.status == Download.State.DOWNLOADING }
|
||||
.asFlow(),
|
||||
)
|
||||
}
|
||||
|
||||
fun renameMangaDir(oldTitle: String, newTitle: String, source: Long) {
|
||||
val sourceDir = provider.findSourceDir(sourceManager.getOrStub(source)) ?: return
|
||||
val mangaDir = sourceDir.findFile(DiskUtil.buildValidFilename(oldTitle), true) ?: return
|
||||
|
@ -12,7 +12,6 @@ import eu.kanade.domain.source.service.SourcePreferences
|
||||
import eu.kanade.tachiyomi.R
|
||||
import eu.kanade.tachiyomi.data.cache.ChapterCache
|
||||
import eu.kanade.tachiyomi.data.download.model.Download
|
||||
import eu.kanade.tachiyomi.data.download.model.DownloadQueue
|
||||
import eu.kanade.tachiyomi.data.library.LibraryUpdateNotifier
|
||||
import eu.kanade.tachiyomi.data.notification.NotificationHandler
|
||||
import eu.kanade.tachiyomi.source.SourceManager
|
||||
@ -28,12 +27,15 @@ import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.flatMapMerge
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.flowOn
|
||||
import kotlinx.coroutines.flow.retryWhen
|
||||
import kotlinx.coroutines.flow.update
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import logcat.LogPriority
|
||||
import nl.adaptivity.xmlutil.serialization.XML
|
||||
@ -62,7 +64,7 @@ import java.util.zip.ZipOutputStream
|
||||
/**
|
||||
* This class is the one in charge of downloading chapters.
|
||||
*
|
||||
* Its [queue] contains the list of chapters to download. In order to download them, the downloader
|
||||
* Its queue contains the list of chapters to download. In order to download them, the downloader
|
||||
* subscription must be running and the list of chapters must be sent to them by [downloadsRelay].
|
||||
*
|
||||
* The queue manipulation must be done in one thread (currently the main thread) to avoid unexpected
|
||||
@ -94,7 +96,8 @@ class Downloader(
|
||||
/**
|
||||
* Queue where active downloads are kept.
|
||||
*/
|
||||
val queue = DownloadQueue(store)
|
||||
val _queueState = MutableStateFlow<List<Download>>(emptyList())
|
||||
val queueState = _queueState.asStateFlow()
|
||||
|
||||
/**
|
||||
* Notifier for the downloader state and progress.
|
||||
@ -126,7 +129,7 @@ class Downloader(
|
||||
init {
|
||||
launchNow {
|
||||
val chapters = async { store.restore() }
|
||||
queue.addAll(chapters.await())
|
||||
addAllToQueue(chapters.await())
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,13 +140,13 @@ class Downloader(
|
||||
* @return true if the downloader is started, false otherwise.
|
||||
*/
|
||||
fun start(): Boolean {
|
||||
if (subscription != null || queue.isEmpty()) {
|
||||
if (subscription != null || queueState.value.isEmpty()) {
|
||||
return false
|
||||
}
|
||||
|
||||
initializeSubscription()
|
||||
|
||||
val pending = queue.filter { it.status != Download.State.DOWNLOADED }
|
||||
val pending = queueState.value.filter { it: Download -> it.status != Download.State.DOWNLOADED }
|
||||
pending.forEach { if (it.status != Download.State.QUEUE) it.status = Download.State.QUEUE }
|
||||
|
||||
isPaused = false
|
||||
@ -157,7 +160,7 @@ class Downloader(
|
||||
*/
|
||||
fun stop(reason: String? = null) {
|
||||
destroySubscription()
|
||||
queue
|
||||
queueState.value
|
||||
.filter { it.status == Download.State.DOWNLOADING }
|
||||
.forEach { it.status = Download.State.ERROR }
|
||||
|
||||
@ -166,7 +169,7 @@ class Downloader(
|
||||
return
|
||||
}
|
||||
|
||||
if (isPaused && queue.isNotEmpty()) {
|
||||
if (isPaused && queueState.value.isNotEmpty()) {
|
||||
notifier.onPaused()
|
||||
} else {
|
||||
notifier.onComplete()
|
||||
@ -185,7 +188,7 @@ class Downloader(
|
||||
*/
|
||||
fun pause() {
|
||||
destroySubscription()
|
||||
queue
|
||||
queueState.value
|
||||
.filter { it.status == Download.State.DOWNLOADING }
|
||||
.forEach { it.status = Download.State.QUEUE }
|
||||
isPaused = true
|
||||
@ -197,7 +200,7 @@ class Downloader(
|
||||
fun clearQueue() {
|
||||
destroySubscription()
|
||||
|
||||
queue.clear()
|
||||
_clearQueue()
|
||||
notifier.dismissProgress()
|
||||
}
|
||||
|
||||
@ -256,7 +259,7 @@ class Downloader(
|
||||
}
|
||||
|
||||
val source = sourceManager.get(manga.source) as? HttpSource ?: return@launchIO
|
||||
val wasEmpty = queue.isEmpty()
|
||||
val wasEmpty = queueState.value.isEmpty()
|
||||
// Called in background thread, the operation can be slow with SAF.
|
||||
val chaptersWithoutDir = async {
|
||||
chapters
|
||||
@ -269,12 +272,12 @@ class Downloader(
|
||||
// Runs in main thread (synchronization needed).
|
||||
val chaptersToQueue = chaptersWithoutDir.await()
|
||||
// Filter out those already enqueued.
|
||||
.filter { chapter -> queue.none { it.chapter.id == chapter.id } }
|
||||
.filter { chapter -> queueState.value.none { it: Download -> it.chapter.id == chapter.id } }
|
||||
// Create a download for each one.
|
||||
.map { Download(source, manga, it) }
|
||||
|
||||
if (chaptersToQueue.isNotEmpty()) {
|
||||
queue.addAll(chaptersToQueue)
|
||||
addAllToQueue(chaptersToQueue)
|
||||
|
||||
if (isRunning) {
|
||||
// Send the list of downloads to the downloader.
|
||||
@ -283,8 +286,8 @@ class Downloader(
|
||||
|
||||
// Start downloader if needed
|
||||
if (autoStart && wasEmpty) {
|
||||
val queuedDownloads = queue.count { it.source !is UnmeteredSource }
|
||||
val maxDownloadsFromSource = queue
|
||||
val queuedDownloads = queueState.value.count { it: Download -> it.source !is UnmeteredSource }
|
||||
val maxDownloadsFromSource = queueState.value
|
||||
.groupBy { it.source }
|
||||
.filterKeys { it !is UnmeteredSource }
|
||||
.maxOfOrNull { it.value.size }
|
||||
@ -648,7 +651,7 @@ class Downloader(
|
||||
// Delete successful downloads from queue
|
||||
if (download.status == Download.State.DOWNLOADED) {
|
||||
// Remove downloaded chapter from queue
|
||||
queue.remove(download)
|
||||
removeFromQueue(download)
|
||||
}
|
||||
if (areAllDownloadsFinished()) {
|
||||
stop()
|
||||
@ -659,7 +662,67 @@ class Downloader(
|
||||
* Returns true if all the queued downloads are in DOWNLOADED or ERROR state.
|
||||
*/
|
||||
private fun areAllDownloadsFinished(): Boolean {
|
||||
return queue.none { it.status.value <= Download.State.DOWNLOADING.value }
|
||||
return queueState.value.none { it: Download -> it.status.value <= Download.State.DOWNLOADING.value }
|
||||
}
|
||||
|
||||
fun addAllToQueue(downloads: List<Download>) {
|
||||
_queueState.update {
|
||||
downloads.forEach { download ->
|
||||
download.status = Download.State.QUEUE
|
||||
}
|
||||
store.addAll(downloads)
|
||||
it + downloads
|
||||
}
|
||||
}
|
||||
|
||||
fun removeFromQueue(download: Download) {
|
||||
_queueState.update {
|
||||
store.remove(download)
|
||||
if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
|
||||
download.status = Download.State.NOT_DOWNLOADED
|
||||
}
|
||||
it - download
|
||||
}
|
||||
}
|
||||
|
||||
fun removeFromQueue(chapters: List<Chapter>) {
|
||||
chapters.forEach { chapter ->
|
||||
queueState.value.find { it.chapter.id == chapter.id }?.let { removeFromQueue(it) }
|
||||
}
|
||||
}
|
||||
|
||||
fun removeFromQueue(manga: Manga) {
|
||||
queueState.value.filter { it.manga.id == manga.id }.forEach { removeFromQueue(it) }
|
||||
}
|
||||
|
||||
fun _clearQueue() {
|
||||
_queueState.update {
|
||||
it.forEach { download ->
|
||||
if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
|
||||
download.status = Download.State.NOT_DOWNLOADED
|
||||
}
|
||||
}
|
||||
store.clear()
|
||||
emptyList()
|
||||
}
|
||||
}
|
||||
|
||||
fun updateQueue(downloads: List<Download>) {
|
||||
val wasRunning = isRunning
|
||||
|
||||
if (downloads.isEmpty()) {
|
||||
clearQueue()
|
||||
stop()
|
||||
return
|
||||
}
|
||||
|
||||
pause()
|
||||
_clearQueue()
|
||||
addAllToQueue(downloads)
|
||||
|
||||
if (wasRunning) {
|
||||
start()
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
@ -1,99 +0,0 @@
|
||||
package eu.kanade.tachiyomi.data.download.model
|
||||
|
||||
import eu.kanade.tachiyomi.data.download.DownloadStore
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.drop
|
||||
import kotlinx.coroutines.flow.emitAll
|
||||
import kotlinx.coroutines.flow.flatMapLatest
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.merge
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlinx.coroutines.flow.update
|
||||
import tachiyomi.domain.chapter.model.Chapter
|
||||
import tachiyomi.domain.manga.model.Manga
|
||||
|
||||
class DownloadQueue(
|
||||
private val store: DownloadStore,
|
||||
) {
|
||||
private val _state = MutableStateFlow<List<Download>>(emptyList())
|
||||
val state = _state.asStateFlow()
|
||||
|
||||
fun addAll(downloads: List<Download>) {
|
||||
_state.update {
|
||||
downloads.forEach { download ->
|
||||
download.status = Download.State.QUEUE
|
||||
}
|
||||
store.addAll(downloads)
|
||||
it + downloads
|
||||
}
|
||||
}
|
||||
|
||||
fun remove(download: Download) {
|
||||
_state.update {
|
||||
store.remove(download)
|
||||
if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
|
||||
download.status = Download.State.NOT_DOWNLOADED
|
||||
}
|
||||
it - download
|
||||
}
|
||||
}
|
||||
|
||||
fun remove(chapter: Chapter) {
|
||||
_state.value.find { it.chapter.id == chapter.id }?.let { remove(it) }
|
||||
}
|
||||
|
||||
fun remove(chapters: List<Chapter>) {
|
||||
chapters.forEach(::remove)
|
||||
}
|
||||
|
||||
fun remove(manga: Manga) {
|
||||
_state.value.filter { it.manga.id == manga.id }.forEach { remove(it) }
|
||||
}
|
||||
|
||||
fun clear() {
|
||||
_state.update {
|
||||
it.forEach { download ->
|
||||
if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
|
||||
download.status = Download.State.NOT_DOWNLOADED
|
||||
}
|
||||
}
|
||||
store.clear()
|
||||
emptyList()
|
||||
}
|
||||
}
|
||||
|
||||
fun statusFlow(): Flow<Download> = state
|
||||
.flatMapLatest { downloads ->
|
||||
downloads
|
||||
.map { download ->
|
||||
download.statusFlow.drop(1).map { download }
|
||||
}
|
||||
.merge()
|
||||
}
|
||||
.onStart { emitAll(getActiveDownloads()) }
|
||||
|
||||
fun progressFlow(): Flow<Download> = state
|
||||
.flatMapLatest { downloads ->
|
||||
downloads
|
||||
.map { download ->
|
||||
download.progressFlow.drop(1).map { download }
|
||||
}
|
||||
.merge()
|
||||
}
|
||||
.onStart { emitAll(getActiveDownloads()) }
|
||||
|
||||
private fun getActiveDownloads(): Flow<Download> =
|
||||
_state.value.filter { download -> download.status == Download.State.DOWNLOADING }.asFlow()
|
||||
|
||||
fun count(predicate: (Download) -> Boolean) = _state.value.count(predicate)
|
||||
fun filter(predicate: (Download) -> Boolean) = _state.value.filter(predicate)
|
||||
fun find(predicate: (Download) -> Boolean) = _state.value.find(predicate)
|
||||
fun <K> groupBy(keySelector: (Download) -> K) = _state.value.groupBy(keySelector)
|
||||
fun isEmpty() = _state.value.isEmpty()
|
||||
fun isNotEmpty() = _state.value.isNotEmpty()
|
||||
fun none(predicate: (Download) -> Boolean) = _state.value.none(predicate)
|
||||
fun toMutableList() = _state.value.toMutableList()
|
||||
}
|
@ -111,7 +111,7 @@ class DownloadQueueScreenModel(
|
||||
|
||||
init {
|
||||
coroutineScope.launch {
|
||||
downloadManager.queue.state
|
||||
downloadManager.queueState
|
||||
.map { downloads ->
|
||||
downloads
|
||||
.groupBy { it.source }
|
||||
@ -136,8 +136,8 @@ class DownloadQueueScreenModel(
|
||||
val isDownloaderRunning
|
||||
get() = downloadManager.isDownloaderRunning
|
||||
|
||||
fun getDownloadStatusFlow() = downloadManager.queue.statusFlow()
|
||||
fun getDownloadProgressFlow() = downloadManager.queue.progressFlow()
|
||||
fun getDownloadStatusFlow() = downloadManager.statusFlow()
|
||||
fun getDownloadProgressFlow() = downloadManager.progressFlow()
|
||||
|
||||
fun startDownloads() {
|
||||
downloadManager.startDownloads()
|
||||
|
@ -630,7 +630,7 @@ class LibraryScreenModel(
|
||||
.forEach ab@{ (mangaId, chapters) ->
|
||||
val mergedManga = mergedMangas[mangaId] ?: return@ab
|
||||
val downloadChapters = chapters.fastFilterNot { chapter ->
|
||||
downloadManager.queue.state.value.fastAny { chapter.id == it.chapter.id } ||
|
||||
downloadManager.queueState.value.fastAny { chapter.id == it.chapter.id } ||
|
||||
downloadManager.isChapterDownloaded(
|
||||
chapter.name,
|
||||
chapter.scanlator,
|
||||
|
@ -857,7 +857,7 @@ class MangaInfoScreenModel(
|
||||
val mergedIds = if (isMergedSource) successState?.mergedData?.manga?.keys.orEmpty() else emptySet()
|
||||
// SY <--
|
||||
coroutineScope.launchIO {
|
||||
downloadManager.queue.statusFlow()
|
||||
downloadManager.statusFlow()
|
||||
.filter { /* SY --> */ if (isMergedSource) it.manga.id in mergedIds else /* SY <-- */ it.manga.id == successState?.manga?.id }
|
||||
.catch { error -> logcat(LogPriority.ERROR, error) }
|
||||
.collect {
|
||||
@ -868,7 +868,7 @@ class MangaInfoScreenModel(
|
||||
}
|
||||
|
||||
coroutineScope.launchIO {
|
||||
downloadManager.queue.progressFlow()
|
||||
downloadManager.progressFlow()
|
||||
.filter { /* SY --> */ if (isMergedSource) it.manga.id in mergedIds else /* SY <-- */ it.manga.id == successState?.manga?.id }
|
||||
.catch { error -> logcat(LogPriority.ERROR, error) }
|
||||
.collect {
|
||||
|
@ -115,7 +115,7 @@ private class MoreScreenModel(
|
||||
coroutineScope.launchIO {
|
||||
combine(
|
||||
downloadManager.isDownloaderRunning,
|
||||
downloadManager.queue.state,
|
||||
downloadManager.queueState,
|
||||
) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) }
|
||||
.collectLatest { (isDownloading, downloadQueueSize) ->
|
||||
val pendingDownloadExists = downloadQueueSize != 0
|
||||
|
@ -109,7 +109,7 @@ class UpdatesScreenModel(
|
||||
}
|
||||
|
||||
coroutineScope.launchIO {
|
||||
merge(downloadManager.queue.statusFlow(), downloadManager.queue.progressFlow())
|
||||
merge(downloadManager.statusFlow(), downloadManager.progressFlow())
|
||||
.catch { logcat(LogPriority.ERROR, it) }
|
||||
.collect(this@UpdatesScreenModel::updateDownloadState)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user