mirror of
https://github.com/bbara04/Purefin.git
synced 2026-03-31 17:10:08 +02:00
refactor: Fix repository switch between offline and online. Now it switches seamlessly
This commit is contained in:
@@ -1,19 +1,13 @@
|
||||
package hu.bbara.purefin.core.data
|
||||
|
||||
import hu.bbara.purefin.core.model.Episode
|
||||
import hu.bbara.purefin.core.model.Library
|
||||
import hu.bbara.purefin.core.model.Media
|
||||
import hu.bbara.purefin.core.model.MediaRepositoryState
|
||||
import hu.bbara.purefin.core.model.Movie
|
||||
import hu.bbara.purefin.core.model.Series
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import java.util.UUID
|
||||
|
||||
interface AppContentRepository : MediaRepository {
|
||||
|
||||
val libraries: StateFlow<List<Library>>
|
||||
val state: StateFlow<MediaRepositoryState>
|
||||
val continueWatching: StateFlow<List<Media>>
|
||||
val nextUp: StateFlow<List<Media>>
|
||||
val latestLibraryContent: StateFlow<Map<UUID, List<Media>>>
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package hu.bbara.purefin.core.data
|
||||
|
||||
import android.util.Log
|
||||
import hu.bbara.purefin.core.data.session.UserSessionRepository
|
||||
import hu.bbara.purefin.core.model.Episode
|
||||
import hu.bbara.purefin.core.model.Movie
|
||||
@@ -11,7 +12,9 @@ import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.flatMapLatest
|
||||
import kotlinx.coroutines.flow.flowOf
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import java.util.UUID
|
||||
import javax.inject.Inject
|
||||
@@ -28,14 +31,24 @@ import javax.inject.Singleton
|
||||
class CompositeMediaRepository @Inject constructor(
|
||||
private val offlineRepository: OfflineMediaRepository,
|
||||
private val onlineRepository: InMemoryMediaRepository,
|
||||
private val userSessionRepository: UserSessionRepository,
|
||||
private val networkMonitor: NetworkMonitor,
|
||||
) : MediaRepository {
|
||||
|
||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
||||
|
||||
override val ready: StateFlow<Boolean> = combine(
|
||||
offlineRepository.ready,
|
||||
onlineRepository.ready
|
||||
) { offlineReady, onlineReady ->
|
||||
offlineReady && onlineReady
|
||||
}.stateIn(scope, SharingStarted.Eagerly, false)
|
||||
|
||||
private val isOnline: StateFlow<Boolean> = networkMonitor.isOnline
|
||||
.stateIn(scope, SharingStarted.Eagerly, false)
|
||||
|
||||
private val activeRepository: Flow<MediaRepository> =
|
||||
userSessionRepository.isOfflineMode.flatMapLatest { offline ->
|
||||
kotlinx.coroutines.flow.flowOf(if (offline) offlineRepository else onlineRepository)
|
||||
networkMonitor.isOnline.flatMapLatest { online ->
|
||||
flowOf(if (online) onlineRepository else offlineRepository)
|
||||
}
|
||||
|
||||
override val movies: StateFlow<Map<UUID, Movie>> = activeRepository
|
||||
@@ -50,13 +63,42 @@ class CompositeMediaRepository @Inject constructor(
|
||||
.flatMapLatest { it.episodes }
|
||||
.stateIn(scope, SharingStarted.Eagerly, emptyMap())
|
||||
|
||||
override fun upsertMovies(movies: List<Movie>) {
|
||||
if (!isOnline.value) {
|
||||
Log.e("CompositeMediaRepository", "upsertMovies called in offline mode")
|
||||
return
|
||||
}
|
||||
onlineRepository.upsertMovies(movies)
|
||||
}
|
||||
|
||||
override fun upsertSeries(series: List<Series>) {
|
||||
if (!isOnline.value) {
|
||||
Log.e("CompositeMediaRepository", "upsertSeries called in offline mode")
|
||||
return
|
||||
}
|
||||
onlineRepository.upsertSeries(series)
|
||||
}
|
||||
|
||||
override fun upsertEpisodes(episodes: List<Episode>) {
|
||||
if (!isOnline.value) {
|
||||
Log.e("CompositeMediaRepository", "upsertEpisodes called in offline mode")
|
||||
return
|
||||
}
|
||||
onlineRepository.upsertEpisodes(episodes)
|
||||
}
|
||||
|
||||
override fun observeSeriesWithContent(seriesId: UUID): Flow<Series?> {
|
||||
return activeRepository.flatMapLatest { it.observeSeriesWithContent(seriesId) }
|
||||
}
|
||||
|
||||
override suspend fun updateWatchProgress(mediaId: UUID, positionMs: Long, durationMs: Long) {
|
||||
val isOffline = userSessionRepository.isOfflineMode.stateIn(scope).value
|
||||
val repo = if (isOffline) offlineRepository else onlineRepository
|
||||
val isOnline = networkMonitor.isOnline.stateIn(scope).value
|
||||
val repo = if (isOnline) onlineRepository else offlineRepository
|
||||
repo.updateWatchProgress(mediaId, positionMs, durationMs)
|
||||
}
|
||||
|
||||
override fun setReady() {
|
||||
onlineRepository.setReady()
|
||||
offlineRepository.setReady()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,20 +9,22 @@ import hu.bbara.purefin.core.data.session.UserSessionRepository
|
||||
import hu.bbara.purefin.core.model.Episode
|
||||
import hu.bbara.purefin.core.model.Library
|
||||
import hu.bbara.purefin.core.model.Media
|
||||
import hu.bbara.purefin.core.model.MediaRepositoryState
|
||||
import hu.bbara.purefin.core.model.Movie
|
||||
import hu.bbara.purefin.core.model.Season
|
||||
import hu.bbara.purefin.core.model.Series
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.update
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import org.jellyfin.sdk.model.api.BaseItemDto
|
||||
import org.jellyfin.sdk.model.api.BaseItemKind
|
||||
import org.jellyfin.sdk.model.api.CollectionType
|
||||
@@ -40,49 +42,50 @@ class InMemoryAppContentRepository @Inject constructor(
|
||||
val userSessionRepository: UserSessionRepository,
|
||||
val jellyfinApiClient: JellyfinApiClient,
|
||||
private val homeCacheDataStore: DataStore<HomeCache>,
|
||||
private val mediaRepository: InMemoryMediaRepository,
|
||||
private val mediaRepository: CompositeMediaRepository,
|
||||
private val networkMonitor: NetworkMonitor,
|
||||
) : AppContentRepository, MediaRepository by mediaRepository {
|
||||
|
||||
private val readyMutex = Mutex()
|
||||
) : AppContentRepository {
|
||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
||||
private var initialLoadTimestamp = 0L
|
||||
private var loadJob: Job? = null
|
||||
|
||||
private val _state: MutableStateFlow<MediaRepositoryState> = MutableStateFlow(MediaRepositoryState.Loading)
|
||||
override val state: StateFlow<MediaRepositoryState> = _state.asStateFlow()
|
||||
private val contentRepositoryReady : MutableStateFlow<Boolean> = MutableStateFlow(false)
|
||||
override val ready: StateFlow<Boolean> = combine(contentRepositoryReady, mediaRepository.ready) {
|
||||
contentRepositoryReady, mediaRepositoryReady ->
|
||||
contentRepositoryReady && mediaRepositoryReady
|
||||
}.stateIn(scope, SharingStarted.Eagerly, false)
|
||||
|
||||
private val _libraries: MutableStateFlow<List<Library>> = MutableStateFlow(emptyList())
|
||||
|
||||
override val libraries: StateFlow<List<Library>> = _libraries.asStateFlow()
|
||||
|
||||
private val _continueWatching: MutableStateFlow<List<Media>> = MutableStateFlow(emptyList())
|
||||
|
||||
override val continueWatching: StateFlow<List<Media>> = _continueWatching.asStateFlow()
|
||||
|
||||
private val _nextUp: MutableStateFlow<List<Media>> = MutableStateFlow(emptyList())
|
||||
override val nextUp: StateFlow<List<Media>> = _nextUp.asStateFlow()
|
||||
|
||||
override val nextUp: StateFlow<List<Media>> = _nextUp.asStateFlow()
|
||||
private val _latestLibraryContent: MutableStateFlow<Map<UUID, List<Media>>> = MutableStateFlow(emptyMap())
|
||||
override val latestLibraryContent: StateFlow<Map<UUID, List<Media>>> = _latestLibraryContent.asStateFlow()
|
||||
|
||||
override val movies: StateFlow<Map<UUID, Movie>> = mediaRepository.movies
|
||||
override val series: StateFlow<Map<UUID, Series>> = mediaRepository.series
|
||||
override val episodes: StateFlow<Map<UUID, Episode>> = mediaRepository.episodes
|
||||
|
||||
override fun observeSeriesWithContent(seriesId: UUID): Flow<Series?> {
|
||||
return mediaRepository.observeSeriesWithContent(seriesId)
|
||||
}
|
||||
|
||||
override suspend fun updateWatchProgress(
|
||||
mediaId: UUID,
|
||||
positionMs: Long,
|
||||
durationMs: Long
|
||||
) {
|
||||
mediaRepository.updateWatchProgress(mediaId, positionMs, durationMs)
|
||||
}
|
||||
|
||||
init {
|
||||
scope.launch {
|
||||
loadFromCache()
|
||||
networkMonitor.isOnline.collect { isOnline ->
|
||||
userSessionRepository.setOfflineMode(!isOnline)
|
||||
if (isOnline) {
|
||||
if (mediaRepository.ready.isCompleted) {
|
||||
// Reset so ensureReady() performs a fresh network load
|
||||
mediaRepository.reset()
|
||||
_state.value = MediaRepositoryState.Loading
|
||||
}
|
||||
runCatching { ensureReady() }
|
||||
} else {
|
||||
// Going offline – complete ready with cached data so waiters don't hang
|
||||
if (!mediaRepository.ready.isCompleted) {
|
||||
_state.value = MediaRepositoryState.Ready
|
||||
mediaRepository.signalReady()
|
||||
}
|
||||
}
|
||||
}
|
||||
ensureReady()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,48 +136,25 @@ class InMemoryAppContentRepository @Inject constructor(
|
||||
}
|
||||
|
||||
override suspend fun ensureReady() {
|
||||
val isOffline = userSessionRepository.isOfflineMode.first()
|
||||
if (isOffline) {
|
||||
// Offline mode: use cached data without network calls
|
||||
val ready = mediaRepository.ready
|
||||
if (!ready.isCompleted) {
|
||||
_state.value = MediaRepositoryState.Ready
|
||||
mediaRepository.signalReady()
|
||||
}
|
||||
mediaRepository.ready.await()
|
||||
// check for combined ready state
|
||||
if (ready.value) {
|
||||
return
|
||||
}
|
||||
|
||||
val ready = mediaRepository.ready
|
||||
if (ready.isCompleted) {
|
||||
ready.await()
|
||||
if (loadJob?.isActive == true) {
|
||||
return
|
||||
}
|
||||
|
||||
// Only the first caller runs the loading logic; others wait on the deferred.
|
||||
if (readyMutex.tryLock()) {
|
||||
try {
|
||||
if (mediaRepository.ready.isCompleted) {
|
||||
mediaRepository.ready.await()
|
||||
if (!contentRepositoryReady.value) {
|
||||
return
|
||||
}
|
||||
loadLibraries()
|
||||
contentRepositoryReady.value = true
|
||||
loadJob?.cancel()
|
||||
loadJob = scope.launch {
|
||||
loadContinueWatching()
|
||||
loadNextUp()
|
||||
loadLatestLibraryContent()
|
||||
loadLibraries()
|
||||
mediaRepository.setReady()
|
||||
persistHomeCache()
|
||||
_state.value = MediaRepositoryState.Ready
|
||||
initialLoadTimestamp = System.currentTimeMillis()
|
||||
mediaRepository.signalReady()
|
||||
} catch (t: Throwable) {
|
||||
_state.value = MediaRepositoryState.Error(t)
|
||||
mediaRepository.signalError(t)
|
||||
throw t
|
||||
} finally {
|
||||
readyMutex.unlock()
|
||||
}
|
||||
} else {
|
||||
mediaRepository.ready.await()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,10 +172,10 @@ class InMemoryAppContentRepository @Inject constructor(
|
||||
_libraries.value = filledLibraries
|
||||
|
||||
val movies = filledLibraries.filter { it.type == CollectionType.MOVIES }.flatMap { it.movies.orEmpty() }
|
||||
mediaRepository._movies.update { current -> current + movies.associateBy { it.id } }
|
||||
mediaRepository.upsertMovies(movies)
|
||||
|
||||
val series = filledLibraries.filter { it.type == CollectionType.TVSHOWS }.flatMap { it.series.orEmpty() }
|
||||
mediaRepository._series.update { current -> current + series.associateBy { it.id } }
|
||||
mediaRepository.upsertSeries(series)
|
||||
}
|
||||
|
||||
suspend fun loadLibrary(library: Library): Library {
|
||||
@@ -217,7 +197,7 @@ class InMemoryAppContentRepository @Inject constructor(
|
||||
val movieItem = jellyfinApiClient.getItemInfo(movieId)
|
||||
?: throw RuntimeException("Movie not found")
|
||||
val updatedMovie = movieItem.toMovie(serverUrl(), movieItem.parentId!!)
|
||||
mediaRepository._movies.update { it + (updatedMovie.id to updatedMovie) }
|
||||
mediaRepository.upsertMovies(listOf(updatedMovie))
|
||||
return updatedMovie
|
||||
}
|
||||
|
||||
@@ -225,7 +205,7 @@ class InMemoryAppContentRepository @Inject constructor(
|
||||
val seriesItem = jellyfinApiClient.getItemInfo(seriesId)
|
||||
?: throw RuntimeException("Series not found")
|
||||
val updatedSeries = seriesItem.toSeries(serverUrl(), seriesItem.parentId!!)
|
||||
mediaRepository._series.update { it + (updatedSeries.id to updatedSeries) }
|
||||
mediaRepository.upsertSeries(listOf(updatedSeries))
|
||||
return updatedSeries
|
||||
}
|
||||
|
||||
@@ -248,7 +228,7 @@ class InMemoryAppContentRepository @Inject constructor(
|
||||
when (item.type) {
|
||||
BaseItemKind.EPISODE -> {
|
||||
val episode = item.toEpisode(serverUrl())
|
||||
mediaRepository._episodes.update { it + (episode.id to episode) }
|
||||
mediaRepository.upsertEpisodes(listOf(episode))
|
||||
}
|
||||
else -> { /* Do nothing */ }
|
||||
}
|
||||
@@ -268,7 +248,7 @@ class InMemoryAppContentRepository @Inject constructor(
|
||||
// Load episodes
|
||||
nextUpItems.forEach { item ->
|
||||
val episode = item.toEpisode(serverUrl())
|
||||
mediaRepository._episodes.update { it + (episode.id to episode) }
|
||||
mediaRepository.upsertEpisodes(listOf(episode))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -312,25 +292,20 @@ class InMemoryAppContentRepository @Inject constructor(
|
||||
//TODO Load seasons and episodes, other types are already loaded at this point.
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val REFRESH_MIN_INTERVAL_MS = 30_000L
|
||||
}
|
||||
|
||||
override suspend fun refreshHomeData() {
|
||||
val isOffline = userSessionRepository.isOfflineMode.first()
|
||||
if (isOffline) return
|
||||
|
||||
mediaRepository.ready.await()
|
||||
// Skip refresh if the initial load (or last refresh) just happened
|
||||
val elapsed = System.currentTimeMillis() - initialLoadTimestamp
|
||||
if (elapsed < REFRESH_MIN_INTERVAL_MS) return
|
||||
val isOnline = networkMonitor.isOnline.first()
|
||||
if (!isOnline) return
|
||||
|
||||
if(loadJob?.isActive == true) {
|
||||
return
|
||||
}
|
||||
loadJob = scope.launch {
|
||||
loadLibraries()
|
||||
loadContinueWatching()
|
||||
loadNextUp()
|
||||
loadLatestLibraryContent()
|
||||
persistHomeCache()
|
||||
initialLoadTimestamp = System.currentTimeMillis()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun serverUrl(): String {
|
||||
|
||||
@@ -7,7 +7,6 @@ import hu.bbara.purefin.core.model.Episode
|
||||
import hu.bbara.purefin.core.model.Movie
|
||||
import hu.bbara.purefin.core.model.Season
|
||||
import hu.bbara.purefin.core.model.Series
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
@@ -36,11 +35,8 @@ class InMemoryMediaRepository @Inject constructor(
|
||||
) : MediaRepository {
|
||||
|
||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
||||
@Volatile internal var ready = CompletableDeferred<Unit>()
|
||||
|
||||
internal fun reset() {
|
||||
ready = CompletableDeferred()
|
||||
}
|
||||
override val ready: MutableStateFlow<Boolean> = MutableStateFlow(false)
|
||||
|
||||
internal val _movies: MutableStateFlow<Map<UUID, Movie>> = MutableStateFlow(emptyMap())
|
||||
override val movies: StateFlow<Map<UUID, Movie>> = _movies.asStateFlow()
|
||||
@@ -50,27 +46,29 @@ class InMemoryMediaRepository @Inject constructor(
|
||||
|
||||
internal val _episodes: MutableStateFlow<Map<UUID, Episode>> = MutableStateFlow(emptyMap())
|
||||
override val episodes: StateFlow<Map<UUID, Episode>> = _episodes.asStateFlow()
|
||||
|
||||
internal fun signalReady() {
|
||||
ready.complete(Unit)
|
||||
override fun upsertMovies(movies: List<Movie>) {
|
||||
_movies.update { current -> current + movies.associateBy { it.id } }
|
||||
}
|
||||
|
||||
internal fun signalError(t: Throwable) {
|
||||
ready.completeExceptionally(t)
|
||||
override fun upsertSeries(series: List<Series>) {
|
||||
_series.update { current -> current + series.associateBy { it.id } }
|
||||
}
|
||||
|
||||
private suspend fun awaitReady() {
|
||||
ready.await()
|
||||
override fun upsertEpisodes(episodes: List<Episode>) {
|
||||
_episodes.update { current -> current + episodes.associateBy { it.id } }
|
||||
}
|
||||
|
||||
override fun observeSeriesWithContent(seriesId: UUID): Flow<Series?> {
|
||||
scope.launch {
|
||||
awaitReady()
|
||||
ensureSeriesContentLoaded(seriesId)
|
||||
}
|
||||
return _series.map { it[seriesId] }
|
||||
}
|
||||
|
||||
override fun setReady() {
|
||||
ready.value = true
|
||||
}
|
||||
|
||||
private suspend fun ensureSeriesContentLoaded(seriesId: UUID) {
|
||||
_series.value[seriesId]?.takeIf { it.seasons.isNotEmpty() }?.let { return }
|
||||
|
||||
|
||||
@@ -8,9 +8,18 @@ import kotlinx.coroutines.flow.StateFlow
|
||||
import java.util.UUID
|
||||
|
||||
interface MediaRepository {
|
||||
val ready: StateFlow<Boolean>
|
||||
val movies: StateFlow<Map<UUID, Movie>>
|
||||
val series: StateFlow<Map<UUID, Series>>
|
||||
val episodes: StateFlow<Map<UUID, Episode>>
|
||||
fun upsertMovies(movies: List<Movie>) {
|
||||
}
|
||||
fun upsertSeries(series: List<Series>) {
|
||||
}
|
||||
fun upsertEpisodes(episodes: List<Episode>) {
|
||||
}
|
||||
fun observeSeriesWithContent(seriesId: UUID): Flow<Series?>
|
||||
fun setReady() {
|
||||
}
|
||||
suspend fun updateWatchProgress(mediaId: UUID, positionMs: Long, durationMs: Long)
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
@@ -21,8 +22,10 @@ import javax.inject.Singleton
|
||||
*/
|
||||
@Singleton
|
||||
class OfflineMediaRepository @Inject constructor(
|
||||
private val localDataSource: OfflineRoomMediaLocalDataSource
|
||||
private val localDataSource: OfflineRoomMediaLocalDataSource,
|
||||
) : MediaRepository {
|
||||
override val ready: StateFlow<Boolean> = MutableStateFlow(false)
|
||||
|
||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
||||
|
||||
override val movies: StateFlow<Map<UUID, Movie>> = localDataSource.moviesFlow
|
||||
@@ -34,6 +37,18 @@ class OfflineMediaRepository @Inject constructor(
|
||||
override val episodes: StateFlow<Map<UUID, Episode>> = localDataSource.episodesFlow
|
||||
.stateIn(scope, SharingStarted.Eagerly, emptyMap())
|
||||
|
||||
override fun upsertMovies(movies: List<Movie>) {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override fun upsertSeries(series: List<Series>) {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override fun upsertEpisodes(episodes: List<Episode>) {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override fun observeSeriesWithContent(seriesId: UUID): Flow<Series?> {
|
||||
return localDataSource.observeSeriesWithContent(seriesId)
|
||||
}
|
||||
@@ -46,4 +61,8 @@ class OfflineMediaRepository @Inject constructor(
|
||||
localDataSource.updateWatchProgress(mediaId, progressPercent, watched)
|
||||
}
|
||||
|
||||
override fun setReady() {
|
||||
// OfflineMediaRepository works from the database. So it is ready immediately.
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user