diff --git a/core/data/src/main/java/hu/bbara/purefin/core/data/AppContentRepository.kt b/core/data/src/main/java/hu/bbara/purefin/core/data/AppContentRepository.kt index 7113930..2065dba 100644 --- a/core/data/src/main/java/hu/bbara/purefin/core/data/AppContentRepository.kt +++ b/core/data/src/main/java/hu/bbara/purefin/core/data/AppContentRepository.kt @@ -1,19 +1,13 @@ package hu.bbara.purefin.core.data -import hu.bbara.purefin.core.model.Episode import hu.bbara.purefin.core.model.Library import hu.bbara.purefin.core.model.Media -import hu.bbara.purefin.core.model.MediaRepositoryState -import hu.bbara.purefin.core.model.Movie -import hu.bbara.purefin.core.model.Series -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.StateFlow import java.util.UUID interface AppContentRepository : MediaRepository { val libraries: StateFlow> - val state: StateFlow val continueWatching: StateFlow> val nextUp: StateFlow> val latestLibraryContent: StateFlow>> diff --git a/core/data/src/main/java/hu/bbara/purefin/core/data/CompositeMediaRepository.kt b/core/data/src/main/java/hu/bbara/purefin/core/data/CompositeMediaRepository.kt index 5388800..0b83405 100644 --- a/core/data/src/main/java/hu/bbara/purefin/core/data/CompositeMediaRepository.kt +++ b/core/data/src/main/java/hu/bbara/purefin/core/data/CompositeMediaRepository.kt @@ -1,5 +1,6 @@ package hu.bbara.purefin.core.data +import android.util.Log import hu.bbara.purefin.core.data.session.UserSessionRepository import hu.bbara.purefin.core.model.Episode import hu.bbara.purefin.core.model.Movie @@ -11,7 +12,9 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.stateIn import java.util.UUID import javax.inject.Inject @@ -28,14 +31,24 @@ import javax.inject.Singleton class CompositeMediaRepository @Inject constructor( private val offlineRepository: OfflineMediaRepository, private val onlineRepository: InMemoryMediaRepository, - private val userSessionRepository: UserSessionRepository, + private val networkMonitor: NetworkMonitor, ) : MediaRepository { private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + override val ready: StateFlow = combine( + offlineRepository.ready, + onlineRepository.ready + ) { offlineReady, onlineReady -> + offlineReady && onlineReady + }.stateIn(scope, SharingStarted.Eagerly, false) + + private val isOnline: StateFlow = networkMonitor.isOnline + .stateIn(scope, SharingStarted.Eagerly, false) + private val activeRepository: Flow = - userSessionRepository.isOfflineMode.flatMapLatest { offline -> - kotlinx.coroutines.flow.flowOf(if (offline) offlineRepository else onlineRepository) + networkMonitor.isOnline.flatMapLatest { online -> + flowOf(if (online) onlineRepository else offlineRepository) } override val movies: StateFlow> = activeRepository @@ -50,13 +63,42 @@ class CompositeMediaRepository @Inject constructor( .flatMapLatest { it.episodes } .stateIn(scope, SharingStarted.Eagerly, emptyMap()) + override fun upsertMovies(movies: List) { + if (!isOnline.value) { + Log.e("CompositeMediaRepository", "upsertMovies called in offline mode") + return + } + onlineRepository.upsertMovies(movies) + } + + override fun upsertSeries(series: List) { + if (!isOnline.value) { + Log.e("CompositeMediaRepository", "upsertSeries called in offline mode") + return + } + onlineRepository.upsertSeries(series) + } + + override fun upsertEpisodes(episodes: List) { + if (!isOnline.value) { + Log.e("CompositeMediaRepository", "upsertEpisodes called in offline mode") + return + } + onlineRepository.upsertEpisodes(episodes) + } + override fun observeSeriesWithContent(seriesId: UUID): Flow { return activeRepository.flatMapLatest { it.observeSeriesWithContent(seriesId) } } override suspend fun updateWatchProgress(mediaId: UUID, positionMs: Long, durationMs: Long) { - val isOffline = userSessionRepository.isOfflineMode.stateIn(scope).value - val repo = if (isOffline) offlineRepository else onlineRepository + val isOnline = networkMonitor.isOnline.stateIn(scope).value + val repo = if (isOnline) onlineRepository else offlineRepository repo.updateWatchProgress(mediaId, positionMs, durationMs) } + + override fun setReady() { + onlineRepository.setReady() + offlineRepository.setReady() + } } diff --git a/core/data/src/main/java/hu/bbara/purefin/core/data/InMemoryAppContentRepository.kt b/core/data/src/main/java/hu/bbara/purefin/core/data/InMemoryAppContentRepository.kt index d830504..07f6b76 100644 --- a/core/data/src/main/java/hu/bbara/purefin/core/data/InMemoryAppContentRepository.kt +++ b/core/data/src/main/java/hu/bbara/purefin/core/data/InMemoryAppContentRepository.kt @@ -9,20 +9,22 @@ import hu.bbara.purefin.core.data.session.UserSessionRepository import hu.bbara.purefin.core.model.Episode import hu.bbara.purefin.core.model.Library import hu.bbara.purefin.core.model.Media -import hu.bbara.purefin.core.model.MediaRepositoryState import hu.bbara.purefin.core.model.Movie import hu.bbara.purefin.core.model.Season import hu.bbara.purefin.core.model.Series import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.update +import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex import org.jellyfin.sdk.model.api.BaseItemDto import org.jellyfin.sdk.model.api.BaseItemKind import org.jellyfin.sdk.model.api.CollectionType @@ -40,49 +42,50 @@ class InMemoryAppContentRepository @Inject constructor( val userSessionRepository: UserSessionRepository, val jellyfinApiClient: JellyfinApiClient, private val homeCacheDataStore: DataStore, - private val mediaRepository: InMemoryMediaRepository, + private val mediaRepository: CompositeMediaRepository, private val networkMonitor: NetworkMonitor, -) : AppContentRepository, MediaRepository by mediaRepository { - - private val readyMutex = Mutex() +) : AppContentRepository { private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) - private var initialLoadTimestamp = 0L + private var loadJob: Job? = null - private val _state: MutableStateFlow = MutableStateFlow(MediaRepositoryState.Loading) - override val state: StateFlow = _state.asStateFlow() + private val contentRepositoryReady : MutableStateFlow = MutableStateFlow(false) + override val ready: StateFlow = combine(contentRepositoryReady, mediaRepository.ready) { + contentRepositoryReady, mediaRepositoryReady -> + contentRepositoryReady && mediaRepositoryReady + }.stateIn(scope, SharingStarted.Eagerly, false) private val _libraries: MutableStateFlow> = MutableStateFlow(emptyList()) + override val libraries: StateFlow> = _libraries.asStateFlow() - private val _continueWatching: MutableStateFlow> = MutableStateFlow(emptyList()) + override val continueWatching: StateFlow> = _continueWatching.asStateFlow() - private val _nextUp: MutableStateFlow> = MutableStateFlow(emptyList()) - override val nextUp: StateFlow> = _nextUp.asStateFlow() + override val nextUp: StateFlow> = _nextUp.asStateFlow() private val _latestLibraryContent: MutableStateFlow>> = MutableStateFlow(emptyMap()) override val latestLibraryContent: StateFlow>> = _latestLibraryContent.asStateFlow() + override val movies: StateFlow> = mediaRepository.movies + override val series: StateFlow> = mediaRepository.series + override val episodes: StateFlow> = mediaRepository.episodes + + override fun observeSeriesWithContent(seriesId: UUID): Flow { + return mediaRepository.observeSeriesWithContent(seriesId) + } + + override suspend fun updateWatchProgress( + mediaId: UUID, + positionMs: Long, + durationMs: Long + ) { + mediaRepository.updateWatchProgress(mediaId, positionMs, durationMs) + } + init { scope.launch { loadFromCache() - networkMonitor.isOnline.collect { isOnline -> - userSessionRepository.setOfflineMode(!isOnline) - if (isOnline) { - if (mediaRepository.ready.isCompleted) { - // Reset so ensureReady() performs a fresh network load - mediaRepository.reset() - _state.value = MediaRepositoryState.Loading - } - runCatching { ensureReady() } - } else { - // Going offline – complete ready with cached data so waiters don't hang - if (!mediaRepository.ready.isCompleted) { - _state.value = MediaRepositoryState.Ready - mediaRepository.signalReady() - } - } - } + ensureReady() } } @@ -133,48 +136,25 @@ class InMemoryAppContentRepository @Inject constructor( } override suspend fun ensureReady() { - val isOffline = userSessionRepository.isOfflineMode.first() - if (isOffline) { - // Offline mode: use cached data without network calls - val ready = mediaRepository.ready - if (!ready.isCompleted) { - _state.value = MediaRepositoryState.Ready - mediaRepository.signalReady() - } - mediaRepository.ready.await() + // check for combined ready state + if (ready.value) { return } - - val ready = mediaRepository.ready - if (ready.isCompleted) { - ready.await() + if (loadJob?.isActive == true) { return } - - // Only the first caller runs the loading logic; others wait on the deferred. - if (readyMutex.tryLock()) { - try { - if (mediaRepository.ready.isCompleted) { - mediaRepository.ready.await() - return - } - loadLibraries() - loadContinueWatching() - loadNextUp() - loadLatestLibraryContent() - persistHomeCache() - _state.value = MediaRepositoryState.Ready - initialLoadTimestamp = System.currentTimeMillis() - mediaRepository.signalReady() - } catch (t: Throwable) { - _state.value = MediaRepositoryState.Error(t) - mediaRepository.signalError(t) - throw t - } finally { - readyMutex.unlock() - } - } else { - mediaRepository.ready.await() + if (!contentRepositoryReady.value) { + return + } + contentRepositoryReady.value = true + loadJob?.cancel() + loadJob = scope.launch { + loadContinueWatching() + loadNextUp() + loadLatestLibraryContent() + loadLibraries() + mediaRepository.setReady() + persistHomeCache() } } @@ -192,10 +172,10 @@ class InMemoryAppContentRepository @Inject constructor( _libraries.value = filledLibraries val movies = filledLibraries.filter { it.type == CollectionType.MOVIES }.flatMap { it.movies.orEmpty() } - mediaRepository._movies.update { current -> current + movies.associateBy { it.id } } + mediaRepository.upsertMovies(movies) val series = filledLibraries.filter { it.type == CollectionType.TVSHOWS }.flatMap { it.series.orEmpty() } - mediaRepository._series.update { current -> current + series.associateBy { it.id } } + mediaRepository.upsertSeries(series) } suspend fun loadLibrary(library: Library): Library { @@ -217,7 +197,7 @@ class InMemoryAppContentRepository @Inject constructor( val movieItem = jellyfinApiClient.getItemInfo(movieId) ?: throw RuntimeException("Movie not found") val updatedMovie = movieItem.toMovie(serverUrl(), movieItem.parentId!!) - mediaRepository._movies.update { it + (updatedMovie.id to updatedMovie) } + mediaRepository.upsertMovies(listOf(updatedMovie)) return updatedMovie } @@ -225,7 +205,7 @@ class InMemoryAppContentRepository @Inject constructor( val seriesItem = jellyfinApiClient.getItemInfo(seriesId) ?: throw RuntimeException("Series not found") val updatedSeries = seriesItem.toSeries(serverUrl(), seriesItem.parentId!!) - mediaRepository._series.update { it + (updatedSeries.id to updatedSeries) } + mediaRepository.upsertSeries(listOf(updatedSeries)) return updatedSeries } @@ -248,7 +228,7 @@ class InMemoryAppContentRepository @Inject constructor( when (item.type) { BaseItemKind.EPISODE -> { val episode = item.toEpisode(serverUrl()) - mediaRepository._episodes.update { it + (episode.id to episode) } + mediaRepository.upsertEpisodes(listOf(episode)) } else -> { /* Do nothing */ } } @@ -268,7 +248,7 @@ class InMemoryAppContentRepository @Inject constructor( // Load episodes nextUpItems.forEach { item -> val episode = item.toEpisode(serverUrl()) - mediaRepository._episodes.update { it + (episode.id to episode) } + mediaRepository.upsertEpisodes(listOf(episode)) } } @@ -312,25 +292,20 @@ class InMemoryAppContentRepository @Inject constructor( //TODO Load seasons and episodes, other types are already loaded at this point. } - companion object { - private const val REFRESH_MIN_INTERVAL_MS = 30_000L - } - override suspend fun refreshHomeData() { - val isOffline = userSessionRepository.isOfflineMode.first() - if (isOffline) return + val isOnline = networkMonitor.isOnline.first() + if (!isOnline) return - mediaRepository.ready.await() - // Skip refresh if the initial load (or last refresh) just happened - val elapsed = System.currentTimeMillis() - initialLoadTimestamp - if (elapsed < REFRESH_MIN_INTERVAL_MS) return - - loadLibraries() - loadContinueWatching() - loadNextUp() - loadLatestLibraryContent() - persistHomeCache() - initialLoadTimestamp = System.currentTimeMillis() + if(loadJob?.isActive == true) { + return + } + loadJob = scope.launch { + loadLibraries() + loadContinueWatching() + loadNextUp() + loadLatestLibraryContent() + persistHomeCache() + } } private suspend fun serverUrl(): String { diff --git a/core/data/src/main/java/hu/bbara/purefin/core/data/InMemoryMediaRepository.kt b/core/data/src/main/java/hu/bbara/purefin/core/data/InMemoryMediaRepository.kt index a1f90b7..58aa58f 100644 --- a/core/data/src/main/java/hu/bbara/purefin/core/data/InMemoryMediaRepository.kt +++ b/core/data/src/main/java/hu/bbara/purefin/core/data/InMemoryMediaRepository.kt @@ -7,7 +7,6 @@ import hu.bbara.purefin.core.model.Episode import hu.bbara.purefin.core.model.Movie import hu.bbara.purefin.core.model.Season import hu.bbara.purefin.core.model.Series -import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob @@ -36,11 +35,8 @@ class InMemoryMediaRepository @Inject constructor( ) : MediaRepository { private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) - @Volatile internal var ready = CompletableDeferred() - internal fun reset() { - ready = CompletableDeferred() - } + override val ready: MutableStateFlow = MutableStateFlow(false) internal val _movies: MutableStateFlow> = MutableStateFlow(emptyMap()) override val movies: StateFlow> = _movies.asStateFlow() @@ -50,27 +46,29 @@ class InMemoryMediaRepository @Inject constructor( internal val _episodes: MutableStateFlow> = MutableStateFlow(emptyMap()) override val episodes: StateFlow> = _episodes.asStateFlow() - - internal fun signalReady() { - ready.complete(Unit) + override fun upsertMovies(movies: List) { + _movies.update { current -> current + movies.associateBy { it.id } } } - internal fun signalError(t: Throwable) { - ready.completeExceptionally(t) + override fun upsertSeries(series: List) { + _series.update { current -> current + series.associateBy { it.id } } } - private suspend fun awaitReady() { - ready.await() + override fun upsertEpisodes(episodes: List) { + _episodes.update { current -> current + episodes.associateBy { it.id } } } override fun observeSeriesWithContent(seriesId: UUID): Flow { scope.launch { - awaitReady() ensureSeriesContentLoaded(seriesId) } return _series.map { it[seriesId] } } + override fun setReady() { + ready.value = true + } + private suspend fun ensureSeriesContentLoaded(seriesId: UUID) { _series.value[seriesId]?.takeIf { it.seasons.isNotEmpty() }?.let { return } diff --git a/core/data/src/main/java/hu/bbara/purefin/core/data/MediaRepository.kt b/core/data/src/main/java/hu/bbara/purefin/core/data/MediaRepository.kt index 4033b72..9f83649 100644 --- a/core/data/src/main/java/hu/bbara/purefin/core/data/MediaRepository.kt +++ b/core/data/src/main/java/hu/bbara/purefin/core/data/MediaRepository.kt @@ -8,9 +8,18 @@ import kotlinx.coroutines.flow.StateFlow import java.util.UUID interface MediaRepository { + val ready: StateFlow val movies: StateFlow> val series: StateFlow> val episodes: StateFlow> + fun upsertMovies(movies: List) { + } + fun upsertSeries(series: List) { + } + fun upsertEpisodes(episodes: List) { + } fun observeSeriesWithContent(seriesId: UUID): Flow + fun setReady() { + } suspend fun updateWatchProgress(mediaId: UUID, positionMs: Long, durationMs: Long) } \ No newline at end of file diff --git a/core/data/src/main/java/hu/bbara/purefin/core/data/OfflineMediaRepository.kt b/core/data/src/main/java/hu/bbara/purefin/core/data/OfflineMediaRepository.kt index 38c755e..0beb237 100644 --- a/core/data/src/main/java/hu/bbara/purefin/core/data/OfflineMediaRepository.kt +++ b/core/data/src/main/java/hu/bbara/purefin/core/data/OfflineMediaRepository.kt @@ -8,6 +8,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.stateIn @@ -21,8 +22,10 @@ import javax.inject.Singleton */ @Singleton class OfflineMediaRepository @Inject constructor( - private val localDataSource: OfflineRoomMediaLocalDataSource + private val localDataSource: OfflineRoomMediaLocalDataSource, ) : MediaRepository { + override val ready: StateFlow = MutableStateFlow(false) + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) override val movies: StateFlow> = localDataSource.moviesFlow @@ -34,6 +37,18 @@ class OfflineMediaRepository @Inject constructor( override val episodes: StateFlow> = localDataSource.episodesFlow .stateIn(scope, SharingStarted.Eagerly, emptyMap()) + override fun upsertMovies(movies: List) { + TODO("Not yet implemented") + } + + override fun upsertSeries(series: List) { + TODO("Not yet implemented") + } + + override fun upsertEpisodes(episodes: List) { + TODO("Not yet implemented") + } + override fun observeSeriesWithContent(seriesId: UUID): Flow { return localDataSource.observeSeriesWithContent(seriesId) } @@ -46,4 +61,8 @@ class OfflineMediaRepository @Inject constructor( localDataSource.updateWatchProgress(mediaId, progressPercent, watched) } + override fun setReady() { + // OfflineMediaRepository works from the database. So it is ready immediately. + } + }