Cleanup merged source code so I can modify it easier later on

This commit is contained in:
Jobobby04 2020-08-24 17:28:14 -04:00
parent 80960d87f2
commit 0a502fcf31
4 changed files with 535 additions and 64 deletions

View File

@ -300,8 +300,6 @@ dependencies {
final coroutines_version = '1.3.9'
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_version"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$coroutines_version"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$coroutines_version"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:$coroutines_version"
// For detecting memory leaks; see https://square.github.io/leakcanary/
// debugImplementation 'com.squareup.leakcanary:leakcanary-android:2.4'
@ -313,9 +311,6 @@ dependencies {
// Text distance (EH)
implementation 'info.debatty:java-string-similarity:1.2.1'
// RxJava 2 interop for Realm (EH)
implementation 'com.github.akarnokd:rxjava2-interop:0.13.7'
// Firebase (EH)
implementation 'com.google.firebase:firebase-analytics-ktx:17.5.0'
implementation 'com.google.firebase:firebase-crashlytics-ktx:17.2.1'

View File

@ -0,0 +1,355 @@
package eu.kanade.tachiyomi.source.online
import eu.kanade.tachiyomi.network.GET
import eu.kanade.tachiyomi.network.await
import eu.kanade.tachiyomi.network.newCallWithProgress
import eu.kanade.tachiyomi.source.model.FilterList
import eu.kanade.tachiyomi.source.model.MangasPage
import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.source.model.SChapter
import eu.kanade.tachiyomi.source.model.SManga
import kotlin.jvm.Throws
import kotlinx.coroutines.runBlocking
import okhttp3.Request
import okhttp3.Response
import rx.Observable
/**
* A simple implementation for sources from a website, but for Coroutines.
*/
abstract class SuspendHttpSource : HttpSource() {
/**
* Returns an observable containing a page with a list of manga. Normally it's not needed to
* override this method.
*
* @param page the page number to retrieve.
*/
override fun fetchPopularManga(page: Int): Observable<MangasPage> {
return Observable.just(runBlocking { fetchPopularMangaSuspended(page) })
}
open suspend fun fetchPopularMangaSuspended(page: Int): MangasPage {
val response = client.newCall(popularMangaRequestSuspended(page)).await()
return popularMangaParseSuspended(response)
}
/**
* Returns the request for the popular manga given the page.
*
* @param page the page number to retrieve.
*/
override fun popularMangaRequest(page: Int): Request {
return runBlocking { popularMangaRequestSuspended(page) }
}
protected abstract suspend fun popularMangaRequestSuspended(page: Int): Request
/**
* Parses the response from the site and returns a [MangasPage] object.
*
* @param response the response from the site.
*/
override fun popularMangaParse(response: Response): MangasPage {
return runBlocking { popularMangaParseSuspended(response) }
}
protected abstract suspend fun popularMangaParseSuspended(response: Response): MangasPage
/**
* Returns an observable containing a page with a list of manga. Normally it's not needed to
* override this method.
*
* @param page the page number to retrieve.
* @param query the search query.
* @param filters the list of filters to apply.
*/
override fun fetchSearchManga(page: Int, query: String, filters: FilterList): Observable<MangasPage> {
return Observable.just(runBlocking { fetchSearchMangaSuspended(page, query, filters) })
}
open suspend fun fetchSearchMangaSuspended(page: Int, query: String, filters: FilterList): MangasPage {
val response = client.newCall(searchMangaRequestSuspended(page, query, filters)).await()
return searchMangaParseSuspended(response)
}
/**
* Returns the request for the search manga given the page.
*
* @param page the page number to retrieve.
* @param query the search query.
* @param filters the list of filters to apply.
*/
override fun searchMangaRequest(page: Int, query: String, filters: FilterList): Request {
return runBlocking { searchMangaRequestSuspended(page, query, filters) }
}
protected abstract suspend fun searchMangaRequestSuspended(page: Int, query: String, filters: FilterList): Request
/**
* Parses the response from the site and returns a [MangasPage] object.
*
* @param response the response from the site.
*/
override fun searchMangaParse(response: Response): MangasPage {
return runBlocking { searchMangaParseSuspended(response) }
}
protected abstract suspend fun searchMangaParseSuspended(response: Response): MangasPage
/**
* Returns an observable containing a page with a list of latest manga updates.
*
* @param page the page number to retrieve.
*/
override fun fetchLatestUpdates(page: Int): Observable<MangasPage> {
return Observable.just(runBlocking { fetchLatestUpdatesSuspended(page) })
}
open suspend fun fetchLatestUpdatesSuspended(page: Int): MangasPage {
val response = client.newCall(latestUpdatesRequestSuspended(page)).await()
return latestUpdatesParseSuspended(response)
}
/**
* Returns the request for latest manga given the page.
*
* @param page the page number to retrieve.
*/
override fun latestUpdatesRequest(page: Int): Request {
return runBlocking { latestUpdatesRequestSuspended(page) }
}
protected abstract suspend fun latestUpdatesRequestSuspended(page: Int): Request
/**
* Parses the response from the site and returns a [MangasPage] object.
*
* @param response the response from the site.
*/
override fun latestUpdatesParse(response: Response): MangasPage {
return runBlocking { latestUpdatesParseSuspended(response) }
}
protected abstract suspend fun latestUpdatesParseSuspended(response: Response): MangasPage
/**
* Returns an observable with the updated details for a manga. Normally it's not needed to
* override this method.
*
* @param manga the manga to be updated.
*/
override fun fetchMangaDetails(manga: SManga): Observable<SManga> {
return Observable.just(runBlocking { fetchMangaDetailsSuspended(manga) })
}
open suspend fun fetchMangaDetailsSuspended(manga: SManga): SManga {
val response = client.newCall(mangaDetailsRequestSuspended(manga)).await()
return mangaDetailsParseSuspended(response).apply { initialized = true }
}
/**
* Returns the request for the details of a manga. Override only if it's needed to change the
* url, send different headers or request method like POST.
*
* @param manga the manga to be updated.
*/
override fun mangaDetailsRequest(manga: SManga): Request {
return runBlocking { mangaDetailsRequestSuspended(manga) }
}
open suspend fun mangaDetailsRequestSuspended(manga: SManga): Request {
return GET(baseUrl + manga.url, headers)
}
/**
* Parses the response from the site and returns the details of a manga.
*
* @param response the response from the site.
*/
override fun mangaDetailsParse(response: Response): SManga {
return runBlocking { mangaDetailsParseSuspended(response) }
}
protected abstract suspend fun mangaDetailsParseSuspended(response: Response): SManga
/**
* Returns an observable with the updated chapter list for a manga. Normally it's not needed to
* override this method. If a manga is licensed an empty chapter list observable is returned
*
* @param manga the manga to look for chapters.
*/
override fun fetchChapterList(manga: SManga): Observable<List<SChapter>> {
return try {
Observable.just(runBlocking { fetchChapterListSuspended(manga) })
} catch (e: LicencedException) {
Observable.error(Exception("Licensed - No chapters to show"))
}
}
@Throws(LicencedException::class)
open suspend fun fetchChapterListSuspended(manga: SManga): List<SChapter> {
return if (manga.status != SManga.LICENSED) {
val response = client.newCall(chapterListRequestSuspended(manga)).await()
chapterListParseSuspended(response)
} else {
throw LicencedException("Licensed - No chapters to show")
}
}
/**
* Returns the request for updating the chapter list. Override only if it's needed to override
* the url, send different headers or request method like POST.
*
* @param manga the manga to look for chapters.
*/
override fun chapterListRequest(manga: SManga): Request {
return runBlocking { chapterListRequestSuspended(manga) }
}
protected open suspend fun chapterListRequestSuspended(manga: SManga): Request {
return GET(baseUrl + manga.url, headers)
}
/**
* Parses the response from the site and returns a list of chapters.
*
* @param response the response from the site.
*/
override fun chapterListParse(response: Response): List<SChapter> {
return runBlocking { chapterListParseSuspended(response) }
}
protected abstract suspend fun chapterListParseSuspended(response: Response): List<SChapter>
/**
* Returns an observable with the page list for a chapter.
*
* @param chapter the chapter whose page list has to be fetched.
*/
override fun fetchPageList(chapter: SChapter): Observable<List<Page>> {
return Observable.just(runBlocking { fetchPageListSuspended(chapter) })
}
open suspend fun fetchPageListSuspended(chapter: SChapter): List<Page> {
val response = client.newCall(pageListRequestSuspended(chapter)).await()
return pageListParseSuspended(response)
}
/**
* Returns the request for getting the page list. Override only if it's needed to override the
* url, send different headers or request method like POST.
*
* @param chapter the chapter whose page list has to be fetched.
*/
override fun pageListRequest(chapter: SChapter): Request {
return runBlocking { pageListRequestSuspended(chapter) }
}
protected open suspend fun pageListRequestSuspended(chapter: SChapter): Request {
return GET(baseUrl + chapter.url, headers)
}
/**
* Parses the response from the site and returns a list of pages.
*
* @param response the response from the site.
*/
override fun pageListParse(response: Response): List<Page> {
return runBlocking { pageListParseSuspended(response) }
}
protected abstract suspend fun pageListParseSuspended(response: Response): List<Page>
/**
* Returns an observable with the page containing the source url of the image. If there's any
* error, it will return null instead of throwing an exception.
*
* @param page the page whose source image has to be fetched.
*/
override fun fetchImageUrl(page: Page): Observable<String> {
return Observable.just(runBlocking { fetchImageUrlSuspended(page) })
}
open suspend fun fetchImageUrlSuspended(page: Page): String {
val response = client.newCall(imageUrlRequestSuspended(page)).await()
return imageUrlParseSuspended(response)
}
/**
* Returns the request for getting the url to the source image. Override only if it's needed to
* override the url, send different headers or request method like POST.
*
* @param page the chapter whose page list has to be fetched
*/
override fun imageUrlRequest(page: Page): Request {
return runBlocking { imageUrlRequestSuspended(page) }
}
protected open suspend fun imageUrlRequestSuspended(page: Page): Request {
return GET(page.url, headers)
}
/**
* Parses the response from the site and returns the absolute url to the source image.
*
* @param response the response from the site.
*/
override fun imageUrlParse(response: Response): String {
return runBlocking { imageUrlParseSuspended(response) }
}
protected abstract suspend fun imageUrlParseSuspended(response: Response): String
/**
* Returns an observable with the response of the source image.
*
* @param page the page whose source image has to be downloaded.
*/
override fun fetchImage(page: Page): Observable<Response> {
return Observable.just(runBlocking { fetchImageSuspended(page) })
}
open suspend fun fetchImageSuspended(page: Page): Response {
return client.newCallWithProgress(imageRequestSuspended(page), page).await()
}
/**
* Returns the request for getting the source image. Override only if it's needed to override
* the url, send different headers or request method like POST.
*
* @param page the chapter whose page list has to be fetched
*/
override fun imageRequest(page: Page): Request {
return runBlocking { imageRequestSuspended(page) }
}
protected open suspend fun imageRequestSuspended(page: Page): Request {
return GET(page.imageUrl!!, headers)
}
/**
* Called before inserting a new chapter into database. Use it if you need to override chapter
* fields, like the title or the chapter number. Do not change anything to [manga].
*
* @param chapter the chapter to be added.
* @param manga the manga of the chapter.
*/
override fun prepareNewChapter(chapter: SChapter, manga: SManga) {
runBlocking { prepareNewChapterSuspended(chapter, manga) }
}
open suspend fun prepareNewChapterSuspended(chapter: SChapter, manga: SManga) {
}
/**
* Returns the list of filters for the source.
*/
override fun getFilterList() = runBlocking { getFilterListSuspended() }
open suspend fun getFilterListSuspended() = FilterList()
companion object {
data class LicencedException(override val message: String?) : Exception()
}
}

View File

@ -1,6 +1,5 @@
package eu.kanade.tachiyomi.source.online.all
import android.util.Log
import com.elvishew.xlog.XLog
import com.github.salomonbrys.kotson.fromJson
import com.google.gson.Gson
@ -14,28 +13,28 @@ import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.source.model.SChapter
import eu.kanade.tachiyomi.source.model.SManga
import eu.kanade.tachiyomi.source.online.HttpSource
import eu.kanade.tachiyomi.source.online.SuspendHttpSource
import exh.MERGED_SOURCE_ID
import exh.util.asFlow
import exh.util.await
import hu.akarnokd.rxjava.interop.RxJavaInterop
import exh.util.awaitSingle
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.rx2.asFlowable
import kotlinx.coroutines.rx2.asSingle
import kotlinx.coroutines.withContext
import okhttp3.Response
import rx.Observable
import rx.schedulers.Schedulers
import uy.kohesive.injekt.injectLazy
// TODO LocalSource compatibility
// TODO Disable clear database option
class MergedSource : HttpSource() {
class MergedSource : SuspendHttpSource() {
private val db: DatabaseHelper by injectLazy()
private val sourceManager: SourceManager by injectLazy()
private val gson: Gson by injectLazy()
@ -44,47 +43,47 @@ class MergedSource : HttpSource() {
override val baseUrl = ""
override fun popularMangaRequest(page: Int) = throw UnsupportedOperationException()
override fun popularMangaParse(response: Response) = throw UnsupportedOperationException()
override fun searchMangaRequest(page: Int, query: String, filters: FilterList) = throw UnsupportedOperationException()
override fun searchMangaParse(response: Response) = throw UnsupportedOperationException()
override fun latestUpdatesRequest(page: Int) = throw UnsupportedOperationException()
override fun latestUpdatesParse(response: Response) = throw UnsupportedOperationException()
override suspend fun popularMangaRequestSuspended(page: Int) = throw UnsupportedOperationException()
override suspend fun popularMangaParseSuspended(response: Response) = throw UnsupportedOperationException()
override suspend fun searchMangaRequestSuspended(page: Int, query: String, filters: FilterList) = throw UnsupportedOperationException()
override suspend fun searchMangaParseSuspended(response: Response) = throw UnsupportedOperationException()
override suspend fun latestUpdatesRequestSuspended(page: Int) = throw UnsupportedOperationException()
override suspend fun latestUpdatesParseSuspended(response: Response) = throw UnsupportedOperationException()
override fun fetchMangaDetails(manga: SManga): Observable<SManga> {
return RxJavaInterop.toV1Observable(
readMangaConfig(manga).load(db, sourceManager).take(1).map { loaded ->
SManga.create().apply {
this.copyFrom(loaded.manga)
url = manga.url
}
}.asFlowable()
)
override suspend fun fetchMangaDetailsSuspended(manga: SManga): SManga {
return readMangaConfig(manga).load(db, sourceManager).take(1).map { loaded ->
SManga.create().apply {
this.copyFrom(loaded.manga)
url = manga.url
}
}.first()
}
override fun fetchChapterList(manga: SManga): Observable<List<SChapter>> {
return RxJavaInterop.toV1Single(
GlobalScope.async(Dispatchers.IO) {
val loadedMangas = readMangaConfig(manga).load(db, sourceManager).buffer()
loadedMangas.map { loadedManga ->
async(Dispatchers.IO) {
loadedManga.source.fetchChapterList(loadedManga.manga).map { chapterList ->
chapterList.map { chapter ->
chapter.apply {
url = writeUrlConfig(UrlConfig(loadedManga.source.id, url, loadedManga.manga.url))
}
}
}.toSingle().await(Schedulers.io())
override suspend fun fetchChapterListSuspended(manga: SManga): List<SChapter> {
val loadedMangas = readMangaConfig(manga).load(db, sourceManager).buffer()
return loadedMangas.flatMapMerge { loadedManga ->
withContext(Dispatchers.IO) {
loadedManga.source.fetchChapterList(loadedManga.manga).asFlow().map { chapterList ->
chapterList.map { chapter ->
chapter.apply {
url = writeUrlConfig(
UrlConfig(
loadedManga.source.id,
url,
loadedManga.manga.url
)
)
}
}
}.buffer().map { it.await() }.toList().flatten()
}.asSingle(Dispatchers.IO)
).toObservable()
}
}
}.buffer().toList().flatten()
}
override fun mangaDetailsParse(response: Response) = throw UnsupportedOperationException()
override fun chapterListParse(response: Response) = throw UnsupportedOperationException()
override suspend fun mangaDetailsParseSuspended(response: Response) = throw UnsupportedOperationException()
override suspend fun chapterListParseSuspended(response: Response) = throw UnsupportedOperationException()
override fun fetchPageList(chapter: SChapter): Observable<List<Page>> {
override suspend fun fetchPageListSuspended(chapter: SChapter): List<Page> {
val config = readUrlConfig(chapter.url)
val source = sourceManager.getOrStub(config.source)
return source.fetchPageList(
@ -96,18 +95,17 @@ class MergedSource : HttpSource() {
pages.map { page ->
page.copyWithUrl(writeUrlConfig(UrlConfig(config.source, page.url, config.mangaUrl)))
}
}
}.awaitSingle()
}
override fun fetchImageUrl(page: Page): Observable<String> {
override suspend fun fetchImageUrlSuspended(page: Page): String {
val config = readUrlConfig(page.url)
val source = sourceManager.getOrStub(config.source) as? HttpSource
?: throw UnsupportedOperationException("This source does not support this operation!")
return source.fetchImageUrl(page.copyWithUrl(config.url))
val source = sourceManager.getOrStub(config.source) as? HttpSource ?: throw UnsupportedOperationException("This source does not support this operation!")
return source.fetchImageUrl(page.copyWithUrl(config.url)).awaitSingle()
}
override fun pageListParse(response: Response) = throw UnsupportedOperationException()
override fun imageUrlParse(response: Response) = throw UnsupportedOperationException()
override suspend fun pageListParseSuspended(response: Response) = throw UnsupportedOperationException()
override suspend fun imageUrlParseSuspended(response: Response) = throw UnsupportedOperationException()
override fun fetchImage(page: Page): Observable<Response> {
val config = readUrlConfig(page.url)
@ -116,10 +114,9 @@ class MergedSource : HttpSource() {
return source.fetchImage(page.copyWithUrl(config.url))
}
override fun prepareNewChapter(chapter: SChapter, manga: SManga) {
override suspend fun prepareNewChapterSuspended(chapter: SChapter, manga: SManga) {
val chapterConfig = readUrlConfig(chapter.url)
val source = sourceManager.getOrStub(chapterConfig.source) as? HttpSource
?: throw UnsupportedOperationException("This source does not support this operation!")
val source = sourceManager.getOrStub(chapterConfig.source) as? HttpSource ?: throw UnsupportedOperationException("This source does not support this operation!")
val copiedManga = SManga.create().apply {
this.copyFrom(manga)
url = chapterConfig.mangaUrl
@ -151,7 +148,7 @@ class MergedSource : HttpSource() {
val url: String
) {
suspend fun load(db: DatabaseHelper, sourceManager: SourceManager): LoadedMangaSource? {
val manga = db.getManga(url, source).executeAsBlocking() ?: return null
val manga = db.getManga(url, source).await() ?: return null
val source = sourceManager.getOrStub(source)
return LoadedMangaSource(source, manga)
}
@ -163,12 +160,10 @@ class MergedSource : HttpSource() {
) {
fun load(db: DatabaseHelper, sourceManager: SourceManager): Flow<LoadedMangaSource> {
return children.asFlow().map { mangaSource ->
mangaSource.load(db, sourceManager)
?: run {
XLog.w("> Missing source manga: $mangaSource")
Log.d("MERGED", "> Missing source manga: $mangaSource")
throw IllegalStateException("Missing source manga: $mangaSource")
}
mangaSource.load(db, sourceManager) ?: run {
XLog.w("> Missing source manga: $mangaSource")
throw IllegalStateException("Missing source manga: $mangaSource")
}
}
}

View File

@ -2,12 +2,30 @@ package exh.util
import com.pushtorefresh.storio.operations.PreparedOperation
import com.pushtorefresh.storio.sqlite.operations.get.PreparedGetObject
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import rx.Completable
import rx.CompletableSubscriber
import rx.Emitter
import rx.Observable
import rx.Observer
import rx.Scheduler
import rx.Single
import rx.SingleSubscriber
import rx.Subscriber
import rx.Subscription
import rx.subjects.ReplaySubject
@ -33,6 +51,7 @@ fun <T> Observable<T>.melt(): Observable<T> {
return rs
}
@ExperimentalCoroutinesApi
suspend fun <T> Single<T>.await(subscribeOn: Scheduler? = null): T {
return suspendCancellableCoroutine { continuation ->
val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
@ -59,6 +78,7 @@ suspend fun <T> Single<T>.await(subscribeOn: Scheduler? = null): T {
suspend fun <T> PreparedOperation<T>.await(): T = asRxSingle().await()
suspend fun <T> PreparedGetObject<T>.await(): T? = asRxSingle().await()
@ExperimentalCoroutinesApi
suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) {
return suspendCancellableCoroutine { continuation ->
val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
@ -81,3 +101,109 @@ suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) {
}
}
}
suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableSubscriber {
override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCancellation(s) }
override fun onCompleted() { cont.resume(Unit) }
override fun onError(e: Throwable) { cont.resumeWithException(e) }
})
}
suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
cont.unsubscribeOnCancellation(
subscribe(object : SingleSubscriber<T>() {
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
)
}
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
suspend fun <T> Observable<T>.awaitFirst(): T = first().awaitOne()
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T = firstOrDefault(default).awaitOne()
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(Observable.fromCallable(defaultValue)).first().awaitOne()
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
cont.unsubscribeOnCancellation(
subscribe(object : Subscriber<T>() {
override fun onStart() { request(1) }
override fun onNext(t: T) { cont.resume(t) }
override fun onCompleted() { if (cont.isActive) cont.resumeWithException(IllegalStateException("Should have invoked onNext")) }
override fun onError(e: Throwable) {
/*
* Rx1 observable throws NoSuchElementException if cancellation happened before
* element emission. To mitigate this we try to atomically resume continuation with exception:
* if resume failed, then we know that continuation successfully cancelled itself
*/
val token = cont.tryResumeWithException(e)
if (token != null) {
cont.completeResume(token)
}
}
})
)
}
internal fun <T> CancellableContinuation<T>.unsubscribeOnCancellation(sub: Subscription) =
invokeOnCancellation { sub.unsubscribe() }
@ExperimentalCoroutinesApi
fun <T : Any> Observable<T>.asFlow(): Flow<T> = callbackFlow {
val observer = object : Observer<T> {
override fun onNext(t: T) {
offer(t)
}
override fun onError(e: Throwable) {
close(e)
}
override fun onCompleted() {
close()
}
}
val subscription = subscribe(observer)
awaitClose { subscription.unsubscribe() }
}
@ExperimentalCoroutinesApi
fun <T : Any> Flow<T>.asObservable(backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE): Observable<T> {
return Observable.create(
{ emitter ->
/*
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
* asObservable is already invoked from unconfined
*/
val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
try {
collect { emitter.onNext(it) }
emitter.onCompleted()
} catch (e: Throwable) {
// Ignore `CancellationException` as error, since it indicates "normal cancellation"
if (e !is CancellationException) {
emitter.onError(e)
} else {
emitter.onCompleted()
}
}
}
emitter.setCancellation { job.cancel() }
},
backpressureMode
)
}