From e3ee3159fc41004097ee3685a6a14d9ec9aedf7e Mon Sep 17 00:00:00 2001 From: arkon Date: Sat, 23 Jan 2021 11:20:16 -0500 Subject: [PATCH] Remove usage of RxJava from LibraryUpdateService (cherry picked from commit 86b9d7e843c90c37f7e7374a20cbbcbf89caf10d) # Conflicts: # app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt --- .../data/library/LibraryUpdateService.kt | 372 ++++++++---------- 1 file changed, 161 insertions(+), 211 deletions(-) diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt b/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt index 50ba16200..06c05be68 100755 --- a/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt @@ -32,7 +32,7 @@ import eu.kanade.tachiyomi.ui.library.LibraryGroup import eu.kanade.tachiyomi.ui.manga.track.TrackItem import eu.kanade.tachiyomi.util.chapter.NoChaptersException import eu.kanade.tachiyomi.util.chapter.syncChaptersWithSource -import eu.kanade.tachiyomi.util.lang.runAsObservable +import eu.kanade.tachiyomi.util.lang.launchIO import eu.kanade.tachiyomi.util.prepUpdateCover import eu.kanade.tachiyomi.util.shouldDownloadNewChapters import eu.kanade.tachiyomi.util.storage.getUriCompat @@ -47,14 +47,14 @@ import exh.source.getMainSource import exh.source.mangaDexSourceIds import exh.util.executeOnIO import exh.util.nullIfBlank -import rx.Observable -import rx.Subscription -import rx.schedulers.Schedulers +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.MainScope +import kotlinx.coroutines.cancel import timber.log.Timber import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import java.io.File -import java.util.Date import java.util.concurrent.atomic.AtomicInteger /** @@ -74,17 +74,11 @@ class LibraryUpdateService( val coverCache: CoverCache = Injekt.get() ) : Service() { - /** - * Wake lock that will be held until the service is destroyed. - */ private lateinit var wakeLock: PowerManager.WakeLock - private lateinit var notifier: LibraryUpdateNotifier + private lateinit var scope: CoroutineScope - /** - * Subscription where the update is done. - */ - private var subscription: Subscription? = null + private var updateJob: Job? = null /** * Defines what should be updated within a service execution. @@ -177,6 +171,7 @@ class LibraryUpdateService( override fun onCreate() { super.onCreate() + scope = MainScope() notifier = LibraryUpdateNotifier(this) wakeLock = acquireWakeLock(javaClass.name) @@ -188,7 +183,8 @@ class LibraryUpdateService( * lock. */ override fun onDestroy() { - subscription?.unsubscribe() + scope?.cancel() + updateJob?.cancel() if (wakeLock.isHeld) { wakeLock.release() } @@ -216,16 +212,15 @@ class LibraryUpdateService( ?: return START_NOT_STICKY // Unsubscribe from any previous subscription if needed. - subscription?.unsubscribe() + updateJob?.cancel() // Update favorite manga. Destroy service when completed or in case of an error. - subscription = Observable - .defer { - val selectedScheme = preferences.libraryUpdatePrioritization().get() - val mangaList = getMangaToUpdate(intent, target) - .sortedWith(rankingScheme[selectedScheme]) + val selectedScheme = preferences.libraryUpdatePrioritization().get() + val mangaList = getMangaToUpdate(intent, target) + .sortedWith(rankingScheme[selectedScheme]) - // Update either chapter list or manga details. + updateJob = scope.launchIO { + try { when (target) { Target.CHAPTERS -> updateChapterList(mangaList) Target.COVERS -> updateCovers(mangaList) @@ -235,19 +230,13 @@ class LibraryUpdateService( Target.PUSH_FAVORITES -> pushFavorites() // SY <-- } + } catch (e: Throwable) { + Timber.e(e) + stopSelf(startId) + } finally { + stopSelf(startId) } - .subscribeOn(Schedulers.io()) - .subscribe( - { - }, - { - Timber.e(it) - stopSelf(startId) - }, - { - stopSelf(startId) - } - ) + } return START_REDELIVER_INTENT } @@ -332,7 +321,7 @@ class LibraryUpdateService( * @param mangaToUpdate the list to update * @return an observable delivering the progress of each update. */ - fun updateChapterList(mangaToUpdate: List): Observable { + suspend fun updateChapterList(mangaToUpdate: List) { // Initialize the variables holding the progress of the updates. val count = AtomicInteger(0) // List containing new updates @@ -342,74 +331,63 @@ class LibraryUpdateService( // Boolean to determine if DownloadManager has downloads var hasDownloads = false - // Emit each manga and update it sequentially. - return Observable.from(mangaToUpdate) - // Notify manga that will update. - .doOnNext { notifier.showProgressNotification(it, count.andIncrement, mangaToUpdate.size) } - // Update the chapters of the manga - .concatMap { manga -> + mangaToUpdate + .mapNotNull { manga -> + // Notify manga that will update. + notifier.showProgressNotification(manga, count.andIncrement, mangaToUpdate.size) + // SY --> - if (manga.source in LIBRARY_UPDATE_EXCLUDED_SOURCES) { - // Ignore EXH manga, updating chapters for every manga will get you banned - Observable.empty() - } else { - // SY <-- - updateManga(manga) - // If there's any error, return empty update and continue. - .onErrorReturn { - val errorMessage = if (it is NoChaptersException) { - getString(R.string.no_chapters_error) - } else { - it.message - } - failedUpdates.add(Pair(manga, errorMessage)) - Pair(emptyList(), emptyList()) - } - // Filter out mangas without new chapters (or failed). - .filter { (first) -> first.isNotEmpty() } - .doOnNext { - if (manga.shouldDownloadNewChapters(db, preferences)) { - downloadChapters(manga, it.first) - hasDownloads = true - } - } + if (manga.source in LIBRARY_UPDATE_EXCLUDED_SOURCES) return@mapNotNull null + // SY <-- + // Update the chapters of the manga + try { + val newChapters = updateManga(manga).first + Pair(manga, newChapters) + } catch (e: Throwable) { + // If there's any error, return empty update and continue. + val errorMessage = if (e is NoChaptersException) { + getString(R.string.no_chapters_error) + } else { + e.message + } + failedUpdates.add(Pair(manga, errorMessage)) + Pair(manga, emptyList()) } - // Convert to the manga that contains new chapters. - .map { - Pair( - manga, - ( - it.first.sortedByDescending { ch -> ch.source_order } - .toTypedArray() - ) - ) - } } - // Add manga with new chapters to the list. - .doOnNext { manga -> - // Add to the list - newUpdates.add(manga) - } - // Notify result of the overall update. - .doOnCompleted { - notifier.cancelProgressNotification() - - if (newUpdates.isNotEmpty()) { - notifier.showUpdateNotifications(newUpdates) - if (hasDownloads) { - DownloadService.start(this) - } + // Filter out mangas without new chapters (or failed). + .filter { (_, newChapters) -> newChapters.isNotEmpty() } + .forEach { (manga, newChapters) -> + if (manga.shouldDownloadNewChapters(db, preferences)) { + downloadChapters(manga, newChapters) + hasDownloads = true } - if (preferences.showLibraryUpdateErrors() && failedUpdates.isNotEmpty()) { - val errorFile = writeErrorFile(failedUpdates) - notifier.showUpdateErrorNotification( - failedUpdates.map { it.first.title }, - errorFile.getUriCompat(this) + // Convert to the manga that contains new chapters. + newUpdates.add( + Pair( + manga, + newChapters.sortedByDescending { ch -> ch.source_order }.toTypedArray() ) - } + ) } - .map { (first) -> first } + + // Notify result of the overall update. + notifier.cancelProgressNotification() + + if (newUpdates.isNotEmpty()) { + notifier.showUpdateNotifications(newUpdates) + if (hasDownloads) { + DownloadService.start(this) + } + } + + if (preferences.showLibraryUpdateErrors() && failedUpdates.isNotEmpty()) { + val errorFile = writeErrorFile(failedUpdates) + notifier.showUpdateErrorNotification( + failedUpdates.map { it.first.title }, + errorFile.getUriCompat(this) + ) + } } private fun downloadChapters(manga: Manga, chapters: List) { @@ -429,71 +407,55 @@ class LibraryUpdateService( * @param manga the manga to update. * @return a pair of the inserted and removed chapters. */ - fun updateManga(manga: Manga): Observable, List>> { + suspend fun updateManga(manga: Manga): Pair, List> { val source = sourceManager.getOrStub(manga.source).getMainSource() // Update manga details metadata in the background if (preferences.autoUpdateMetadata()) { - runAsObservable({ - val updatedManga = source.getMangaDetails(manga.toMangaInfo()) - val sManga = updatedManga.toSManga() - // Avoid "losing" existing cover - if (!sManga.thumbnail_url.isNullOrEmpty()) { - manga.prepUpdateCover(coverCache, sManga, false) - } else { - sManga.thumbnail_url = manga.thumbnail_url - } + val updatedManga = source.getMangaDetails(manga.toMangaInfo()) + val sManga = updatedManga.toSManga() + // Avoid "losing" existing cover + if (!sManga.thumbnail_url.isNullOrEmpty()) { + manga.prepUpdateCover(coverCache, sManga, false) + } else { + sManga.thumbnail_url = manga.thumbnail_url + } - manga.copyFrom(sManga) - db.insertManga(manga).executeAsBlocking() - manga - }) - .onErrorResumeNext { Observable.just(manga) } - .subscribeOn(Schedulers.io()) - .subscribe() + manga.copyFrom(sManga) + db.insertManga(manga).executeAsBlocking() } - // SY --> - if (source is MangaDex && trackManager.mdList.isLogged) { - runAsObservable({ + scope.launchIO { + if (source is MangaDex && trackManager.mdList.isLogged) { val tracks = db.getTracks(manga).executeOnIO() if (tracks.isEmpty() || tracks.none { it.sync_id == TrackManager.MDLIST }) { var track = trackManager.mdList.createInitialTracker(manga) track = trackManager.mdList.refresh(track) db.insertTrack(track).executeOnIO() } - }) - .onErrorResumeNext { Observable.just(Unit) } - .subscribeOn(Schedulers.io()) - .subscribe() + } + } + + // SY --> + if (source is MergedSource) { + return source.fetchChaptersAndSync(manga, false) } // SY <-- - return runAsObservable({ - // SY --> - if (source is MergedSource) { - source.fetchChaptersAndSync(manga, false) - } else { - val sourceChapters = source.getChapterList(manga.toMangaInfo()) - .map { it.toSChapter() } - syncChaptersWithSource(db, sourceChapters, manga, source) - } - }) - // SY <-- + val chapters = source.getChapterList(manga.toMangaInfo()) + .map { it.toSChapter() } + + return syncChaptersWithSource(db, chapters, manga, source) } - private fun updateCovers(mangaToUpdate: List): Observable { + private suspend fun updateCovers(mangaToUpdate: List) { var count = 0 - return Observable.from(mangaToUpdate) - .doOnNext { - notifier.showProgressNotification(it, count++, mangaToUpdate.size) - } - .flatMap { manga -> - val source = sourceManager.get(manga.source) - ?: return@flatMap Observable.empty() + mangaToUpdate.forEach { manga -> + notifier.showProgressNotification(manga, count++, mangaToUpdate.size) - runAsObservable({ + sourceManager.get(manga.source)?.let { source -> + try { val networkManga = source.getMangaDetails(manga.toMangaInfo()) val sManga = networkManga.toSManga() manga.prepUpdateCover(coverCache, sManga, true) @@ -501,103 +463,96 @@ class LibraryUpdateService( manga.thumbnail_url = it db.insertManga(manga).executeAsBlocking() } - manga - }) - .onErrorReturn { manga } - } - .doOnCompleted { - notifier.cancelProgressNotification() + } catch (e: Throwable) { + // Ignore errors and continue + Timber.e(e) + } } + } + + notifier.cancelProgressNotification() } /** * Method that updates the metadata of the connected tracking services. It's called in a * background thread, so it's safe to do heavy operations or network calls here. */ - private fun updateTrackings(mangaToUpdate: List): Observable { + private suspend fun updateTrackings(mangaToUpdate: List) { // Initialize the variables holding the progress of the updates. var count = 0 val loggedServices = trackManager.services.filter { it.isLogged } - // Emit each manga and update it sequentially. - return Observable.from(mangaToUpdate) + mangaToUpdate.forEach { manga -> // Notify manga that will update. - .doOnNext { notifier.showProgressNotification(it, count++, mangaToUpdate.size) } - // Update the tracking details. - .concatMap { manga -> - val tracks = db.getTracks(manga).executeAsBlocking() + notifier.showProgressNotification(manga, count++, mangaToUpdate.size) - Observable.from(tracks) - .concatMap { track -> - val service = trackManager.getService(track.sync_id) - if (service != null && service in loggedServices) { - runAsObservable({ service.refresh(track) }) - .doOnNext { db.insertTrack(it).executeAsBlocking() } - .onErrorReturn { track } - } else { - Observable.empty() - } + // Update the tracking details. + db.getTracks(manga).executeAsBlocking().forEach { track -> + val service = trackManager.getService(track.sync_id) + if (service != null && service in loggedServices) { + try { + val updatedTrack = service.refresh(track) + db.insertTrack(updatedTrack).executeAsBlocking() + } catch (e: Throwable) { + // Ignore errors and continue + Timber.e(e) } - .map { manga } - } - .doOnCompleted { - notifier.cancelProgressNotification() + } } + } + + notifier.cancelProgressNotification() } // SY --> /** * filter all follows from Mangadex and only add reading or rereading manga to library */ - private fun syncFollows(): Observable { + private suspend fun syncFollows() { val count = AtomicInteger(0) - val mangaDex = MdUtil.getEnabledMangaDex(preferences, sourceManager) ?: return Observable.empty() - return runAsObservable({ mangaDex.fetchAllFollows(true) }) - .map { listManga -> - listManga.filter { (_, metadata) -> - metadata.follow_status == FollowStatus.RE_READING.int || metadata.follow_status == FollowStatus.READING.int - } - } - .doOnNext { listManga -> - listManga.forEach { (networkManga, metadata) -> - notifier.showProgressNotification(networkManga, count.andIncrement, listManga.size) - var dbManga = db.getManga(networkManga.url, mangaDex.id) - .executeAsBlocking() - if (dbManga == null) { - dbManga = Manga.create( - networkManga.url, - networkManga.title, - mangaDex.id - ) - dbManga.date_added = Date().time - } + val mangaDex = MdUtil.getEnabledMangaDex(preferences, sourceManager) ?: return - dbManga.copyFrom(networkManga) - dbManga.favorite = true - val id = db.insertManga(dbManga).executeAsBlocking().insertedId() - if (id != null) { - metadata.mangaId = id - db.insertFlatMetadata(metadata.flatten()).await() - } - } + val mangadexFollows = mangaDex.fetchAllFollows(true) + .filter { (_, metadata) -> + metadata.follow_status == FollowStatus.RE_READING.int || metadata.follow_status == FollowStatus.READING.int } - .doOnCompleted { - notifier.cancelProgressNotification() + + mangadexFollows.forEach { (networkManga, metadata) -> + notifier.showProgressNotification(networkManga, count.andIncrement, mangadexFollows.size) + var dbManga = db.getManga(networkManga.url, mangaDex.id) + .executeOnIO() + if (dbManga == null) { + dbManga = Manga.create( + networkManga.url, + networkManga.title, + mangaDex.id + ) + dbManga.date_added = System.currentTimeMillis() } - .flatMap { Observable.empty() } + + dbManga.copyFrom(networkManga) + dbManga.favorite = true + val id = db.insertManga(dbManga).executeOnIO().insertedId() + if (id != null) { + metadata.mangaId = id + db.insertFlatMetadata(metadata.flatten()).await() + } + } + + notifier.cancelProgressNotification() } /** * Method that updates the all mangas which are not tracked as "reading" on mangadex */ - private fun pushFavorites(): Observable { + private suspend fun pushFavorites() { val count = AtomicInteger(0) val listManga = db.getFavoriteMangas().executeAsBlocking().filter { it.source in mangaDexSourceIds } // filter all follows from Mangadex and only add reading or rereading manga to library - return Observable.from(if (trackManager.mdList.isLogged) listManga else emptyList()) - .flatMap { manga -> + if (trackManager.mdList.isLogged) { + listManga.forEach { manga -> notifier.showProgressNotification(manga, count.andIncrement, listManga.size) // Get this manga's trackers from the database @@ -608,18 +563,13 @@ class LibraryUpdateService( if (tracker.track?.status == FollowStatus.UNFOLLOWED.int) { tracker.track.status = FollowStatus.READING.int - runAsObservable({ tracker.service.update(tracker.track) }) - } else Observable.just(null) - } - .doOnNext { returnedTracker -> - returnedTracker?.let { - db.insertTrack(returnedTracker) + val updatedTrack = tracker.service.update(tracker.track) + db.insertTrack(updatedTrack).executeOnIO() } } - .doOnCompleted { - notifier.cancelProgressNotification() - } - .flatMap { Observable.empty() } + } + + notifier.cancelProgressNotification() } // SY <--