Use Flow in ExtensionManager and SourceManager (#7547)

- Replace ExtensionManager relay and observable with Flow
- Inverse SourceManager dependency
    - SourceManager observers ExtensionManager flow
- Separate SourceData from SourceRepository as it created a circular dependency

(cherry picked from commit 35ec5936587799f33a264f57729cb4b75c5a0f72)

# Conflicts:
#	app/src/main/java/eu/kanade/tachiyomi/source/SourceManager.kt
This commit is contained in:
Andreas 2022-07-16 21:08:15 +02:00 committed by Jobobby04
parent dd68ef4ba8
commit d8042f7182
14 changed files with 158 additions and 232 deletions

View File

@ -0,0 +1,23 @@
package eu.kanade.data.source
import eu.kanade.data.DatabaseHandler
import eu.kanade.domain.source.model.SourceData
import eu.kanade.domain.source.repository.SourceDataRepository
import kotlinx.coroutines.flow.Flow
class SourceDataRepositoryImpl(
private val handler: DatabaseHandler,
) : SourceDataRepository {
override fun subscribeAll(): Flow<List<SourceData>> {
return handler.subscribeToList { sourcesQueries.findAll(sourceDataMapper) }
}
override suspend fun getSourceData(id: Long): SourceData? {
return handler.awaitOneOrNull { sourcesQueries.findOne(id, sourceDataMapper) }
}
override suspend fun upsertSourceData(id: Long, lang: String, name: String) {
handler.await { sourcesQueries.upsert(id, lang, name) }
}
}

View File

@ -2,7 +2,6 @@ package eu.kanade.data.source
import eu.kanade.data.DatabaseHandler
import eu.kanade.domain.source.model.Source
import eu.kanade.domain.source.model.SourceData
import eu.kanade.domain.source.repository.SourceRepository
import eu.kanade.tachiyomi.source.LocalSource
import eu.kanade.tachiyomi.source.SourceManager
@ -51,12 +50,4 @@ class SourceRepositoryImpl(
}
}
}
override suspend fun getSourceData(id: Long): SourceData? {
return handler.awaitOneOrNull { sourcesQueries.getSourceData(id, sourceDataMapper) }
}
override suspend fun upsertSourceData(id: Long, lang: String, name: String) {
handler.await { sourcesQueries.upsert(id, lang, name) }
}
}

View File

@ -4,6 +4,7 @@ import eu.kanade.data.category.CategoryRepositoryImpl
import eu.kanade.data.chapter.ChapterRepositoryImpl
import eu.kanade.data.history.HistoryRepositoryImpl
import eu.kanade.data.manga.MangaRepositoryImpl
import eu.kanade.data.source.SourceDataRepositoryImpl
import eu.kanade.data.source.SourceRepositoryImpl
import eu.kanade.data.track.TrackRepositoryImpl
import eu.kanade.domain.category.interactor.CreateCategoryWithName
@ -47,14 +48,13 @@ import eu.kanade.domain.manga.interactor.UpdateManga
import eu.kanade.domain.manga.repository.MangaRepository
import eu.kanade.domain.source.interactor.GetEnabledSources
import eu.kanade.domain.source.interactor.GetLanguagesWithSources
import eu.kanade.domain.source.interactor.GetSourceData
import eu.kanade.domain.source.interactor.GetSourcesWithFavoriteCount
import eu.kanade.domain.source.interactor.GetSourcesWithNonLibraryManga
import eu.kanade.domain.source.interactor.SetMigrateSorting
import eu.kanade.domain.source.interactor.ToggleLanguage
import eu.kanade.domain.source.interactor.ToggleSource
import eu.kanade.domain.source.interactor.ToggleSourcePin
import eu.kanade.domain.source.interactor.UpsertSourceData
import eu.kanade.domain.source.repository.SourceDataRepository
import eu.kanade.domain.source.repository.SourceRepository
import eu.kanade.domain.track.interactor.DeleteTrack
import eu.kanade.domain.track.interactor.GetTracks
@ -120,15 +120,14 @@ class DomainModule : InjektModule {
addFactory { GetExtensionLanguages(get(), get()) }
addSingletonFactory<SourceRepository> { SourceRepositoryImpl(get(), get()) }
addSingletonFactory<SourceDataRepository> { SourceDataRepositoryImpl(get()) }
addFactory { GetEnabledSources(get(), get()) }
addFactory { GetLanguagesWithSources(get(), get()) }
addFactory { GetSourceData(get()) }
addFactory { GetSourcesWithFavoriteCount(get(), get()) }
addFactory { GetSourcesWithNonLibraryManga(get()) }
addFactory { SetMigrateSorting(get()) }
addFactory { ToggleLanguage(get()) }
addFactory { ToggleSource(get()) }
addFactory { ToggleSourcePin(get()) }
addFactory { UpsertSourceData(get()) }
}
}

