Remove usage of PublishRelay in DownloadQueue

(cherry picked from commit 37118088d44d6597e7a3c01687d8570fe4bb3c4f)
This commit is contained in:
arkon 2022-11-26 10:07:51 -05:00 committed by Jobobby04
parent 44c324f145
commit aaddb4bf00
4 changed files with 24 additions and 15 deletions

View File

@ -44,11 +44,11 @@ class DownloadCache(
private val downloadPreferences: DownloadPreferences = Injekt.get(),
) {
private val scope = CoroutineScope(Dispatchers.IO)
private val _changes: Channel<Unit> = Channel(Channel.UNLIMITED)
val changes = _changes.receiveAsFlow().onStart { emit(Unit) }
private val scope = CoroutineScope(Dispatchers.IO)
private val notifier by lazy { DownloadNotifier(context) }
/**

View File

@ -1,12 +1,18 @@
package eu.kanade.tachiyomi.data.download.model
import com.jakewharton.rxrelay.PublishRelay
import eu.kanade.core.util.asFlow
import eu.kanade.domain.chapter.model.Chapter
import eu.kanade.domain.manga.model.Manga
import eu.kanade.tachiyomi.data.download.DownloadStore
import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.util.lang.launchNonCancellable
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow
import rx.Observable
import rx.subjects.PublishSubject
import java.util.concurrent.CopyOnWriteArrayList
@ -16,9 +22,12 @@ class DownloadQueue(
private val queue: MutableList<Download> = CopyOnWriteArrayList(),
) : List<Download> by queue {
private val scope = CoroutineScope(Dispatchers.IO)
private val statusSubject = PublishSubject.create<Download>()
private val updatedRelay = PublishRelay.create<Unit>()
private val _updates: Channel<Unit> = Channel(Channel.UNLIMITED)
val updates = _updates.receiveAsFlow().onStart { emit(Unit) }.map { queue }
fun addAll(downloads: List<Download>) {
downloads.forEach { download ->
@ -28,7 +37,9 @@ class DownloadQueue(
}
queue.addAll(downloads)
store.addAll(downloads)
updatedRelay.call(Unit)
scope.launchNonCancellable {
_updates.send(Unit)
}
}
fun remove(download: Download) {
@ -40,7 +51,9 @@ class DownloadQueue(
download.status = Download.State.NOT_DOWNLOADED
}
if (removed) {
updatedRelay.call(Unit)
scope.launchNonCancellable {
_updates.send(Unit)
}
}
}
@ -68,7 +81,9 @@ class DownloadQueue(
}
queue.clear()
store.clear()
updatedRelay.call(Unit)
scope.launchNonCancellable {
_updates.send(Unit)
}
}
private fun getActiveDownloads(): Observable<Download> =
@ -80,12 +95,6 @@ class DownloadQueue(
fun statusFlow(): Flow<Download> = getStatusObservable().asFlow()
private fun getUpdatedObservable(): Observable<List<Download>> = updatedRelay.onBackpressureBuffer()
.startWith(Unit)
.map { this }
fun updatedFlow(): Flow<List<Download>> = getUpdatedObservable().asFlow()
private fun setPagesFor(download: Download) {
if (download.status == Download.State.DOWNLOADED || download.status == Download.State.ERROR) {
setPagesSubject(download.pages, null)

View File

@ -34,7 +34,7 @@ class DownloadPresenter : BasePresenter<DownloadController>() {
super.onCreate(savedState)
presenterScope.launch {
downloadQueue.updatedFlow()
downloadQueue.updates
.catch { logcat(LogPriority.ERROR, it) }
.map { downloads ->
downloads

View File

@ -90,7 +90,7 @@ private class MoreScreenModel(
coroutineScope.launchIO {
combine(
DownloadService.isRunning,
downloadManager.queue.updatedFlow(),
downloadManager.queue.updates,
) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) }
.collectLatest { (isDownloading, downloadQueueSize) ->
val pendingDownloadExists = downloadQueueSize != 0