Replace RxJava in DownloadQueueScreenModel (#8872)

(cherry picked from commit 2245658363823e5b3c29e9f28b45d95418f3e57e)
This commit is contained in:
Two-Ai 2023-01-09 23:08:04 -05:00 committed by Jobobby04
parent 4c2e9aa509
commit e099d1a313
2 changed files with 32 additions and 35 deletions

View File

@ -18,9 +18,8 @@ data class Download(
var pages: List<Page>? = null,
) {
@Volatile
@Transient
var totalProgress: Int = 0
val totalProgress: Int
get() = pages?.sumOf(Page::progress) ?: 0
@Volatile
@Transient

View File

@ -11,19 +11,21 @@ import eu.kanade.tachiyomi.data.download.model.Download
import eu.kanade.tachiyomi.databinding.DownloadListBinding
import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.util.system.logcat
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import logcat.LogPriority
import rx.Observable
import rx.Subscription
import rx.android.schedulers.AndroidSchedulers
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
import java.util.concurrent.TimeUnit
class DownloadQueueScreenModel(
private val downloadManager: DownloadManager = Injekt.get(),
@ -40,9 +42,9 @@ class DownloadQueueScreenModel(
var adapter: DownloadAdapter? = null
/**
* Map of subscriptions for active downloads.
* Map of jobs for active downloads.
*/
val progressSubscriptions by lazy { mutableMapOf<Download, Subscription>() }
private val progressJobs = mutableMapOf<Download, Job>()
val listener = object : DownloadAdapter.DownloadItemListener {
/**
@ -130,10 +132,10 @@ class DownloadQueueScreenModel(
}
override fun onDispose() {
for (subscription in progressSubscriptions.values) {
subscription.unsubscribe()
for (job in progressJobs.values) {
job.cancel()
}
progressSubscriptions.clear()
progressJobs.clear()
adapter = null
}
@ -180,16 +182,16 @@ class DownloadQueueScreenModel(
fun onStatusChange(download: Download) {
when (download.status) {
Download.State.DOWNLOADING -> {
observeProgress(download)
launchProgressJob(download)
// Initial update of the downloaded pages
onUpdateDownloadedPages(download)
}
Download.State.DOWNLOADED -> {
unsubscribeProgress(download)
cancelProgressJob(download)
onUpdateProgress(download)
onUpdateDownloadedPages(download)
}
Download.State.ERROR -> unsubscribeProgress(download)
Download.State.ERROR -> cancelProgressJob(download)
else -> {
/* unused */
}
@ -201,29 +203,25 @@ class DownloadQueueScreenModel(
*
* @param download the download to observe its progress.
*/
private fun observeProgress(download: Download) {
val subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
// Get the sum of percentages for all the pages.
.flatMap {
Observable.from(download.pages)
.map(Page::progress)
.reduce { x, y -> x + y }
private fun launchProgressJob(download: Download) {
val job = coroutineScope.launch {
while (download.pages == null) {
delay(50)
}
// Keep only the latest emission to avoid backpressure.
.onBackpressureLatest()
.observeOn(AndroidSchedulers.mainThread())
.subscribe { progress ->
// Update the view only if the progress has changed.
if (download.totalProgress != progress) {
download.totalProgress = progress
val progressFlows = download.pages!!.map(Page::progressFlow)
combine(progressFlows, Array<Int>::sum)
.distinctUntilChanged()
.debounce(50)
.collectLatest {
onUpdateProgress(download)
}
}
}
// Avoid leaking subscriptions
progressSubscriptions.remove(download)?.unsubscribe()
// Avoid leaking jobs
progressJobs.remove(download)?.cancel()
progressSubscriptions[download] = subscription
progressJobs[download] = job
}
/**
@ -231,8 +229,8 @@ class DownloadQueueScreenModel(
*
* @param download the download to unsubscribe.
*/
private fun unsubscribeProgress(download: Download) {
progressSubscriptions.remove(download)?.unsubscribe()
private fun cancelProgressJob(download: Download) {
progressJobs.remove(download)?.cancel()
}
/**