Remove usage of RxJava from LibraryUpdateService

(cherry picked from commit 86b9d7e843c90c37f7e7374a20cbbcbf89caf10d)

# Conflicts:
#	app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt
This commit is contained in:
arkon 2021-01-23 11:20:16 -05:00 committed by Jobobby04
parent 355170b8ff
commit e3ee3159fc

View File

@ -32,7 +32,7 @@ import eu.kanade.tachiyomi.ui.library.LibraryGroup
import eu.kanade.tachiyomi.ui.manga.track.TrackItem
import eu.kanade.tachiyomi.util.chapter.NoChaptersException
import eu.kanade.tachiyomi.util.chapter.syncChaptersWithSource
import eu.kanade.tachiyomi.util.lang.runAsObservable
import eu.kanade.tachiyomi.util.lang.launchIO
import eu.kanade.tachiyomi.util.prepUpdateCover
import eu.kanade.tachiyomi.util.shouldDownloadNewChapters
import eu.kanade.tachiyomi.util.storage.getUriCompat
@ -47,14 +47,14 @@ import exh.source.getMainSource
import exh.source.mangaDexSourceIds
import exh.util.executeOnIO
import exh.util.nullIfBlank
import rx.Observable
import rx.Subscription
import rx.schedulers.Schedulers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.cancel
import timber.log.Timber
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
import java.io.File
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
/**
@ -74,17 +74,11 @@ class LibraryUpdateService(
val coverCache: CoverCache = Injekt.get()
) : Service() {
/**
* Wake lock that will be held until the service is destroyed.
*/
private lateinit var wakeLock: PowerManager.WakeLock
private lateinit var notifier: LibraryUpdateNotifier
private lateinit var scope: CoroutineScope
/**
* Subscription where the update is done.
*/
private var subscription: Subscription? = null
private var updateJob: Job? = null
/**
* Defines what should be updated within a service execution.
@ -177,6 +171,7 @@ class LibraryUpdateService(
override fun onCreate() {
super.onCreate()
scope = MainScope()
notifier = LibraryUpdateNotifier(this)
wakeLock = acquireWakeLock(javaClass.name)
@ -188,7 +183,8 @@ class LibraryUpdateService(
* lock.
*/
override fun onDestroy() {
subscription?.unsubscribe()
scope?.cancel()
updateJob?.cancel()
if (wakeLock.isHeld) {
wakeLock.release()
}
@ -216,16 +212,15 @@ class LibraryUpdateService(
?: return START_NOT_STICKY
// Unsubscribe from any previous subscription if needed.
subscription?.unsubscribe()
updateJob?.cancel()
// Update favorite manga. Destroy service when completed or in case of an error.
subscription = Observable
.defer {
val selectedScheme = preferences.libraryUpdatePrioritization().get()
val mangaList = getMangaToUpdate(intent, target)
.sortedWith(rankingScheme[selectedScheme])
val selectedScheme = preferences.libraryUpdatePrioritization().get()
val mangaList = getMangaToUpdate(intent, target)
.sortedWith(rankingScheme[selectedScheme])
// Update either chapter list or manga details.
updateJob = scope.launchIO {
try {
when (target) {
Target.CHAPTERS -> updateChapterList(mangaList)
Target.COVERS -> updateCovers(mangaList)
@ -235,19 +230,13 @@ class LibraryUpdateService(
Target.PUSH_FAVORITES -> pushFavorites()
// SY <--
}
} catch (e: Throwable) {
Timber.e(e)
stopSelf(startId)
} finally {
stopSelf(startId)
}
.subscribeOn(Schedulers.io())
.subscribe(
{
},
{
Timber.e(it)
stopSelf(startId)
},
{
stopSelf(startId)
}
)
}
return START_REDELIVER_INTENT
}
@ -332,7 +321,7 @@ class LibraryUpdateService(
* @param mangaToUpdate the list to update
* @return an observable delivering the progress of each update.
*/
fun updateChapterList(mangaToUpdate: List<LibraryManga>): Observable<LibraryManga> {
suspend fun updateChapterList(mangaToUpdate: List<LibraryManga>) {
// Initialize the variables holding the progress of the updates.
val count = AtomicInteger(0)
// List containing new updates
@ -342,74 +331,63 @@ class LibraryUpdateService(
// Boolean to determine if DownloadManager has downloads
var hasDownloads = false
// Emit each manga and update it sequentially.
return Observable.from(mangaToUpdate)
// Notify manga that will update.
.doOnNext { notifier.showProgressNotification(it, count.andIncrement, mangaToUpdate.size) }
// Update the chapters of the manga
.concatMap { manga ->
mangaToUpdate
.mapNotNull { manga ->
// Notify manga that will update.
notifier.showProgressNotification(manga, count.andIncrement, mangaToUpdate.size)
// SY -->
if (manga.source in LIBRARY_UPDATE_EXCLUDED_SOURCES) {
// Ignore EXH manga, updating chapters for every manga will get you banned
Observable.empty()
} else {
// SY <--
updateManga(manga)
// If there's any error, return empty update and continue.
.onErrorReturn {
val errorMessage = if (it is NoChaptersException) {
getString(R.string.no_chapters_error)
} else {
it.message
}
failedUpdates.add(Pair(manga, errorMessage))
Pair(emptyList(), emptyList())
}
// Filter out mangas without new chapters (or failed).
.filter { (first) -> first.isNotEmpty() }
.doOnNext {
if (manga.shouldDownloadNewChapters(db, preferences)) {
downloadChapters(manga, it.first)
hasDownloads = true
}
}
if (manga.source in LIBRARY_UPDATE_EXCLUDED_SOURCES) return@mapNotNull null
// SY <--
// Update the chapters of the manga
try {
val newChapters = updateManga(manga).first
Pair(manga, newChapters)
} catch (e: Throwable) {
// If there's any error, return empty update and continue.
val errorMessage = if (e is NoChaptersException) {
getString(R.string.no_chapters_error)
} else {
e.message
}
failedUpdates.add(Pair(manga, errorMessage))
Pair(manga, emptyList())
}
// Convert to the manga that contains new chapters.
.map {
Pair(
manga,
(
it.first.sortedByDescending { ch -> ch.source_order }
.toTypedArray()
)
)
}
}
// Add manga with new chapters to the list.
.doOnNext { manga ->
// Add to the list
newUpdates.add(manga)
}
// Notify result of the overall update.
.doOnCompleted {
notifier.cancelProgressNotification()
if (newUpdates.isNotEmpty()) {
notifier.showUpdateNotifications(newUpdates)
if (hasDownloads) {
DownloadService.start(this)
}
// Filter out mangas without new chapters (or failed).
.filter { (_, newChapters) -> newChapters.isNotEmpty() }
.forEach { (manga, newChapters) ->
if (manga.shouldDownloadNewChapters(db, preferences)) {
downloadChapters(manga, newChapters)
hasDownloads = true
}
if (preferences.showLibraryUpdateErrors() && failedUpdates.isNotEmpty()) {
val errorFile = writeErrorFile(failedUpdates)
notifier.showUpdateErrorNotification(
failedUpdates.map { it.first.title },
errorFile.getUriCompat(this)
// Convert to the manga that contains new chapters.
newUpdates.add(
Pair(
manga,
newChapters.sortedByDescending { ch -> ch.source_order }.toTypedArray()
)
}
)
}
.map { (first) -> first }
// Notify result of the overall update.
notifier.cancelProgressNotification()
if (newUpdates.isNotEmpty()) {
notifier.showUpdateNotifications(newUpdates)
if (hasDownloads) {
DownloadService.start(this)
}
}
if (preferences.showLibraryUpdateErrors() && failedUpdates.isNotEmpty()) {
val errorFile = writeErrorFile(failedUpdates)
notifier.showUpdateErrorNotification(
failedUpdates.map { it.first.title },
errorFile.getUriCompat(this)
)
}
}
private fun downloadChapters(manga: Manga, chapters: List<Chapter>) {
@ -429,71 +407,55 @@ class LibraryUpdateService(
* @param manga the manga to update.
* @return a pair of the inserted and removed chapters.
*/
fun updateManga(manga: Manga): Observable<Pair<List<Chapter>, List<Chapter>>> {
suspend fun updateManga(manga: Manga): Pair<List<Chapter>, List<Chapter>> {
val source = sourceManager.getOrStub(manga.source).getMainSource()
// Update manga details metadata in the background
if (preferences.autoUpdateMetadata()) {
runAsObservable({
val updatedManga = source.getMangaDetails(manga.toMangaInfo())
val sManga = updatedManga.toSManga()
// Avoid "losing" existing cover
if (!sManga.thumbnail_url.isNullOrEmpty()) {
manga.prepUpdateCover(coverCache, sManga, false)
} else {
sManga.thumbnail_url = manga.thumbnail_url
}
val updatedManga = source.getMangaDetails(manga.toMangaInfo())
val sManga = updatedManga.toSManga()
// Avoid "losing" existing cover
if (!sManga.thumbnail_url.isNullOrEmpty()) {
manga.prepUpdateCover(coverCache, sManga, false)
} else {
sManga.thumbnail_url = manga.thumbnail_url
}
manga.copyFrom(sManga)
db.insertManga(manga).executeAsBlocking()
manga
})
.onErrorResumeNext { Observable.just(manga) }
.subscribeOn(Schedulers.io())
.subscribe()
manga.copyFrom(sManga)
db.insertManga(manga).executeAsBlocking()
}
// SY -->
if (source is MangaDex && trackManager.mdList.isLogged) {
runAsObservable({
scope.launchIO {
if (source is MangaDex && trackManager.mdList.isLogged) {
val tracks = db.getTracks(manga).executeOnIO()
if (tracks.isEmpty() || tracks.none { it.sync_id == TrackManager.MDLIST }) {
var track = trackManager.mdList.createInitialTracker(manga)
track = trackManager.mdList.refresh(track)
db.insertTrack(track).executeOnIO()
}
})
.onErrorResumeNext { Observable.just(Unit) }
.subscribeOn(Schedulers.io())
.subscribe()
}
}
// SY -->
if (source is MergedSource) {
return source.fetchChaptersAndSync(manga, false)
}
// SY <--
return runAsObservable({
// SY -->
if (source is MergedSource) {
source.fetchChaptersAndSync(manga, false)
} else {
val sourceChapters = source.getChapterList(manga.toMangaInfo())
.map { it.toSChapter() }
syncChaptersWithSource(db, sourceChapters, manga, source)
}
})
// SY <--
val chapters = source.getChapterList(manga.toMangaInfo())
.map { it.toSChapter() }
return syncChaptersWithSource(db, chapters, manga, source)
}
private fun updateCovers(mangaToUpdate: List<LibraryManga>): Observable<LibraryManga> {
private suspend fun updateCovers(mangaToUpdate: List<LibraryManga>) {
var count = 0
return Observable.from(mangaToUpdate)
.doOnNext {
notifier.showProgressNotification(it, count++, mangaToUpdate.size)
}
.flatMap { manga ->
val source = sourceManager.get(manga.source)
?: return@flatMap Observable.empty<LibraryManga>()
mangaToUpdate.forEach { manga ->
notifier.showProgressNotification(manga, count++, mangaToUpdate.size)
runAsObservable({
sourceManager.get(manga.source)?.let { source ->
try {
val networkManga = source.getMangaDetails(manga.toMangaInfo())
val sManga = networkManga.toSManga()
manga.prepUpdateCover(coverCache, sManga, true)
@ -501,103 +463,96 @@ class LibraryUpdateService(
manga.thumbnail_url = it
db.insertManga(manga).executeAsBlocking()
}
manga
})
.onErrorReturn { manga }
}
.doOnCompleted {
notifier.cancelProgressNotification()
} catch (e: Throwable) {
// Ignore errors and continue
Timber.e(e)
}
}
}
notifier.cancelProgressNotification()
}
/**
* Method that updates the metadata of the connected tracking services. It's called in a
* background thread, so it's safe to do heavy operations or network calls here.
*/
private fun updateTrackings(mangaToUpdate: List<LibraryManga>): Observable<LibraryManga> {
private suspend fun updateTrackings(mangaToUpdate: List<LibraryManga>) {
// Initialize the variables holding the progress of the updates.
var count = 0
val loggedServices = trackManager.services.filter { it.isLogged }
// Emit each manga and update it sequentially.
return Observable.from(mangaToUpdate)
mangaToUpdate.forEach { manga ->
// Notify manga that will update.
.doOnNext { notifier.showProgressNotification(it, count++, mangaToUpdate.size) }
// Update the tracking details.
.concatMap { manga ->
val tracks = db.getTracks(manga).executeAsBlocking()
notifier.showProgressNotification(manga, count++, mangaToUpdate.size)
Observable.from(tracks)
.concatMap { track ->
val service = trackManager.getService(track.sync_id)
if (service != null && service in loggedServices) {
runAsObservable({ service.refresh(track) })
.doOnNext { db.insertTrack(it).executeAsBlocking() }
.onErrorReturn { track }
} else {
Observable.empty()
}
// Update the tracking details.
db.getTracks(manga).executeAsBlocking().forEach { track ->
val service = trackManager.getService(track.sync_id)
if (service != null && service in loggedServices) {
try {
val updatedTrack = service.refresh(track)
db.insertTrack(updatedTrack).executeAsBlocking()
} catch (e: Throwable) {
// Ignore errors and continue
Timber.e(e)
}
.map { manga }
}
.doOnCompleted {
notifier.cancelProgressNotification()
}
}
}
notifier.cancelProgressNotification()
}
// SY -->
/**
* filter all follows from Mangadex and only add reading or rereading manga to library
*/
private fun syncFollows(): Observable<LibraryManga> {
private suspend fun syncFollows() {
val count = AtomicInteger(0)
val mangaDex = MdUtil.getEnabledMangaDex(preferences, sourceManager) ?: return Observable.empty()
return runAsObservable({ mangaDex.fetchAllFollows(true) })
.map { listManga ->
listManga.filter { (_, metadata) ->
metadata.follow_status == FollowStatus.RE_READING.int || metadata.follow_status == FollowStatus.READING.int
}
}
.doOnNext { listManga ->
listManga.forEach { (networkManga, metadata) ->
notifier.showProgressNotification(networkManga, count.andIncrement, listManga.size)
var dbManga = db.getManga(networkManga.url, mangaDex.id)
.executeAsBlocking()
if (dbManga == null) {
dbManga = Manga.create(
networkManga.url,
networkManga.title,
mangaDex.id
)
dbManga.date_added = Date().time
}
val mangaDex = MdUtil.getEnabledMangaDex(preferences, sourceManager) ?: return
dbManga.copyFrom(networkManga)
dbManga.favorite = true
val id = db.insertManga(dbManga).executeAsBlocking().insertedId()
if (id != null) {
metadata.mangaId = id
db.insertFlatMetadata(metadata.flatten()).await()
}
}
val mangadexFollows = mangaDex.fetchAllFollows(true)
.filter { (_, metadata) ->
metadata.follow_status == FollowStatus.RE_READING.int || metadata.follow_status == FollowStatus.READING.int
}
.doOnCompleted {
notifier.cancelProgressNotification()
mangadexFollows.forEach { (networkManga, metadata) ->
notifier.showProgressNotification(networkManga, count.andIncrement, mangadexFollows.size)
var dbManga = db.getManga(networkManga.url, mangaDex.id)
.executeOnIO()
if (dbManga == null) {
dbManga = Manga.create(
networkManga.url,
networkManga.title,
mangaDex.id
)
dbManga.date_added = System.currentTimeMillis()
}
.flatMap { Observable.empty() }
dbManga.copyFrom(networkManga)
dbManga.favorite = true
val id = db.insertManga(dbManga).executeOnIO().insertedId()
if (id != null) {
metadata.mangaId = id
db.insertFlatMetadata(metadata.flatten()).await()
}
}
notifier.cancelProgressNotification()
}
/**
* Method that updates the all mangas which are not tracked as "reading" on mangadex
*/
private fun pushFavorites(): Observable<LibraryManga> {
private suspend fun pushFavorites() {
val count = AtomicInteger(0)
val listManga = db.getFavoriteMangas().executeAsBlocking().filter { it.source in mangaDexSourceIds }
// filter all follows from Mangadex and only add reading or rereading manga to library
return Observable.from(if (trackManager.mdList.isLogged) listManga else emptyList())
.flatMap { manga ->
if (trackManager.mdList.isLogged) {
listManga.forEach { manga ->
notifier.showProgressNotification(manga, count.andIncrement, listManga.size)
// Get this manga's trackers from the database
@ -608,18 +563,13 @@ class LibraryUpdateService(
if (tracker.track?.status == FollowStatus.UNFOLLOWED.int) {
tracker.track.status = FollowStatus.READING.int
runAsObservable({ tracker.service.update(tracker.track) })
} else Observable.just(null)
}
.doOnNext { returnedTracker ->
returnedTracker?.let {
db.insertTrack(returnedTracker)
val updatedTrack = tracker.service.update(tracker.track)
db.insertTrack(updatedTrack).executeOnIO()
}
}
.doOnCompleted {
notifier.cancelProgressNotification()
}
.flatMap { Observable.empty() }
}
notifier.cancelProgressNotification()
}
// SY <--