feat: syncing etag and overall improvement. (#1155)
* chore: don't log the access token from google. Signed-off-by: KaiserBh <kaiserbh@proton.me> * chore: don't log the access token from google. Signed-off-by: KaiserBh <kaiserbh@proton.me> * chore: forgot to add sy stuff. The customInfo and readEntries wasn't taken into account, so when it was disabled it will always sync it because it's true by default in BackupOptions.kt. Should be fixed and now it doesn't reset the check mark. Signed-off-by: KaiserBh <kaiserbh@proton.me> * fix: same device sync. When same device is initiating the sync just update the remote that. Signed-off-by: KaiserBh <kaiserbh@proton.me> * refactor: throw early. When there is network failure or any sort during downloading just throw exception and stop syncing. Signed-off-by: KaiserBh <kaiserbh@proton.me> * refactor(gdrive): stream the json. People with over 3k library can't sync because we are hitting OOM ```java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Failed to allocate a 370950192 byte allocation with 25165824 free bytes and 281MB until OOM, target footprint 333990992, growth limit 603979776```. This should fix that for them but only gdrive. Signed-off-by: KaiserBh <kaiserbh@proton.me> * feat: a demo for sync with new api * refactor: perform early null checks * feat: restore even if push failed * feat: switch to protobuf * chore: show error notification when sync fails. Signed-off-by: KaiserBh <kaiserbh@proton.me> * fix: update order by merge * fix: call pushSyncData twice --------- Signed-off-by: KaiserBh <kaiserbh@proton.me> Co-authored-by: Cologler <skyoflw@gmail.com>
This commit is contained in:
parent
596a8d002f
commit
ed20d25452
@ -13,6 +13,8 @@ class SyncPreferences(
|
||||
fun clientAPIKey() = preferenceStore.getString("sync_client_api_key", "")
|
||||
fun lastSyncTimestamp() = preferenceStore.getLong(Preference.appStateKey("last_sync_timestamp"), 0L)
|
||||
|
||||
fun lastSyncEtag() = preferenceStore.getString("sync_etag", "")
|
||||
|
||||
fun syncInterval() = preferenceStore.getInt("sync_interval", 0)
|
||||
fun syncService() = preferenceStore.getInt("sync_service", 0)
|
||||
|
||||
@ -53,6 +55,11 @@ class SyncPreferences(
|
||||
appSettings = preferenceStore.getBoolean("appSettings", true).get(),
|
||||
sourceSettings = preferenceStore.getBoolean("sourceSettings", true).get(),
|
||||
privateSettings = preferenceStore.getBoolean("privateSettings", true).get(),
|
||||
|
||||
// SY -->
|
||||
customInfo = preferenceStore.getBoolean("customInfo", true).get(),
|
||||
readEntries = preferenceStore.getBoolean("readEntries", true).get()
|
||||
// SY <--
|
||||
)
|
||||
}
|
||||
|
||||
@ -65,6 +72,11 @@ class SyncPreferences(
|
||||
preferenceStore.getBoolean("appSettings", true).set(syncSettings.appSettings)
|
||||
preferenceStore.getBoolean("sourceSettings", true).set(syncSettings.sourceSettings)
|
||||
preferenceStore.getBoolean("privateSettings", true).set(syncSettings.privateSettings)
|
||||
|
||||
// SY -->
|
||||
preferenceStore.getBoolean("customInfo", true).set(syncSettings.customInfo)
|
||||
preferenceStore.getBoolean("readEntries", true).set(syncSettings.readEntries)
|
||||
// SY <--
|
||||
}
|
||||
|
||||
fun getSyncTriggerOptions(): SyncTriggerOptions {
|
||||
|
@ -9,4 +9,9 @@ data class SyncSettings(
|
||||
val appSettings: Boolean = true,
|
||||
val sourceSettings: Boolean = true,
|
||||
val privateSettings: Boolean = false,
|
||||
|
||||
// SY -->
|
||||
val customInfo: Boolean = true,
|
||||
val readEntries: Boolean = true
|
||||
// SY <--
|
||||
)
|
||||
|
@ -124,6 +124,11 @@ private class SyncSettingsSelectorModel(
|
||||
appSettings = syncSettings.appSettings,
|
||||
sourceSettings = syncSettings.sourceSettings,
|
||||
privateSettings = syncSettings.privateSettings,
|
||||
|
||||
// SY -->
|
||||
customInfo = syncSettings.customInfo,
|
||||
readEntries = syncSettings.readEntries,
|
||||
// SY <--
|
||||
)
|
||||
}
|
||||
|
||||
@ -137,6 +142,11 @@ private class SyncSettingsSelectorModel(
|
||||
appSettings = backupOptions.appSettings,
|
||||
sourceSettings = backupOptions.sourceSettings,
|
||||
privateSettings = backupOptions.privateSettings,
|
||||
|
||||
// SY -->
|
||||
customInfo = backupOptions.customInfo,
|
||||
readEntries = backupOptions.readEntries,
|
||||
// SY <--
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -88,7 +88,14 @@ class SyncManager(
|
||||
appSettings = syncOptions.appSettings,
|
||||
sourceSettings = syncOptions.sourceSettings,
|
||||
privateSettings = syncOptions.privateSettings,
|
||||
|
||||
// SY -->
|
||||
customInfo = syncOptions.customInfo,
|
||||
readEntries = syncOptions.readEntries,
|
||||
// SY <--
|
||||
)
|
||||
|
||||
logcat(LogPriority.DEBUG) { "Begin create backup" }
|
||||
val backup = Backup(
|
||||
backupManga = backupCreator.backupMangas(databaseManga, backupOptions),
|
||||
backupCategories = backupCreator.backupCategories(backupOptions),
|
||||
@ -100,9 +107,11 @@ class SyncManager(
|
||||
backupSavedSearches = backupCreator.backupSavedSearches(),
|
||||
// SY <--
|
||||
)
|
||||
logcat(LogPriority.DEBUG) { "End create backup" }
|
||||
|
||||
// Create the SyncData object
|
||||
val syncData = SyncData(
|
||||
deviceId = syncPreferences.uniqueDeviceID(),
|
||||
backup = backup,
|
||||
)
|
||||
|
||||
@ -129,8 +138,22 @@ class SyncManager(
|
||||
|
||||
val remoteBackup = syncService?.doSync(syncData)
|
||||
|
||||
if (remoteBackup == null) {
|
||||
logcat(LogPriority.DEBUG) { "Skip restore due to network issues" }
|
||||
// should we call showSyncError?
|
||||
return
|
||||
}
|
||||
|
||||
if (remoteBackup === syncData.backup){
|
||||
// nothing changed
|
||||
logcat(LogPriority.DEBUG) { "Skip restore due to remote was overwrite from local" }
|
||||
syncPreferences.lastSyncTimestamp().set(Date().time)
|
||||
notifier.showSyncSuccess("Sync completed successfully")
|
||||
return
|
||||
}
|
||||
|
||||
// Stop the sync early if the remote backup is null or empty
|
||||
if (remoteBackup?.backupManga?.size == 0) {
|
||||
if (remoteBackup.backupManga?.size == 0) {
|
||||
notifier.showSyncError("No data found on remote server.")
|
||||
return
|
||||
}
|
||||
@ -143,49 +166,47 @@ class SyncManager(
|
||||
return
|
||||
}
|
||||
|
||||
if (remoteBackup != null) {
|
||||
val (filteredFavorites, nonFavorites) = filterFavoritesAndNonFavorites(remoteBackup)
|
||||
updateNonFavorites(nonFavorites)
|
||||
val (filteredFavorites, nonFavorites) = filterFavoritesAndNonFavorites(remoteBackup)
|
||||
updateNonFavorites(nonFavorites)
|
||||
|
||||
val newSyncData = backup.copy(
|
||||
backupManga = filteredFavorites,
|
||||
backupCategories = remoteBackup.backupCategories,
|
||||
backupSources = remoteBackup.backupSources,
|
||||
backupPreferences = remoteBackup.backupPreferences,
|
||||
backupSourcePreferences = remoteBackup.backupSourcePreferences,
|
||||
val newSyncData = backup.copy(
|
||||
backupManga = filteredFavorites,
|
||||
backupCategories = remoteBackup.backupCategories,
|
||||
backupSources = remoteBackup.backupSources,
|
||||
backupPreferences = remoteBackup.backupPreferences,
|
||||
backupSourcePreferences = remoteBackup.backupSourcePreferences,
|
||||
|
||||
// SY -->
|
||||
backupSavedSearches = remoteBackup.backupSavedSearches,
|
||||
// SY <--
|
||||
// SY -->
|
||||
backupSavedSearches = remoteBackup.backupSavedSearches,
|
||||
// SY <--
|
||||
)
|
||||
|
||||
// It's local sync no need to restore data. (just update remote data)
|
||||
if (filteredFavorites.isEmpty()) {
|
||||
// update the sync timestamp
|
||||
syncPreferences.lastSyncTimestamp().set(Date().time)
|
||||
notifier.showSyncSuccess("Sync completed successfully")
|
||||
return
|
||||
}
|
||||
|
||||
val backupUri = writeSyncDataToCache(context, newSyncData)
|
||||
logcat(LogPriority.DEBUG) { "Got Backup Uri: $backupUri" }
|
||||
if (backupUri != null) {
|
||||
BackupRestoreJob.start(
|
||||
context,
|
||||
backupUri,
|
||||
sync = true,
|
||||
options = RestoreOptions(
|
||||
appSettings = true,
|
||||
sourceSettings = true,
|
||||
library = true,
|
||||
),
|
||||
)
|
||||
|
||||
// It's local sync no need to restore data. (just update remote data)
|
||||
if (filteredFavorites.isEmpty()) {
|
||||
// update the sync timestamp
|
||||
syncPreferences.lastSyncTimestamp().set(Date().time)
|
||||
notifier.showSyncSuccess("Sync completed successfully")
|
||||
return
|
||||
}
|
||||
|
||||
val backupUri = writeSyncDataToCache(context, newSyncData)
|
||||
logcat(LogPriority.DEBUG) { "Got Backup Uri: $backupUri" }
|
||||
if (backupUri != null) {
|
||||
BackupRestoreJob.start(
|
||||
context,
|
||||
backupUri,
|
||||
sync = true,
|
||||
options = RestoreOptions(
|
||||
appSettings = true,
|
||||
sourceSettings = true,
|
||||
library = true,
|
||||
),
|
||||
)
|
||||
|
||||
// update the sync timestamp
|
||||
syncPreferences.lastSyncTimestamp().set(Date().time)
|
||||
} else {
|
||||
logcat(LogPriority.ERROR) { "Failed to write sync data to file" }
|
||||
}
|
||||
// update the sync timestamp
|
||||
syncPreferences.lastSyncTimestamp().set(Date().time)
|
||||
} else {
|
||||
logcat(LogPriority.ERROR) { "Failed to write sync data to file" }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@ import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets
|
||||
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
|
||||
import com.google.api.client.googleapis.auth.oauth2.GoogleTokenResponse
|
||||
import com.google.api.client.http.ByteArrayContent
|
||||
import com.google.api.client.http.InputStreamContent
|
||||
import com.google.api.client.http.javanet.NetHttpTransport
|
||||
import com.google.api.client.json.JsonFactory
|
||||
import com.google.api.client.json.jackson2.JacksonFactory
|
||||
@ -18,9 +19,12 @@ import com.google.api.services.drive.Drive
|
||||
import com.google.api.services.drive.DriveScopes
|
||||
import com.google.api.services.drive.model.File
|
||||
import eu.kanade.domain.sync.SyncPreferences
|
||||
import eu.kanade.tachiyomi.data.backup.models.Backup
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.decodeFromStream
|
||||
import kotlinx.serialization.json.encodeToStream
|
||||
import logcat.LogPriority
|
||||
import logcat.logcat
|
||||
import tachiyomi.core.common.i18n.stringResource
|
||||
@ -30,8 +34,9 @@ import tachiyomi.i18n.MR
|
||||
import tachiyomi.i18n.sy.SYMR
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.IOException
|
||||
import java.io.PipedInputStream
|
||||
import java.io.PipedOutputStream
|
||||
import java.time.Instant
|
||||
import java.util.zip.GZIPInputStream
|
||||
import java.util.zip.GZIPOutputStream
|
||||
@ -65,7 +70,43 @@ class GoogleDriveSyncService(context: Context, json: Json, syncPreferences: Sync
|
||||
|
||||
private val googleDriveService = GoogleDriveService(context)
|
||||
|
||||
override suspend fun beforeSync() {
|
||||
override suspend fun doSync(syncData: SyncData): Backup? {
|
||||
beforeSync()
|
||||
|
||||
try {
|
||||
val remoteSData = pullSyncData()
|
||||
|
||||
if (remoteSData != null ){
|
||||
// Get local unique device ID
|
||||
val localDeviceId = syncPreferences.uniqueDeviceID()
|
||||
val lastSyncDeviceId = remoteSData.deviceId
|
||||
|
||||
// Log the device IDs
|
||||
logcat(LogPriority.DEBUG, "SyncService") {
|
||||
"Local device ID: $localDeviceId, Last sync device ID: $lastSyncDeviceId"
|
||||
}
|
||||
|
||||
// check if the last sync was done by the same device if so overwrite the remote data with the local data
|
||||
return if (lastSyncDeviceId == localDeviceId) {
|
||||
pushSyncData(syncData)
|
||||
syncData.backup
|
||||
}else{
|
||||
// Merge the local and remote sync data
|
||||
val mergedSyncData = mergeSyncData(syncData, remoteSData)
|
||||
pushSyncData(mergedSyncData)
|
||||
mergedSyncData.backup
|
||||
}
|
||||
}
|
||||
|
||||
pushSyncData(syncData)
|
||||
return syncData.backup
|
||||
} catch (e: Exception) {
|
||||
logcat(LogPriority.ERROR, "SyncService") { "Error syncing: ${e.message}" }
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun beforeSync() {
|
||||
try {
|
||||
googleDriveService.refreshToken()
|
||||
val drive = googleDriveService.driveService
|
||||
@ -121,13 +162,9 @@ class GoogleDriveSyncService(context: Context, json: Json, syncPreferences: Sync
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun pullSyncData(): SyncData? {
|
||||
val drive = googleDriveService.driveService
|
||||
|
||||
if (drive == null) {
|
||||
logcat(LogPriority.DEBUG) { "Google Drive service not initialized" }
|
||||
throw Exception(context.stringResource(SYMR.strings.google_drive_not_signed_in))
|
||||
}
|
||||
private fun pullSyncData(): SyncData? {
|
||||
val drive = googleDriveService.driveService ?:
|
||||
throw Exception(context.stringResource(SYMR.strings.google_drive_not_signed_in))
|
||||
|
||||
val fileList = getAppDataFileList(drive)
|
||||
if (fileList.isEmpty()) {
|
||||
@ -138,75 +175,53 @@ class GoogleDriveSyncService(context: Context, json: Json, syncPreferences: Sync
|
||||
val gdriveFileId = fileList[0].id
|
||||
logcat(LogPriority.DEBUG) { "Google Drive File ID: $gdriveFileId" }
|
||||
|
||||
val outputStream = ByteArrayOutputStream()
|
||||
try {
|
||||
drive.files().get(gdriveFileId).executeMediaAndDownloadTo(outputStream)
|
||||
logcat(LogPriority.DEBUG) { "File downloaded successfully" }
|
||||
drive.files().get(gdriveFileId).executeMediaAsInputStream().use { inputStream ->
|
||||
GZIPInputStream(inputStream).use { gzipInputStream ->
|
||||
return Json.decodeFromStream(SyncData.serializer(), gzipInputStream)
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
logcat(LogPriority.ERROR, throwable = e) { "Error downloading file" }
|
||||
return null
|
||||
}
|
||||
|
||||
return withIOContext {
|
||||
try {
|
||||
val gzipInputStream = GZIPInputStream(outputStream.toByteArray().inputStream())
|
||||
val jsonString = gzipInputStream.bufferedReader().use { it.readText() }
|
||||
val syncData = json.decodeFromString(SyncData.serializer(), jsonString)
|
||||
this@GoogleDriveSyncService.logcat(LogPriority.DEBUG) { "JSON deserialized successfully" }
|
||||
syncData
|
||||
} catch (e: Exception) {
|
||||
this@GoogleDriveSyncService.logcat(
|
||||
LogPriority.ERROR,
|
||||
throwable = e,
|
||||
) { "Failed to convert json to sync data with kotlinx.serialization" }
|
||||
throw Exception(e.message, e)
|
||||
}
|
||||
throw Exception("Failed to download sync data: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun pushSyncData(syncData: SyncData) {
|
||||
val jsonData = json.encodeToString(syncData)
|
||||
private suspend fun pushSyncData(syncData: SyncData) {
|
||||
val drive = googleDriveService.driveService
|
||||
?: throw Exception(context.stringResource(SYMR.strings.google_drive_not_signed_in))
|
||||
|
||||
val fileList = getAppDataFileList(drive)
|
||||
val byteArrayOutputStream = ByteArrayOutputStream()
|
||||
withIOContext {
|
||||
GZIPOutputStream(byteArrayOutputStream).use { gzipOutputStream ->
|
||||
gzipOutputStream.write(jsonData.toByteArray(Charsets.UTF_8))
|
||||
}
|
||||
this@GoogleDriveSyncService.logcat(LogPriority.DEBUG) { "JSON serialized successfully" }
|
||||
}
|
||||
|
||||
val byteArrayContent = ByteArrayContent("application/octet-stream", byteArrayOutputStream.toByteArray())
|
||||
PipedOutputStream().use { pos ->
|
||||
PipedInputStream(pos).use { pis ->
|
||||
withIOContext {
|
||||
// Start a coroutine or a background thread to write JSON to the PipedOutputStream
|
||||
launch {
|
||||
GZIPOutputStream(pos).use { gzipOutputStream ->
|
||||
Json.encodeToStream(SyncData.serializer(), syncData, gzipOutputStream)
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
if (fileList.isNotEmpty()) {
|
||||
// File exists, so update it
|
||||
val fileId = fileList[0].id
|
||||
drive.files().update(fileId, null, byteArrayContent).execute()
|
||||
logcat(LogPriority.DEBUG) { "Updated existing sync data file in Google Drive with file ID: $fileId" }
|
||||
} else {
|
||||
// File doesn't exist, so create it
|
||||
val fileMetadata = File().apply {
|
||||
name = remoteFileName
|
||||
mimeType = "application/gzip"
|
||||
parents = listOf("appDataFolder")
|
||||
if (fileList.isNotEmpty()) {
|
||||
val fileId = fileList[0].id
|
||||
val mediaContent = InputStreamContent("application/gzip", pis)
|
||||
drive.files().update(fileId, null, mediaContent).execute()
|
||||
logcat(LogPriority.DEBUG) { "Updated existing sync data file in Google Drive with file ID: $fileId" }
|
||||
} else {
|
||||
val fileMetadata = File().apply {
|
||||
name = remoteFileName
|
||||
mimeType = "application/gzip"
|
||||
parents = listOf("appDataFolder")
|
||||
}
|
||||
val mediaContent = InputStreamContent("application/gzip", pis)
|
||||
val uploadedFile = drive.files().create(fileMetadata, mediaContent)
|
||||
.setFields("id")
|
||||
.execute()
|
||||
logcat(LogPriority.DEBUG) { "Created new sync data file in Google Drive with file ID: ${uploadedFile.id}" }
|
||||
}
|
||||
}
|
||||
val uploadedFile = drive.files().create(fileMetadata, byteArrayContent)
|
||||
.setFields("id")
|
||||
.execute()
|
||||
|
||||
logcat(
|
||||
LogPriority.DEBUG,
|
||||
) { "Created new sync data file in Google Drive with file ID: ${uploadedFile.id}" }
|
||||
}
|
||||
|
||||
// Data has been successfully pushed or updated, delete the lock file
|
||||
deleteLockFile(drive)
|
||||
} catch (e: Exception) {
|
||||
logcat(LogPriority.ERROR, throwable = e) { "Failed to push or update sync data" }
|
||||
throw Exception(context.stringResource(SYMR.strings.error_uploading_sync_data) + ": ${e.message}", e)
|
||||
}
|
||||
}
|
||||
|
||||
@ -393,7 +408,6 @@ class GoogleDriveService(private val context: Context) {
|
||||
}
|
||||
internal suspend fun refreshToken() = withIOContext {
|
||||
val refreshToken = syncPreferences.googleDriveRefreshToken().get()
|
||||
val accessToken = syncPreferences.googleDriveAccessToken().get()
|
||||
|
||||
val jsonFactory: JsonFactory = JacksonFactory.getDefaultInstance()
|
||||
val secrets = GoogleClientSecrets.load(
|
||||
@ -413,16 +427,12 @@ class GoogleDriveService(private val context: Context) {
|
||||
|
||||
credential.refreshToken = refreshToken
|
||||
|
||||
this@GoogleDriveService.logcat(LogPriority.DEBUG) { "Refreshing access token with: $refreshToken" }
|
||||
|
||||
try {
|
||||
credential.refreshToken()
|
||||
val newAccessToken = credential.accessToken
|
||||
// Save the new access token
|
||||
syncPreferences.googleDriveAccessToken().set(newAccessToken)
|
||||
setupGoogleDriveService(newAccessToken, credential.refreshToken)
|
||||
this@GoogleDriveService
|
||||
.logcat(LogPriority.DEBUG) { "Google Access token refreshed old: $accessToken new: $newAccessToken" }
|
||||
} catch (e: TokenResponseException) {
|
||||
if (e.details.error == "invalid_grant") {
|
||||
// The refresh token is invalid, prompt the user to sign in again
|
||||
|
@ -17,6 +17,7 @@ import logcat.logcat
|
||||
|
||||
@Serializable
|
||||
data class SyncData(
|
||||
val deviceId: String = "",
|
||||
val backup: Backup? = null,
|
||||
)
|
||||
|
||||
@ -25,38 +26,7 @@ abstract class SyncService(
|
||||
val json: Json,
|
||||
val syncPreferences: SyncPreferences,
|
||||
) {
|
||||
open suspend fun doSync(syncData: SyncData): Backup? {
|
||||
beforeSync()
|
||||
|
||||
val remoteSData = pullSyncData()
|
||||
|
||||
val finalSyncData =
|
||||
if (remoteSData == null) {
|
||||
pushSyncData(syncData)
|
||||
syncData
|
||||
} else {
|
||||
val mergedSyncData = mergeSyncData(syncData, remoteSData)
|
||||
pushSyncData(mergedSyncData)
|
||||
mergedSyncData
|
||||
}
|
||||
|
||||
return finalSyncData.backup
|
||||
}
|
||||
|
||||
/**
|
||||
* For refreshing tokens and other possible operations before connecting to the remote storage
|
||||
*/
|
||||
open suspend fun beforeSync() {}
|
||||
|
||||
/**
|
||||
* Download sync data from the remote storage
|
||||
*/
|
||||
abstract suspend fun pullSyncData(): SyncData?
|
||||
|
||||
/**
|
||||
* Upload sync data to the remote storage
|
||||
*/
|
||||
abstract suspend fun pushSyncData(syncData: SyncData)
|
||||
abstract suspend fun doSync(syncData: SyncData): Backup?;
|
||||
|
||||
/**
|
||||
* Merges the local and remote sync data into a single JSON string.
|
||||
@ -65,11 +35,17 @@ abstract class SyncService(
|
||||
* @param remoteSyncData The SData containing the remote sync data.
|
||||
* @return The JSON string containing the merged sync data.
|
||||
*/
|
||||
private fun mergeSyncData(localSyncData: SyncData, remoteSyncData: SyncData): SyncData {
|
||||
val mergedMangaList = mergeMangaLists(localSyncData.backup?.backupManga, remoteSyncData.backup?.backupManga)
|
||||
protected fun mergeSyncData(localSyncData: SyncData, remoteSyncData: SyncData): SyncData {
|
||||
val mergedCategoriesList =
|
||||
mergeCategoriesLists(localSyncData.backup?.backupCategories, remoteSyncData.backup?.backupCategories)
|
||||
|
||||
val mergedMangaList = mergeMangaLists(
|
||||
localSyncData.backup?.backupManga,
|
||||
remoteSyncData.backup?.backupManga,
|
||||
localSyncData.backup?.backupCategories ?: emptyList(),
|
||||
remoteSyncData.backup?.backupCategories ?: emptyList(),
|
||||
mergedCategoriesList)
|
||||
|
||||
val mergedSourcesList =
|
||||
mergeSourcesLists(localSyncData.backup?.backupSources, remoteSyncData.backup?.backupSources)
|
||||
val mergedPreferencesList =
|
||||
@ -101,6 +77,7 @@ abstract class SyncService(
|
||||
|
||||
// Create the merged SData object
|
||||
return SyncData(
|
||||
deviceId = syncPreferences.uniqueDeviceID(),
|
||||
backup = mergedBackup,
|
||||
)
|
||||
}
|
||||
@ -117,6 +94,9 @@ abstract class SyncService(
|
||||
private fun mergeMangaLists(
|
||||
localMangaList: List<BackupManga>?,
|
||||
remoteMangaList: List<BackupManga>?,
|
||||
localCategories: List<BackupCategory>,
|
||||
remoteCategories: List<BackupCategory>,
|
||||
mergedCategories: List<BackupCategory>,
|
||||
): List<BackupManga> {
|
||||
val logTag = "MergeMangaLists"
|
||||
|
||||
@ -135,6 +115,18 @@ abstract class SyncService(
|
||||
val localMangaMap = localMangaListSafe.associateBy { mangaCompositeKey(it) }
|
||||
val remoteMangaMap = remoteMangaListSafe.associateBy { mangaCompositeKey(it) }
|
||||
|
||||
val localCategoriesMapByOrder = localCategories.associateBy { it.order }
|
||||
val remoteCategoriesMapByOrder = remoteCategories.associateBy { it.order }
|
||||
val mergedCategoriesMapByName = mergedCategories.associateBy { it.name }
|
||||
|
||||
fun updateCategories(theManga: BackupManga, theMap: Map<Long, BackupCategory>): BackupManga {
|
||||
return theManga.copy(categories = theManga.categories.mapNotNull {
|
||||
theMap[it]?.let { category ->
|
||||
mergedCategoriesMapByName[category.name]?.order
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
logcat(LogPriority.DEBUG, logTag) {
|
||||
"Starting merge. Local list size: ${localMangaListSafe.size}, Remote list size: ${remoteMangaListSafe.size}"
|
||||
}
|
||||
@ -145,20 +137,26 @@ abstract class SyncService(
|
||||
|
||||
// New version comparison logic
|
||||
when {
|
||||
local != null && remote == null -> local
|
||||
local == null && remote != null -> remote
|
||||
local != null && remote == null -> updateCategories(local, localCategoriesMapByOrder)
|
||||
local == null && remote != null -> updateCategories(remote, remoteCategoriesMapByOrder)
|
||||
local != null && remote != null -> {
|
||||
// Compare versions to decide which manga to keep
|
||||
if (local.version >= remote.version) {
|
||||
logcat(LogPriority.DEBUG, logTag) {
|
||||
"Keeping local version of ${local.title} with merged chapters."
|
||||
}
|
||||
local.copy(chapters = mergeChapters(local.chapters, remote.chapters))
|
||||
updateCategories(
|
||||
local.copy(chapters = mergeChapters(local.chapters, remote.chapters)),
|
||||
localCategoriesMapByOrder
|
||||
)
|
||||
} else {
|
||||
logcat(LogPriority.DEBUG, logTag) {
|
||||
"Keeping remote version of ${remote.title} with merged chapters."
|
||||
}
|
||||
remote.copy(chapters = mergeChapters(local.chapters, remote.chapters))
|
||||
updateCategories(
|
||||
remote.copy(chapters = mergeChapters(local.chapters, remote.chapters)),
|
||||
remoteCategoriesMapByOrder
|
||||
)
|
||||
}
|
||||
}
|
||||
else -> null // No manga found for key
|
||||
|
@ -2,23 +2,26 @@ package eu.kanade.tachiyomi.data.sync.service
|
||||
|
||||
import android.content.Context
|
||||
import eu.kanade.domain.sync.SyncPreferences
|
||||
import eu.kanade.tachiyomi.data.backup.models.Backup
|
||||
import eu.kanade.tachiyomi.data.backup.models.BackupSerializer
|
||||
import eu.kanade.tachiyomi.data.sync.SyncNotifier
|
||||
import eu.kanade.tachiyomi.network.GET
|
||||
import eu.kanade.tachiyomi.network.PATCH
|
||||
import eu.kanade.tachiyomi.network.POST
|
||||
import eu.kanade.tachiyomi.network.PUT
|
||||
import eu.kanade.tachiyomi.network.await
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.serialization.SerialName
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.SerializationException
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.protobuf.ProtoBuf
|
||||
import logcat.LogPriority
|
||||
import logcat.logcat
|
||||
import okhttp3.Headers
|
||||
import okhttp3.MediaType.Companion.toMediaTypeOrNull
|
||||
import okhttp3.MediaType.Companion.toMediaType
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.RequestBody.Companion.gzip
|
||||
import okhttp3.RequestBody.Companion.toRequestBody
|
||||
import tachiyomi.core.common.util.system.logcat
|
||||
import org.apache.http.HttpStatus
|
||||
import tachiyomi.core.common.i18n.stringResource
|
||||
import tachiyomi.i18n.MR
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class SyncYomiSyncService(
|
||||
@ -26,140 +29,117 @@ class SyncYomiSyncService(
|
||||
json: Json,
|
||||
syncPreferences: SyncPreferences,
|
||||
private val notifier: SyncNotifier,
|
||||
|
||||
private val protoBuf: ProtoBuf = Injekt.get(),
|
||||
) : SyncService(context, json, syncPreferences) {
|
||||
|
||||
@Serializable
|
||||
enum class SyncStatus {
|
||||
@SerialName("pending")
|
||||
Pending,
|
||||
private class SyncYomiException(message: String?) : Exception(message)
|
||||
|
||||
@SerialName("syncing")
|
||||
Syncing,
|
||||
override suspend fun doSync(syncData: SyncData): Backup? {
|
||||
try {
|
||||
val (remoteData, etag) = pullSyncData()
|
||||
|
||||
@SerialName("success")
|
||||
Success,
|
||||
}
|
||||
|
||||
@Serializable
|
||||
data class LockFile(
|
||||
@SerialName("id")
|
||||
val id: Int?,
|
||||
@SerialName("user_api_key")
|
||||
val userApiKey: String?,
|
||||
@SerialName("acquired_by")
|
||||
val acquiredBy: String?,
|
||||
@SerialName("last_synced")
|
||||
val lastSynced: String?,
|
||||
@SerialName("status")
|
||||
val status: SyncStatus,
|
||||
@SerialName("acquired_at")
|
||||
val acquiredAt: String?,
|
||||
@SerialName("expires_at")
|
||||
val expiresAt: String?,
|
||||
)
|
||||
|
||||
@Serializable
|
||||
data class LockfileCreateRequest(
|
||||
@SerialName("acquired_by")
|
||||
val acquiredBy: String,
|
||||
)
|
||||
|
||||
@Serializable
|
||||
data class LockfilePatchRequest(
|
||||
@SerialName("user_api_key")
|
||||
val userApiKey: String,
|
||||
@SerialName("acquired_by")
|
||||
val acquiredBy: String,
|
||||
)
|
||||
|
||||
override suspend fun beforeSync() {
|
||||
val host = syncPreferences.clientHost().get()
|
||||
val apiKey = syncPreferences.clientAPIKey().get()
|
||||
val lockFileApi = "$host/api/sync/lock"
|
||||
val deviceId = syncPreferences.uniqueDeviceID()
|
||||
val client = OkHttpClient()
|
||||
val headers = Headers.Builder().add("X-API-Token", apiKey).build()
|
||||
val json = Json { ignoreUnknownKeys = true }
|
||||
|
||||
val createLockfileRequest = LockfileCreateRequest(deviceId)
|
||||
val createLockfileJson = json.encodeToString(createLockfileRequest)
|
||||
|
||||
val patchRequest = LockfilePatchRequest(apiKey, deviceId)
|
||||
val patchJson = json.encodeToString(patchRequest)
|
||||
|
||||
val lockFileRequest = GET(
|
||||
url = lockFileApi,
|
||||
headers = headers,
|
||||
)
|
||||
|
||||
val lockFileCreate = POST(
|
||||
url = lockFileApi,
|
||||
headers = headers,
|
||||
body = createLockfileJson.toRequestBody("application/json; charset=utf-8".toMediaTypeOrNull()),
|
||||
)
|
||||
|
||||
val lockFileUpdate = PATCH(
|
||||
url = lockFileApi,
|
||||
headers = headers,
|
||||
body = patchJson.toRequestBody("application/json; charset=utf-8".toMediaTypeOrNull()),
|
||||
)
|
||||
|
||||
// create lock file first
|
||||
client.newCall(lockFileCreate).await()
|
||||
// update lock file acquired_by
|
||||
client.newCall(lockFileUpdate).await()
|
||||
|
||||
var backoff = 2000L // Start with 2 seconds
|
||||
val maxBackoff = 32000L // Maximum backoff time e.g., 32 seconds
|
||||
var lockFile: LockFile
|
||||
do {
|
||||
val response = client.newCall(lockFileRequest).await()
|
||||
val responseBody = response.body.string()
|
||||
lockFile = json.decodeFromString<LockFile>(responseBody)
|
||||
logcat(LogPriority.DEBUG) { "SyncYomi lock file status: ${lockFile.status}" }
|
||||
|
||||
if (lockFile.status != SyncStatus.Success) {
|
||||
logcat(LogPriority.DEBUG) { "Lock file not ready, retrying in $backoff ms..." }
|
||||
delay(backoff)
|
||||
backoff = (backoff * 2).coerceAtMost(maxBackoff)
|
||||
val finalSyncData = if (remoteData != null){
|
||||
assert(etag.isNotEmpty()) { "ETag should never be empty if remote data is not null" }
|
||||
logcat(LogPriority.DEBUG, "SyncService") {
|
||||
"Try update remote data with ETag($etag)"
|
||||
}
|
||||
mergeSyncData(syncData, remoteData)
|
||||
} else {
|
||||
// init or overwrite remote data
|
||||
logcat(LogPriority.DEBUG) {
|
||||
"Try overwrite remote data with ETag($etag)"
|
||||
}
|
||||
syncData
|
||||
}
|
||||
} while (lockFile.status != SyncStatus.Success)
|
||||
|
||||
// update lock file acquired_by
|
||||
client.newCall(lockFileUpdate).await()
|
||||
pushSyncData(finalSyncData, etag)
|
||||
return finalSyncData.backup
|
||||
|
||||
} catch (e: Exception) {
|
||||
logcat(LogPriority.ERROR) { "Error syncing: ${e.message}" }
|
||||
notifier.showSyncError(e.message)
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun pullSyncData(): SyncData? {
|
||||
private suspend fun pullSyncData(): Pair<SyncData?, String> {
|
||||
val host = syncPreferences.clientHost().get()
|
||||
val apiKey = syncPreferences.clientAPIKey().get()
|
||||
val downloadUrl = "$host/api/sync/download"
|
||||
val downloadUrl = "$host/api/sync/content"
|
||||
|
||||
val client = OkHttpClient()
|
||||
val headers = Headers.Builder().add("X-API-Token", apiKey).build()
|
||||
val headersBuilder = Headers.Builder().add("X-API-Token", apiKey)
|
||||
val lastETag = syncPreferences.lastSyncEtag().get()
|
||||
if (lastETag != "") {
|
||||
headersBuilder.add("If-None-Match", lastETag)
|
||||
}
|
||||
val headers = headersBuilder.build()
|
||||
|
||||
val downloadRequest = GET(
|
||||
url = downloadUrl,
|
||||
headers = headers,
|
||||
)
|
||||
|
||||
val client = OkHttpClient()
|
||||
val response = client.newCall(downloadRequest).await()
|
||||
val responseBody = response.body.string()
|
||||
|
||||
return if (response.isSuccessful) {
|
||||
json.decodeFromString<SyncData>(responseBody)
|
||||
if (response.code == HttpStatus.SC_NOT_MODIFIED) {
|
||||
// not modified
|
||||
assert(lastETag.isNotEmpty())
|
||||
logcat(LogPriority.INFO) {
|
||||
"Remote server not modified"
|
||||
}
|
||||
return Pair(null, lastETag)
|
||||
} else if (response.code == HttpStatus.SC_NOT_FOUND) {
|
||||
// maybe got deleted from remote
|
||||
return Pair(null, "")
|
||||
}
|
||||
|
||||
if (response.isSuccessful) {
|
||||
val newETag = response.headers["ETag"]
|
||||
.takeIf { it?.isNotEmpty() == true } ?: throw SyncYomiException("Missing ETag")
|
||||
|
||||
val byteArray = response.body.byteStream().use {
|
||||
return@use it.readBytes()
|
||||
}
|
||||
|
||||
return try {
|
||||
val backup = protoBuf.decodeFromByteArray(BackupSerializer, byteArray)
|
||||
return Pair(SyncData(backup = backup), newETag)
|
||||
} catch (_: SerializationException) {
|
||||
logcat(LogPriority.INFO) {
|
||||
"Bad content responsed from server"
|
||||
}
|
||||
// the body is invalid
|
||||
// return default value so we can overwrite it
|
||||
Pair(null, "")
|
||||
}
|
||||
|
||||
} else {
|
||||
val responseBody = response.body.string()
|
||||
notifier.showSyncError("Failed to download sync data: $responseBody")
|
||||
responseBody.let { logcat(LogPriority.ERROR) { "SyncError:$it" } }
|
||||
null
|
||||
logcat(LogPriority.ERROR) { "SyncError: $responseBody" }
|
||||
throw SyncYomiException("Failed to download sync data: $responseBody")
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun pushSyncData(syncData: SyncData) {
|
||||
/**
|
||||
* Return true if update success
|
||||
*/
|
||||
private suspend fun pushSyncData(syncData: SyncData, eTag: String) {
|
||||
val backup = syncData.backup ?: return
|
||||
|
||||
val host = syncPreferences.clientHost().get()
|
||||
val apiKey = syncPreferences.clientAPIKey().get()
|
||||
val uploadUrl = "$host/api/sync/upload"
|
||||
val uploadUrl = "$host/api/sync/content"
|
||||
val timeout = 30L
|
||||
|
||||
val headersBuilder = Headers.Builder().add("X-API-Token", apiKey)
|
||||
if (eTag.isNotEmpty()) {
|
||||
headersBuilder.add("If-Match", eTag)
|
||||
}
|
||||
val headers = headersBuilder.build()
|
||||
|
||||
// Set timeout to 30 seconds
|
||||
val client = OkHttpClient.Builder()
|
||||
.connectTimeout(timeout, TimeUnit.SECONDS)
|
||||
@ -167,32 +147,34 @@ class SyncYomiSyncService(
|
||||
.writeTimeout(timeout, TimeUnit.SECONDS)
|
||||
.build()
|
||||
|
||||
val headers = Headers.Builder().add(
|
||||
"Content-Type",
|
||||
"application/gzip",
|
||||
).add("Content-Encoding", "gzip").add("X-API-Token", apiKey).build()
|
||||
val byteArray = protoBuf.encodeToByteArray(BackupSerializer, backup)
|
||||
if (byteArray.isEmpty()) {
|
||||
throw IllegalStateException(context.stringResource(MR.strings.empty_backup_error))
|
||||
}
|
||||
val body = byteArray.toRequestBody("application/octet-stream".toMediaType())
|
||||
|
||||
val mediaType = "application/gzip".toMediaTypeOrNull()
|
||||
|
||||
val jsonData = json.encodeToString(syncData)
|
||||
val body = jsonData.toRequestBody(mediaType).gzip()
|
||||
|
||||
val uploadRequest = POST(
|
||||
val uploadRequest = PUT(
|
||||
url = uploadUrl,
|
||||
headers = headers,
|
||||
body = body,
|
||||
)
|
||||
|
||||
client.newCall(uploadRequest).await().use {
|
||||
if (it.isSuccessful) {
|
||||
logcat(
|
||||
LogPriority.DEBUG,
|
||||
) { "SyncYomi sync completed!" }
|
||||
} else {
|
||||
val responseBody = it.body.string()
|
||||
notifier.showSyncError("Failed to upload sync data: $responseBody")
|
||||
responseBody.let { logcat(LogPriority.ERROR) { "SyncError:$it" } }
|
||||
}
|
||||
val response = client.newCall(uploadRequest).await()
|
||||
|
||||
if (response.isSuccessful) {
|
||||
val newETag = response.headers["ETag"]
|
||||
.takeIf { it?.isNotEmpty() == true } ?: throw SyncYomiException("Missing ETag")
|
||||
syncPreferences.lastSyncEtag().set(newETag)
|
||||
logcat(LogPriority.DEBUG) { "SyncYomi sync completed" }
|
||||
|
||||
} else if (response.code == HttpStatus.SC_PRECONDITION_FAILED) {
|
||||
// other clients updated remote data, will try next time
|
||||
logcat(LogPriority.DEBUG) { "SyncYomi sync failed with 412" }
|
||||
|
||||
} else {
|
||||
val responseBody = response.body.string()
|
||||
notifier.showSyncError("Failed to upload sync data: $responseBody")
|
||||
logcat(LogPriority.ERROR) { "SyncError: $responseBody" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user