View File

@ -14,7 +14,7 @@ class GetExtensionLanguages(
fun subscribe(): Flow<List<String>> {
return combine(
preferences.enabledLanguages().asFlow(),
extensionManager.getAvailableExtensionsObservable().asFlow(),
extensionManager.getAvailableExtensionsFlow(),
) { enabledLanguage, availableExtensions ->
availableExtensions
.map { it.lang }

View File

@ -1,6 +1,5 @@
package eu.kanade.domain.extension.interactor
import eu.kanade.core.util.asFlow
import eu.kanade.tachiyomi.data.preference.PreferencesHelper
import eu.kanade.tachiyomi.extension.ExtensionManager
import eu.kanade.tachiyomi.extension.model.Extension
@ -15,7 +14,7 @@ class GetExtensionUpdates(
fun subscribe(): Flow<List<Extension.Installed>> {
val showNsfwSources = preferences.showNsfwSource().get()
return extensionManager.getInstalledExtensionsObservable().asFlow()
return extensionManager.getInstalledExtensionsFlow()
.map { installed ->
installed
.filter { it.hasUpdate && (showNsfwSources || it.isNsfw.not()) }

View File

@ -19,9 +19,9 @@ class GetExtensions(
return combine(
preferences.enabledLanguages().asFlow(),
extensionManager.getInstalledExtensionsObservable().asFlow(),
extensionManager.getUntrustedExtensionsObservable().asFlow(),
extensionManager.getAvailableExtensionsObservable().asFlow(),
extensionManager.getInstalledExtensionsFlow(),
extensionManager.getUntrustedExtensionsFlow(),
extensionManager.getAvailableExtensionsFlow(),
) { _activeLanguages, _installed, _untrusted, _available ->
val installed = _installed

View File

@ -1,20 +0,0 @@
package eu.kanade.domain.source.interactor
import eu.kanade.domain.source.model.SourceData
import eu.kanade.domain.source.repository.SourceRepository
import eu.kanade.tachiyomi.util.system.logcat
import logcat.LogPriority
class GetSourceData(
private val repository: SourceRepository,
) {
suspend fun await(id: Long): SourceData? {
return try {
repository.getSourceData(id)
} catch (e: Exception) {
logcat(LogPriority.ERROR, e)
null
}
}
}

View File

@ -1,19 +0,0 @@
package eu.kanade.domain.source.interactor
import eu.kanade.domain.source.model.SourceData
import eu.kanade.domain.source.repository.SourceRepository
import eu.kanade.tachiyomi.util.system.logcat
import logcat.LogPriority
class UpsertSourceData(
private val repository: SourceRepository,
) {
suspend fun await(sourceData: SourceData) {
try {
repository.upsertSourceData(sourceData.id, sourceData.lang, sourceData.name)
} catch (e: Exception) {
logcat(LogPriority.ERROR, e)
}
}
}

View File

@ -0,0 +1,12 @@
package eu.kanade.domain.source.repository
import eu.kanade.domain.source.model.SourceData
import kotlinx.coroutines.flow.Flow
interface SourceDataRepository {
fun subscribeAll(): Flow<List<SourceData>>
suspend fun getSourceData(id: Long): SourceData?
suspend fun upsertSourceData(id: Long, lang: String, name: String)
}

View File

@ -1,7 +1,6 @@
package eu.kanade.domain.source.repository
import eu.kanade.domain.source.model.Source
import eu.kanade.domain.source.model.SourceData
import kotlinx.coroutines.flow.Flow
import eu.kanade.tachiyomi.source.Source as LoadedSource
@ -14,8 +13,4 @@ interface SourceRepository {
fun getSourcesWithFavoriteCount(): Flow<List<Pair<Source, Long>>>
fun getSourcesWithNonLibraryManga(): Flow<List<Pair<LoadedSource, Long>>>
suspend fun getSourceData(id: Long): SourceData?
suspend fun upsertSourceData(id: Long, lang: String, name: String)
}

View File

@ -101,10 +101,10 @@ class AppModule(val app: Application) : InjektModule {
addSingletonFactory { NetworkHelper(app) }
addSingletonFactory { SourceManager(app).also { get<ExtensionManager>().init(it) } }
addSingletonFactory { ExtensionManager(app) }
addSingletonFactory { SourceManager(app, get(), get()) }
addSingletonFactory { DownloadManager(app) }
addSingletonFactory { TrackManager(app) }

View File

@ -15,7 +15,6 @@ import eu.kanade.tachiyomi.extension.util.ExtensionInstallReceiver
import eu.kanade.tachiyomi.extension.util.ExtensionInstaller
import eu.kanade.tachiyomi.extension.util.ExtensionLoader
import eu.kanade.tachiyomi.source.Source
import eu.kanade.tachiyomi.source.SourceManager
import eu.kanade.tachiyomi.util.lang.launchNow
import eu.kanade.tachiyomi.util.preference.plusAssign
import eu.kanade.tachiyomi.util.system.logcat
@ -102,22 +101,23 @@ class ExtensionManager(
// SY <--
}
/**
* Relay used to notify the available extensions.
*/
private val availableExtensionsRelay = BehaviorRelay.create<List<Extension.Available>>()
/**
* List of the currently available extensions.
*/
var availableExtensions = emptyList<Extension.Available>()
private set(value) {
field = value
availableExtensionsRelay.call(value)
availableExtensionsFlow.value = field
updatedInstalledExtensionsStatuses(value)
setupAvailableExtensionsSourcesDataMap(value)
}
private val availableExtensionsFlow = MutableStateFlow(availableExtensions)
fun getAvailableExtensionsFlow(): StateFlow<List<Extension.Available>> {
return availableExtensionsFlow.asStateFlow()
}
private var availableExtensionsSourcesData: Map<Long, SourceData> = mapOf()
private fun setupAvailableExtensionsSourcesDataMap(extensions: List<Extension.Available>) {
@ -133,30 +133,22 @@ class ExtensionManager(
var unalteredAvailableExtensions = emptyList<Extension.Available>()
// SY <--
/**
* Relay used to notify the untrusted extensions.
*/
private val untrustedExtensionsRelay = BehaviorRelay.create<List<Extension.Untrusted>>()
/**
* List of the currently untrusted extensions.
*/
var untrustedExtensions = emptyList<Extension.Untrusted>()
private set(value) {
field = value
untrustedExtensionsRelay.call(value)
untrustedExtensionsFlow.value = field
}
/**
* The source manager where the sources of the extensions are added.
*/
private lateinit var sourceManager: SourceManager
private val untrustedExtensionsFlow = MutableStateFlow(untrustedExtensions)
/**
* Initializes this manager with the given source manager.
*/
fun init(sourceManager: SourceManager) {
this.sourceManager = sourceManager
fun getUntrustedExtensionsFlow(): StateFlow<List<Extension.Untrusted>> {
return untrustedExtensionsFlow.asStateFlow()
}
init {
initExtensions()
ExtensionInstallReceiver(InstallationListener()).register(context)
}
@ -170,9 +162,6 @@ class ExtensionManager(
installedExtensions = extensions
.filterIsInstance<LoadResult.Success>()
.map { it.extension }
installedExtensions
.flatMap { it.sources }
.forEach { sourceManager.registerSource(it) }
untrustedExtensions = extensions
.filterIsInstance<LoadResult.Untrusted>()
@ -198,27 +187,6 @@ class ExtensionManager(
}
// EXH <--
/**
* Returns the relay of the installed extensions as an observable.
*/
fun getInstalledExtensionsObservable(): Observable<List<Extension.Installed>> {
return installedExtensionsRelay.asObservable()
}
/**
* Returns the relay of the available extensions as an observable.
*/
fun getAvailableExtensionsObservable(): Observable<List<Extension.Available>> {
return availableExtensionsRelay.asObservable()
}
/**
* Returns the relay of the untrusted extensions as an observable.
*/
fun getUntrustedExtensionsObservable(): Observable<List<Extension.Untrusted>> {
return untrustedExtensionsRelay.asObservable()
}
/**
* Finds the available extensions in the [api] and updates [availableExtensions].
*/
@ -378,7 +346,6 @@ class ExtensionManager(
// SY <--
installedExtensions += extension
extension.sources.forEach { sourceManager.registerSource(it) }
}
/**
@ -399,11 +366,9 @@ class ExtensionManager(
val oldExtension = mutInstalledExtensions.find { it.pkgName == extension.pkgName }
if (oldExtension != null) {
mutInstalledExtensions -= oldExtension
extension.sources.forEach { sourceManager.unregisterSource(it) }
}
mutInstalledExtensions += extension
installedExtensions = mutInstalledExtensions
extension.sources.forEach { sourceManager.registerSource(it) }
}
/**
@ -416,7 +381,6 @@ class ExtensionManager(
val installedExtension = installedExtensions.find { it.pkgName == pkgName }
if (installedExtension != null) {
installedExtensions -= installedExtension
installedExtension.sources.forEach { sourceManager.unregisterSource(it) }
}
val untrustedExtension = untrustedExtensions.find { it.pkgName == pkgName }
if (untrustedExtension != null) {

View File

@ -1,9 +1,8 @@
package eu.kanade.tachiyomi.source
import android.content.Context
import eu.kanade.domain.source.interactor.GetSourceData
import eu.kanade.domain.source.interactor.UpsertSourceData
import eu.kanade.domain.source.model.SourceData
import eu.kanade.domain.source.repository.SourceDataRepository
import eu.kanade.tachiyomi.R
import eu.kanade.tachiyomi.data.preference.PreferencesHelper
import eu.kanade.tachiyomi.extension.ExtensionManager
@ -21,7 +20,6 @@ import eu.kanade.tachiyomi.source.online.english.EightMuses
import eu.kanade.tachiyomi.source.online.english.HBrowse
import eu.kanade.tachiyomi.source.online.english.Pururin
import eu.kanade.tachiyomi.source.online.english.Tsumino
import eu.kanade.tachiyomi.util.lang.launchIO
import exh.log.xLogD
import exh.source.BlacklistedSources
import exh.source.DelegatedHttpSource
@ -30,6 +28,7 @@ import exh.source.EIGHTMUSES_SOURCE_ID
import exh.source.EXH_SOURCE_ID
import exh.source.EnhancedHttpSource
import exh.source.HBROWSE_SOURCE_ID
import exh.source.MERGED_SOURCE_ID
import exh.source.PERV_EDEN_EN_SOURCE_ID
import exh.source.PERV_EDEN_IT_SOURCE_ID
import exh.source.PURURIN_SOURCE_ID
@ -40,11 +39,10 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import rx.Observable
import tachiyomi.source.model.ChapterInfo
@ -52,46 +50,102 @@ import tachiyomi.source.model.MangaInfo
import uy.kohesive.injekt.injectLazy
import kotlin.reflect.KClass
class SourceManager(private val context: Context) {
class SourceManager(
private val context: Context,
private val extensionManager: ExtensionManager,
private val sourceRepository: SourceDataRepository,
) {
private val extensionManager: ExtensionManager by injectLazy()
private val getSourceData: GetSourceData by injectLazy()
private val upsertSourceData: UpsertSourceData by injectLazy()
private val scope = CoroutineScope(Job() + Dispatchers.IO)
private var sourcesMap = emptyMap<Long, Source>()
set(value) {
field = value
sourcesMapFlow.value = field
}
private val sourcesMapFlow = MutableStateFlow(sourcesMap)
private val sourcesMap = mutableMapOf<Long, Source>()
private val stubSourcesMap = mutableMapOf<Long, StubSource>()
private val _catalogueSources: MutableStateFlow<List<CatalogueSource>> = MutableStateFlow(listOf())
val catalogueSources: Flow<List<CatalogueSource>> = _catalogueSources
val onlineSources: Flow<List<HttpSource>> =
_catalogueSources.map { sources -> sources.filterIsInstance<HttpSource>() }
val catalogueSources: Flow<List<CatalogueSource>> = sourcesMapFlow.map { it.values.filterIsInstance<CatalogueSource>() }
val onlineSources: Flow<List<HttpSource>> = catalogueSources.map { sources -> sources.filterIsInstance<HttpSource>() }
// SY -->
private val prefs: PreferencesHelper by injectLazy()
private val scope = CoroutineScope(Job() + Dispatchers.Main)
private val preferences: PreferencesHelper by injectLazy()
// SY <--
init {
createInternalSources().forEach { registerSource(it) }
// SY -->
// Create internal sources
createEHSources().forEach { registerSource(it) }
// Watch the preference and manage Exhentai
prefs.enableExhentai().asFlow()
.drop(1)
.onEach {
if (it) {
registerSource(EHentai(EXH_SOURCE_ID, true, context))
} else {
sourcesMap.remove(EXH_SOURCE_ID)
scope.launch {
extensionManager.getInstalledExtensionsFlow()
// SY -->
.combine(preferences.enableExhentai().asFlow()) { extensions, enableExhentai ->
extensions to enableExhentai
}
}.launchIn(scope)
// SY <--
.collectLatest { (extensions, enableExhentai) ->
val mutableMap = mutableMapOf<Long, Source>(LocalSource.ID to LocalSource(context)).apply {
// SY -->
put(EH_SOURCE_ID, EHentai(EH_SOURCE_ID, false, context))
if (enableExhentai) {
put(EXH_SOURCE_ID, EHentai(EXH_SOURCE_ID, true, context))
}
put(MERGED_SOURCE_ID, MergedSource())
// SY <--
}
extensions.forEach { extension ->
extension.sources.mapNotNull { it.toInternalSource() }.forEach {
mutableMap[it.id] = it
registerStubSource(it.toSourceData())
}
}
sourcesMap = mutableMap
}
}
registerSource(MergedSource())
// SY <--
scope.launch {
sourceRepository.subscribeAll()
.collectLatest { sources ->
val mutableMap = stubSourcesMap.toMutableMap()
sources.forEach {
mutableMap[it.id] = StubSource(it)
}
}
}
}
private fun Source.toInternalSource(): Source? {
// EXH -->
val sourceQName = this::class.qualifiedName
val factories = DELEGATED_SOURCES.entries.filter { it.value.factory }.map { it.value.originalSourceQualifiedClassName }
val delegate = if (sourceQName != null) {
val matched = factories.find { sourceQName.startsWith(it) }
if (matched != null) {
DELEGATED_SOURCES[matched]
} else DELEGATED_SOURCES[sourceQName]
} else null
val newSource = if (this is HttpSource && delegate != null) {
xLogD("Delegating source: %s -> %s!", sourceQName, delegate.newSourceClass.qualifiedName)
val enhancedSource = EnhancedHttpSource(
this,
delegate.newSourceClass.constructors.find { it.parameters.size == 2 }!!.call(this, context),
)
currentDelegatedSources[enhancedSource.originalSource.id] = DelegatedSource(
enhancedSource.originalSource.name,
enhancedSource.originalSource.id,
enhancedSource.originalSource::class.qualifiedName ?: delegate.originalSourceQualifiedClassName,
(enhancedSource.enhancedSource as DelegatedHttpSource)::class,
delegate.factory,
)
enhancedSource
} else this
return if (id in BlacklistedSources.BLACKLISTED_EXT_SOURCES) {
xLogD("Removing blacklisted source: (id: %s, name: %s, lang: %s)!", id, name, (this as? CatalogueSource)?.lang)
null
} else newSource
// EXH <--
}
fun get(sourceKey: Long): Source? {
@ -127,91 +181,15 @@ class SourceManager(private val context: Context) {
}
// SY <--
internal fun registerSource(source: Source) {
// EXH -->
val sourceQName = source::class.qualifiedName
val factories = DELEGATED_SOURCES.entries.filter { it.value.factory }.map { it.value.originalSourceQualifiedClassName }
val delegate = if (sourceQName != null) {
val matched = factories.find { sourceQName.startsWith(it) }
if (matched != null) {
DELEGATED_SOURCES[matched]
} else DELEGATED_SOURCES[sourceQName]
} else null
val newSource = if (source is HttpSource && delegate != null) {
xLogD("Delegating source: %s -> %s!", sourceQName, delegate.newSourceClass.qualifiedName)
val enhancedSource = EnhancedHttpSource(
source,
delegate.newSourceClass.constructors.find { it.parameters.size == 2 }!!.call(source, context),
)
currentDelegatedSources[enhancedSource.originalSource.id] = DelegatedSource(
enhancedSource.originalSource.name,
enhancedSource.originalSource.id,
enhancedSource.originalSource::class.qualifiedName ?: delegate.originalSourceQualifiedClassName,
(enhancedSource.enhancedSource as DelegatedHttpSource)::class,
delegate.factory,
)
enhancedSource
} else source
if (source.id in BlacklistedSources.BLACKLISTED_EXT_SOURCES) {
xLogD("Removing blacklisted source: (id: %s, name: %s, lang: %s)!", source.id, source.name, (source as? CatalogueSource)?.lang)
return
}
// EXH <--
if (!sourcesMap.containsKey(source.id)) {
sourcesMap[source.id] = newSource
}
registerStubSource(source.toSourceData())
triggerCatalogueSources()
}
private fun registerStubSource(sourceData: SourceData) {
launchIO {
val dbSourceData = getSourceData.await(sourceData.id)
if (dbSourceData != sourceData) {
upsertSourceData.await(sourceData)
}
if (stubSourcesMap[sourceData.id]?.toSourceData() != sourceData) {
stubSourcesMap[sourceData.id] = StubSource(sourceData)
}
scope.launch {
val (id, lang, name) = sourceData
sourceRepository.upsertSourceData(id, lang, name)
}
}
internal fun unregisterSource(source: Source) {
sourcesMap.remove(source.id)
triggerCatalogueSources()
// SY -->
currentDelegatedSources.remove(source.id)
// SY <--
}
private fun triggerCatalogueSources() {
_catalogueSources.update {
sourcesMap.values.filterIsInstance<CatalogueSource>()
}
}
private fun createInternalSources(): List<Source> = listOf(
LocalSource(context),
)
// SY -->
private fun createEHSources(): List<Source> {
val sources = listOf<HttpSource>(
EHentai(EH_SOURCE_ID, false, context),
)
return if (prefs.enableExhentai().get()) {
sources + EHentai(EXH_SOURCE_ID, true, context)
} else sources
}
// SY <--
private suspend fun createStubSource(id: Long): StubSource {
getSourceData.await(id)?.let {
sourceRepository.getSourceData(id)?.let {
return StubSource(it)
}
extensionManager.getSourceData(id)?.let {

View File

@ -4,7 +4,11 @@ CREATE TABLE sources(
name TEXT NOT NULL
);
getSourceData:
findAll:
SELECT *
FROM sources;
findOne:
SELECT *
FROM sources
WHERE _id = :id;