From 0a502fcf31dbdab0a81a067f18f36305656e2f59 Mon Sep 17 00:00:00 2001 From: Jobobby04 Date: Mon, 24 Aug 2020 17:28:14 -0400 Subject: [PATCH] Cleanup merged source code so I can modify it easier later on --- app/build.gradle | 5 - .../source/online/SuspendHttpSource.kt | 355 ++++++++++++++++++ .../source/online/all/MergedSource.kt | 113 +++--- app/src/main/java/exh/util/RxUtil.kt | 126 +++++++ 4 files changed, 535 insertions(+), 64 deletions(-) create mode 100644 app/src/main/java/eu/kanade/tachiyomi/source/online/SuspendHttpSource.kt diff --git a/app/build.gradle b/app/build.gradle index 51110a3c5..fef0e5604 100755 --- a/app/build.gradle +++ b/app/build.gradle @@ -300,8 +300,6 @@ dependencies { final coroutines_version = '1.3.9' implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_version" implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$coroutines_version" - implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$coroutines_version" - implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:$coroutines_version" // For detecting memory leaks; see https://square.github.io/leakcanary/ // debugImplementation 'com.squareup.leakcanary:leakcanary-android:2.4' @@ -313,9 +311,6 @@ dependencies { // Text distance (EH) implementation 'info.debatty:java-string-similarity:1.2.1' - // RxJava 2 interop for Realm (EH) - implementation 'com.github.akarnokd:rxjava2-interop:0.13.7' - // Firebase (EH) implementation 'com.google.firebase:firebase-analytics-ktx:17.5.0' implementation 'com.google.firebase:firebase-crashlytics-ktx:17.2.1' diff --git a/app/src/main/java/eu/kanade/tachiyomi/source/online/SuspendHttpSource.kt b/app/src/main/java/eu/kanade/tachiyomi/source/online/SuspendHttpSource.kt new file mode 100644 index 000000000..da7973a2c --- /dev/null +++ b/app/src/main/java/eu/kanade/tachiyomi/source/online/SuspendHttpSource.kt @@ -0,0 +1,355 @@ +package eu.kanade.tachiyomi.source.online + +import eu.kanade.tachiyomi.network.GET +import eu.kanade.tachiyomi.network.await +import eu.kanade.tachiyomi.network.newCallWithProgress +import eu.kanade.tachiyomi.source.model.FilterList +import eu.kanade.tachiyomi.source.model.MangasPage +import eu.kanade.tachiyomi.source.model.Page +import eu.kanade.tachiyomi.source.model.SChapter +import eu.kanade.tachiyomi.source.model.SManga +import kotlin.jvm.Throws +import kotlinx.coroutines.runBlocking +import okhttp3.Request +import okhttp3.Response +import rx.Observable + +/** + * A simple implementation for sources from a website, but for Coroutines. + */ +abstract class SuspendHttpSource : HttpSource() { + + /** + * Returns an observable containing a page with a list of manga. Normally it's not needed to + * override this method. + * + * @param page the page number to retrieve. + */ + override fun fetchPopularManga(page: Int): Observable { + return Observable.just(runBlocking { fetchPopularMangaSuspended(page) }) + } + + open suspend fun fetchPopularMangaSuspended(page: Int): MangasPage { + val response = client.newCall(popularMangaRequestSuspended(page)).await() + return popularMangaParseSuspended(response) + } + + /** + * Returns the request for the popular manga given the page. + * + * @param page the page number to retrieve. + */ + override fun popularMangaRequest(page: Int): Request { + return runBlocking { popularMangaRequestSuspended(page) } + } + + protected abstract suspend fun popularMangaRequestSuspended(page: Int): Request + + /** + * Parses the response from the site and returns a [MangasPage] object. + * + * @param response the response from the site. + */ + override fun popularMangaParse(response: Response): MangasPage { + return runBlocking { popularMangaParseSuspended(response) } + } + + protected abstract suspend fun popularMangaParseSuspended(response: Response): MangasPage + + /** + * Returns an observable containing a page with a list of manga. Normally it's not needed to + * override this method. + * + * @param page the page number to retrieve. + * @param query the search query. + * @param filters the list of filters to apply. + */ + override fun fetchSearchManga(page: Int, query: String, filters: FilterList): Observable { + return Observable.just(runBlocking { fetchSearchMangaSuspended(page, query, filters) }) + } + + open suspend fun fetchSearchMangaSuspended(page: Int, query: String, filters: FilterList): MangasPage { + val response = client.newCall(searchMangaRequestSuspended(page, query, filters)).await() + return searchMangaParseSuspended(response) + } + + /** + * Returns the request for the search manga given the page. + * + * @param page the page number to retrieve. + * @param query the search query. + * @param filters the list of filters to apply. + */ + override fun searchMangaRequest(page: Int, query: String, filters: FilterList): Request { + return runBlocking { searchMangaRequestSuspended(page, query, filters) } + } + + protected abstract suspend fun searchMangaRequestSuspended(page: Int, query: String, filters: FilterList): Request + + /** + * Parses the response from the site and returns a [MangasPage] object. + * + * @param response the response from the site. + */ + override fun searchMangaParse(response: Response): MangasPage { + return runBlocking { searchMangaParseSuspended(response) } + } + + protected abstract suspend fun searchMangaParseSuspended(response: Response): MangasPage + + /** + * Returns an observable containing a page with a list of latest manga updates. + * + * @param page the page number to retrieve. + */ + override fun fetchLatestUpdates(page: Int): Observable { + return Observable.just(runBlocking { fetchLatestUpdatesSuspended(page) }) + } + + open suspend fun fetchLatestUpdatesSuspended(page: Int): MangasPage { + val response = client.newCall(latestUpdatesRequestSuspended(page)).await() + return latestUpdatesParseSuspended(response) + } + + /** + * Returns the request for latest manga given the page. + * + * @param page the page number to retrieve. + */ + override fun latestUpdatesRequest(page: Int): Request { + return runBlocking { latestUpdatesRequestSuspended(page) } + } + + protected abstract suspend fun latestUpdatesRequestSuspended(page: Int): Request + + /** + * Parses the response from the site and returns a [MangasPage] object. + * + * @param response the response from the site. + */ + override fun latestUpdatesParse(response: Response): MangasPage { + return runBlocking { latestUpdatesParseSuspended(response) } + } + + protected abstract suspend fun latestUpdatesParseSuspended(response: Response): MangasPage + + /** + * Returns an observable with the updated details for a manga. Normally it's not needed to + * override this method. + * + * @param manga the manga to be updated. + */ + override fun fetchMangaDetails(manga: SManga): Observable { + return Observable.just(runBlocking { fetchMangaDetailsSuspended(manga) }) + } + + open suspend fun fetchMangaDetailsSuspended(manga: SManga): SManga { + val response = client.newCall(mangaDetailsRequestSuspended(manga)).await() + return mangaDetailsParseSuspended(response).apply { initialized = true } + } + + /** + * Returns the request for the details of a manga. Override only if it's needed to change the + * url, send different headers or request method like POST. + * + * @param manga the manga to be updated. + */ + override fun mangaDetailsRequest(manga: SManga): Request { + return runBlocking { mangaDetailsRequestSuspended(manga) } + } + + open suspend fun mangaDetailsRequestSuspended(manga: SManga): Request { + return GET(baseUrl + manga.url, headers) + } + + /** + * Parses the response from the site and returns the details of a manga. + * + * @param response the response from the site. + */ + override fun mangaDetailsParse(response: Response): SManga { + return runBlocking { mangaDetailsParseSuspended(response) } + } + + protected abstract suspend fun mangaDetailsParseSuspended(response: Response): SManga + + /** + * Returns an observable with the updated chapter list for a manga. Normally it's not needed to + * override this method. If a manga is licensed an empty chapter list observable is returned + * + * @param manga the manga to look for chapters. + */ + override fun fetchChapterList(manga: SManga): Observable> { + return try { + Observable.just(runBlocking { fetchChapterListSuspended(manga) }) + } catch (e: LicencedException) { + Observable.error(Exception("Licensed - No chapters to show")) + } + } + + @Throws(LicencedException::class) + open suspend fun fetchChapterListSuspended(manga: SManga): List { + return if (manga.status != SManga.LICENSED) { + val response = client.newCall(chapterListRequestSuspended(manga)).await() + chapterListParseSuspended(response) + } else { + throw LicencedException("Licensed - No chapters to show") + } + } + + /** + * Returns the request for updating the chapter list. Override only if it's needed to override + * the url, send different headers or request method like POST. + * + * @param manga the manga to look for chapters. + */ + override fun chapterListRequest(manga: SManga): Request { + return runBlocking { chapterListRequestSuspended(manga) } + } + + protected open suspend fun chapterListRequestSuspended(manga: SManga): Request { + return GET(baseUrl + manga.url, headers) + } + + /** + * Parses the response from the site and returns a list of chapters. + * + * @param response the response from the site. + */ + override fun chapterListParse(response: Response): List { + return runBlocking { chapterListParseSuspended(response) } + } + + protected abstract suspend fun chapterListParseSuspended(response: Response): List + + /** + * Returns an observable with the page list for a chapter. + * + * @param chapter the chapter whose page list has to be fetched. + */ + override fun fetchPageList(chapter: SChapter): Observable> { + return Observable.just(runBlocking { fetchPageListSuspended(chapter) }) + } + + open suspend fun fetchPageListSuspended(chapter: SChapter): List { + val response = client.newCall(pageListRequestSuspended(chapter)).await() + return pageListParseSuspended(response) + } + + /** + * Returns the request for getting the page list. Override only if it's needed to override the + * url, send different headers or request method like POST. + * + * @param chapter the chapter whose page list has to be fetched. + */ + override fun pageListRequest(chapter: SChapter): Request { + return runBlocking { pageListRequestSuspended(chapter) } + } + + protected open suspend fun pageListRequestSuspended(chapter: SChapter): Request { + return GET(baseUrl + chapter.url, headers) + } + + /** + * Parses the response from the site and returns a list of pages. + * + * @param response the response from the site. + */ + override fun pageListParse(response: Response): List { + return runBlocking { pageListParseSuspended(response) } + } + + protected abstract suspend fun pageListParseSuspended(response: Response): List + + /** + * Returns an observable with the page containing the source url of the image. If there's any + * error, it will return null instead of throwing an exception. + * + * @param page the page whose source image has to be fetched. + */ + override fun fetchImageUrl(page: Page): Observable { + return Observable.just(runBlocking { fetchImageUrlSuspended(page) }) + } + + open suspend fun fetchImageUrlSuspended(page: Page): String { + val response = client.newCall(imageUrlRequestSuspended(page)).await() + return imageUrlParseSuspended(response) + } + + /** + * Returns the request for getting the url to the source image. Override only if it's needed to + * override the url, send different headers or request method like POST. + * + * @param page the chapter whose page list has to be fetched + */ + override fun imageUrlRequest(page: Page): Request { + return runBlocking { imageUrlRequestSuspended(page) } + } + + protected open suspend fun imageUrlRequestSuspended(page: Page): Request { + return GET(page.url, headers) + } + + /** + * Parses the response from the site and returns the absolute url to the source image. + * + * @param response the response from the site. + */ + override fun imageUrlParse(response: Response): String { + return runBlocking { imageUrlParseSuspended(response) } + } + + protected abstract suspend fun imageUrlParseSuspended(response: Response): String + + /** + * Returns an observable with the response of the source image. + * + * @param page the page whose source image has to be downloaded. + */ + override fun fetchImage(page: Page): Observable { + return Observable.just(runBlocking { fetchImageSuspended(page) }) + } + + open suspend fun fetchImageSuspended(page: Page): Response { + return client.newCallWithProgress(imageRequestSuspended(page), page).await() + } + + /** + * Returns the request for getting the source image. Override only if it's needed to override + * the url, send different headers or request method like POST. + * + * @param page the chapter whose page list has to be fetched + */ + override fun imageRequest(page: Page): Request { + return runBlocking { imageRequestSuspended(page) } + } + + protected open suspend fun imageRequestSuspended(page: Page): Request { + return GET(page.imageUrl!!, headers) + } + + /** + * Called before inserting a new chapter into database. Use it if you need to override chapter + * fields, like the title or the chapter number. Do not change anything to [manga]. + * + * @param chapter the chapter to be added. + * @param manga the manga of the chapter. + */ + override fun prepareNewChapter(chapter: SChapter, manga: SManga) { + runBlocking { prepareNewChapterSuspended(chapter, manga) } + } + + open suspend fun prepareNewChapterSuspended(chapter: SChapter, manga: SManga) { + } + + /** + * Returns the list of filters for the source. + */ + override fun getFilterList() = runBlocking { getFilterListSuspended() } + + open suspend fun getFilterListSuspended() = FilterList() + + companion object { + data class LicencedException(override val message: String?) : Exception() + } +} diff --git a/app/src/main/java/eu/kanade/tachiyomi/source/online/all/MergedSource.kt b/app/src/main/java/eu/kanade/tachiyomi/source/online/all/MergedSource.kt index 6e77e18ac..bd08aa245 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/source/online/all/MergedSource.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/source/online/all/MergedSource.kt @@ -1,6 +1,5 @@ package eu.kanade.tachiyomi.source.online.all -import android.util.Log import com.elvishew.xlog.XLog import com.github.salomonbrys.kotson.fromJson import com.google.gson.Gson @@ -14,28 +13,28 @@ import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.source.model.SChapter import eu.kanade.tachiyomi.source.model.SManga import eu.kanade.tachiyomi.source.online.HttpSource +import eu.kanade.tachiyomi.source.online.SuspendHttpSource import exh.MERGED_SOURCE_ID +import exh.util.asFlow import exh.util.await -import hu.akarnokd.rxjava.interop.RxJavaInterop +import exh.util.awaitSingle import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.take import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.rx2.asFlowable -import kotlinx.coroutines.rx2.asSingle +import kotlinx.coroutines.withContext import okhttp3.Response import rx.Observable -import rx.schedulers.Schedulers import uy.kohesive.injekt.injectLazy // TODO LocalSource compatibility // TODO Disable clear database option -class MergedSource : HttpSource() { +class MergedSource : SuspendHttpSource() { private val db: DatabaseHelper by injectLazy() private val sourceManager: SourceManager by injectLazy() private val gson: Gson by injectLazy() @@ -44,47 +43,47 @@ class MergedSource : HttpSource() { override val baseUrl = "" - override fun popularMangaRequest(page: Int) = throw UnsupportedOperationException() - override fun popularMangaParse(response: Response) = throw UnsupportedOperationException() - override fun searchMangaRequest(page: Int, query: String, filters: FilterList) = throw UnsupportedOperationException() - override fun searchMangaParse(response: Response) = throw UnsupportedOperationException() - override fun latestUpdatesRequest(page: Int) = throw UnsupportedOperationException() - override fun latestUpdatesParse(response: Response) = throw UnsupportedOperationException() + override suspend fun popularMangaRequestSuspended(page: Int) = throw UnsupportedOperationException() + override suspend fun popularMangaParseSuspended(response: Response) = throw UnsupportedOperationException() + override suspend fun searchMangaRequestSuspended(page: Int, query: String, filters: FilterList) = throw UnsupportedOperationException() + override suspend fun searchMangaParseSuspended(response: Response) = throw UnsupportedOperationException() + override suspend fun latestUpdatesRequestSuspended(page: Int) = throw UnsupportedOperationException() + override suspend fun latestUpdatesParseSuspended(response: Response) = throw UnsupportedOperationException() - override fun fetchMangaDetails(manga: SManga): Observable { - return RxJavaInterop.toV1Observable( - readMangaConfig(manga).load(db, sourceManager).take(1).map { loaded -> - SManga.create().apply { - this.copyFrom(loaded.manga) - url = manga.url - } - }.asFlowable() - ) + override suspend fun fetchMangaDetailsSuspended(manga: SManga): SManga { + return readMangaConfig(manga).load(db, sourceManager).take(1).map { loaded -> + SManga.create().apply { + this.copyFrom(loaded.manga) + url = manga.url + } + }.first() } - override fun fetchChapterList(manga: SManga): Observable> { - return RxJavaInterop.toV1Single( - GlobalScope.async(Dispatchers.IO) { - val loadedMangas = readMangaConfig(manga).load(db, sourceManager).buffer() - loadedMangas.map { loadedManga -> - async(Dispatchers.IO) { - loadedManga.source.fetchChapterList(loadedManga.manga).map { chapterList -> - chapterList.map { chapter -> - chapter.apply { - url = writeUrlConfig(UrlConfig(loadedManga.source.id, url, loadedManga.manga.url)) - } - } - }.toSingle().await(Schedulers.io()) + override suspend fun fetchChapterListSuspended(manga: SManga): List { + val loadedMangas = readMangaConfig(manga).load(db, sourceManager).buffer() + return loadedMangas.flatMapMerge { loadedManga -> + withContext(Dispatchers.IO) { + loadedManga.source.fetchChapterList(loadedManga.manga).asFlow().map { chapterList -> + chapterList.map { chapter -> + chapter.apply { + url = writeUrlConfig( + UrlConfig( + loadedManga.source.id, + url, + loadedManga.manga.url + ) + ) + } } - }.buffer().map { it.await() }.toList().flatten() - }.asSingle(Dispatchers.IO) - ).toObservable() + } + } + }.buffer().toList().flatten() } - override fun mangaDetailsParse(response: Response) = throw UnsupportedOperationException() - override fun chapterListParse(response: Response) = throw UnsupportedOperationException() + override suspend fun mangaDetailsParseSuspended(response: Response) = throw UnsupportedOperationException() + override suspend fun chapterListParseSuspended(response: Response) = throw UnsupportedOperationException() - override fun fetchPageList(chapter: SChapter): Observable> { + override suspend fun fetchPageListSuspended(chapter: SChapter): List { val config = readUrlConfig(chapter.url) val source = sourceManager.getOrStub(config.source) return source.fetchPageList( @@ -96,18 +95,17 @@ class MergedSource : HttpSource() { pages.map { page -> page.copyWithUrl(writeUrlConfig(UrlConfig(config.source, page.url, config.mangaUrl))) } - } + }.awaitSingle() } - override fun fetchImageUrl(page: Page): Observable { + override suspend fun fetchImageUrlSuspended(page: Page): String { val config = readUrlConfig(page.url) - val source = sourceManager.getOrStub(config.source) as? HttpSource - ?: throw UnsupportedOperationException("This source does not support this operation!") - return source.fetchImageUrl(page.copyWithUrl(config.url)) + val source = sourceManager.getOrStub(config.source) as? HttpSource ?: throw UnsupportedOperationException("This source does not support this operation!") + return source.fetchImageUrl(page.copyWithUrl(config.url)).awaitSingle() } - override fun pageListParse(response: Response) = throw UnsupportedOperationException() - override fun imageUrlParse(response: Response) = throw UnsupportedOperationException() + override suspend fun pageListParseSuspended(response: Response) = throw UnsupportedOperationException() + override suspend fun imageUrlParseSuspended(response: Response) = throw UnsupportedOperationException() override fun fetchImage(page: Page): Observable { val config = readUrlConfig(page.url) @@ -116,10 +114,9 @@ class MergedSource : HttpSource() { return source.fetchImage(page.copyWithUrl(config.url)) } - override fun prepareNewChapter(chapter: SChapter, manga: SManga) { + override suspend fun prepareNewChapterSuspended(chapter: SChapter, manga: SManga) { val chapterConfig = readUrlConfig(chapter.url) - val source = sourceManager.getOrStub(chapterConfig.source) as? HttpSource - ?: throw UnsupportedOperationException("This source does not support this operation!") + val source = sourceManager.getOrStub(chapterConfig.source) as? HttpSource ?: throw UnsupportedOperationException("This source does not support this operation!") val copiedManga = SManga.create().apply { this.copyFrom(manga) url = chapterConfig.mangaUrl @@ -151,7 +148,7 @@ class MergedSource : HttpSource() { val url: String ) { suspend fun load(db: DatabaseHelper, sourceManager: SourceManager): LoadedMangaSource? { - val manga = db.getManga(url, source).executeAsBlocking() ?: return null + val manga = db.getManga(url, source).await() ?: return null val source = sourceManager.getOrStub(source) return LoadedMangaSource(source, manga) } @@ -163,12 +160,10 @@ class MergedSource : HttpSource() { ) { fun load(db: DatabaseHelper, sourceManager: SourceManager): Flow { return children.asFlow().map { mangaSource -> - mangaSource.load(db, sourceManager) - ?: run { - XLog.w("> Missing source manga: $mangaSource") - Log.d("MERGED", "> Missing source manga: $mangaSource") - throw IllegalStateException("Missing source manga: $mangaSource") - } + mangaSource.load(db, sourceManager) ?: run { + XLog.w("> Missing source manga: $mangaSource") + throw IllegalStateException("Missing source manga: $mangaSource") + } } } diff --git a/app/src/main/java/exh/util/RxUtil.kt b/app/src/main/java/exh/util/RxUtil.kt index 25be43130..5e17e5361 100644 --- a/app/src/main/java/exh/util/RxUtil.kt +++ b/app/src/main/java/exh/util/RxUtil.kt @@ -2,12 +2,30 @@ package exh.util import com.pushtorefresh.storio.operations.PreparedOperation import com.pushtorefresh.storio.sqlite.operations.get.PreparedGetObject +import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import rx.Completable +import rx.CompletableSubscriber +import rx.Emitter import rx.Observable +import rx.Observer import rx.Scheduler import rx.Single +import rx.SingleSubscriber +import rx.Subscriber import rx.Subscription import rx.subjects.ReplaySubject @@ -33,6 +51,7 @@ fun Observable.melt(): Observable { return rs } +@ExperimentalCoroutinesApi suspend fun Single.await(subscribeOn: Scheduler? = null): T { return suspendCancellableCoroutine { continuation -> val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this @@ -59,6 +78,7 @@ suspend fun Single.await(subscribeOn: Scheduler? = null): T { suspend fun PreparedOperation.await(): T = asRxSingle().await() suspend fun PreparedGetObject.await(): T? = asRxSingle().await() +@ExperimentalCoroutinesApi suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) { return suspendCancellableCoroutine { continuation -> val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this @@ -81,3 +101,109 @@ suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) { } } } + +suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont -> + subscribe(object : CompletableSubscriber { + override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCancellation(s) } + override fun onCompleted() { cont.resume(Unit) } + override fun onError(e: Throwable) { cont.resumeWithException(e) } + }) +} + +suspend fun Single.await(): T = suspendCancellableCoroutine { cont -> + cont.unsubscribeOnCancellation( + subscribe(object : SingleSubscriber() { + override fun onSuccess(t: T) { cont.resume(t) } + override fun onError(error: Throwable) { cont.resumeWithException(error) } + }) + ) +} + +@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) +suspend fun Observable.awaitFirst(): T = first().awaitOne() + +@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) +suspend fun Observable.awaitFirstOrDefault(default: T): T = firstOrDefault(default).awaitOne() + +@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) +suspend fun Observable.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne() + +@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) +suspend fun Observable.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(Observable.fromCallable(defaultValue)).first().awaitOne() + +@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) +suspend fun Observable.awaitLast(): T = last().awaitOne() + +@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) +suspend fun Observable.awaitSingle(): T = single().awaitOne() + +@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) +private suspend fun Observable.awaitOne(): T = suspendCancellableCoroutine { cont -> + cont.unsubscribeOnCancellation( + subscribe(object : Subscriber() { + override fun onStart() { request(1) } + override fun onNext(t: T) { cont.resume(t) } + override fun onCompleted() { if (cont.isActive) cont.resumeWithException(IllegalStateException("Should have invoked onNext")) } + override fun onError(e: Throwable) { + /* + * Rx1 observable throws NoSuchElementException if cancellation happened before + * element emission. To mitigate this we try to atomically resume continuation with exception: + * if resume failed, then we know that continuation successfully cancelled itself + */ + val token = cont.tryResumeWithException(e) + if (token != null) { + cont.completeResume(token) + } + } + }) + ) +} + +internal fun CancellableContinuation.unsubscribeOnCancellation(sub: Subscription) = + invokeOnCancellation { sub.unsubscribe() } + +@ExperimentalCoroutinesApi +fun Observable.asFlow(): Flow = callbackFlow { + val observer = object : Observer { + override fun onNext(t: T) { + offer(t) + } + + override fun onError(e: Throwable) { + close(e) + } + + override fun onCompleted() { + close() + } + } + val subscription = subscribe(observer) + awaitClose { subscription.unsubscribe() } +} + +@ExperimentalCoroutinesApi +fun Flow.asObservable(backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE): Observable { + return Observable.create( + { emitter -> + /* + * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if + * asObservable is already invoked from unconfined + */ + val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) { + try { + collect { emitter.onNext(it) } + emitter.onCompleted() + } catch (e: Throwable) { + // Ignore `CancellationException` as error, since it indicates "normal cancellation" + if (e !is CancellationException) { + emitter.onError(e) + } else { + emitter.onCompleted() + } + } + } + emitter.setCancellation { job.cancel() } + }, + backpressureMode + ) +}