[WIP][Kotlin Side Project] 자체 단일 저장소 DB 저장 구현 오류

2021. 11. 15. 14:49Kotlin/Kotlin Side Project

class RefreshControl(
    val rate : Long = DEFAULT_REFRESH_RATE_MS,
    var lastUpdate : Date? = null

) {
    companion object{
        val DEFAULT_REFRESH_RATE_MS = TimeUnit.MINUTES.toMillis(5)
    }

    interface Listener{
        suspend fun cleanUp()
    }

    private val listeners : MutableList<Listener> = mutableListOf()
    private val children : MutableList<RefreshControl> = mutableListOf()

    suspend fun evict(cleanup : Boolean) {
        lastUpdate = null
        children.forEach{ it.evict(cleanup) }
        if(cleanup){
            listeners.forEach { it.cleanUp() }
        }
    }

    // Public API
    fun createChild(): RefreshControl =
        RefreshControl(rate, lastUpdate).also { children.add(it) }

    fun addListener(listener: Listener) {
        listeners.add(listener)
    }

    fun refresh() {
        Log.i("KMJTEST","[RefreshControl] refresh()")
        lastUpdate = Date()
    }

    fun isExpired() = lastUpdate?.let { (Date().time - it.time) > rate } ?: true
}
class PopularMovieResource<in Input, out Output>(
    private val remoteFetch: suspend (Input) -> Output?,
    private val localFetch: suspend (Input) -> Output?,
    private val localStore: suspend (Output) -> Unit,
    private val localDelete: suspend () -> Unit,
    private val refreshControl: RefreshControl = RefreshControl()
) : RefreshControl.Listener {

    init { refreshControl.addListener(this) }

    // public api
    suspend fun query(args: Input, force: Boolean = false): Output {
        Log.i("KMJTEST","[PopularMovieResource] query()")
        Log.i("KMJTEST","[PopularMovieResource] lastUpdate? ${refreshControl.lastUpdate} refreshControl is expired? ${refreshControl.isExpired()} force? $force")
        if (!refreshControl.isExpired()  && !force) {
                return fetchFromLocal(args)!!
        }else{
                return fetchFromRemote(args)!!
        }
    }

    override suspend fun cleanUp() {
        deleteLocal()
    }

    // Private API
    private suspend fun deleteLocal() = kotlin.runCatching {
        localDelete()
    }.getOrNull()


    private suspend fun fetchFromLocal(args: Input) = kotlin.runCatching {
        localFetch(args)
    }.getOrNull()

    private suspend fun fetchFromRemote(args: Input) = kotlin.runCatching {
        Log.i("KMJTEST","[PopularMovieResource] fetchRemote()")
        remoteFetch(args)
    }.getOrThrow()?.also {
        localStore(it)
        refreshControl.refresh()
    }
}
class LocalDataSource(val localMovieDatabase: LocalMovieDatabase) :
    PagingSource<Int, PopularMovie>() {
    override fun getRefreshKey(state: PagingState<Int, PopularMovie>): Int? {
        Log.i("KMJTEST", "[LocalDataSource] getRefreshKey ${state.anchorPosition}")
        return state.anchorPosition?.let { pos ->
            state.closestPageToPosition(pos)?.prevKey?.plus(1)
                ?: state.closestPageToPosition(pos)?.nextKey?.minus(1)
        }
    }

    override suspend fun load(params: LoadParams<Int>): LoadResult<Int, PopularMovie> {
        Log.i("KMJTEST", "[LocalDataSource] ${params.key} key, ${params.loadSize} loadsize")
        val page = params.key ?: 1
        return try {
            localMovieDatabase.withTransaction {
                lateinit var popularMovieLocalList: List<PopularMovie>
                //localMovieDatabase.popularMovieDao().getPopularMoviesFlowWithPage(
                //      page = page,
                //      loadsize = 10
                //  ).collect { value ->
                //      popularMovieLocalList = value.map { it.toEntity() }
                //  } // TODO : check collect() is right

                popularMovieLocalList =  localMovieDatabase.popularMovieDao().getPopularMovies(page, 20).map { it.toEntity() }

                popularMovieLocalList.forEach {
                    Log.i("KMJTEST", "[LocalDataSource] id ${it.id}")
                }
                val lastId = popularMovieLocalList.lastOrNull()?.id
                val nextPage = localMovieDatabase.popularMovieDao().getNextPage(lastId)
                val nextPageList = localMovieDatabase.popularMovieDao().getNextPageAll();

                Log.i("KMJTEST", "[LocalDataSource] popularList ${popularMovieLocalList.size}")
                Log.i("KMJTEST", "[LocalDataSource] lastId ${lastId}")
                Log.i("KMJTEST", "[LocalDataSource] nextPage ${nextPage}")
                nextPageList.forEach {
                    Log.i("KMJTEST", "[LocalDataSource] movieId ${it.movieId} nextPage ${it.nextPage}")
                }

                LoadResult.Page(
                    data = popularMovieLocalList,
                    prevKey = if (page == 1) null else page - 1,
                    nextKey =  if (popularMovieLocalList.isEmpty()&&page!=1) null else page + 1
                )
            }

        } catch (e: Exception) {
            return LoadResult.Error(e)
        }
    }

    suspend fun store(flow: Flow<PagingData<PopularMovie>>) {
        //localMovieDatabase.popularMovieDao().insertPopularMovies(*popularMovieLocal) // TODO : check vararg and *
        Log.i("KMJTEST","[LocalDataSource] store() $flow")
        flow.collect {
            Log.i("KMJTEST","[LocalDataSource] store() #1 $it")
            it.map {
                Log.i("KMJTEST","[LocalDataSource] store() #2 $it")
                localMovieDatabase.popularMovieDao().insertPopularMovie(
                    PopularMovieLocal(
                        it.poster_path,
                        it.adult,
                        it.overview,
                        it.release_date,
                        it.genre_ids,
                        it.id,
                        it.original_title,
                        it.original_language,
                        it.title,
                        it.backdrop_path,
                        it.popularity,
                        it.vote_count,
                        it.video,
                        it.vote_average
                    )

                )
            }
        }
    }

    suspend fun delete() {
        localMovieDatabase.popularMovieDao().deletePopularMovieAll()
    }
}
class PopularMovieRepositoryImpl(
    private val localDataSource: LocalDataSource,
    private val remoteDataSource: RemoteDataSource
) : PopularMovieRepository {

    private val popularMovieResource = PopularMovieResource<Unit, Flow<PagingData<PopularMovie>>>(
        remoteFetch = {
            Log.i("KMJTEST","[PopularMovieRepositoryImpl] remoteFetch()")
        Pager(
            pagingSourceFactory = { remoteDataSource },
            config = PagingConfig(pageSize = 20)
        ).flow },
        localFetch = {
            Log.i("KMJTEST","[PopularMovieRepositoryImpl] localFetch()")
            Pager(
            pagingSourceFactory = { localDataSource },
            config = PagingConfig(pageSize = 10)
        ).flow },
        localStore = localDataSource::store,
        localDelete = localDataSource::delete,
        refreshControl = RefreshControl(TimeUnit.MINUTES.toMillis(5))
    )
   
    override suspend fun getAll(): Flow<PagingData<PopularMovie>> {
        Log.i("KMJTEST","[PopularMovieRespositoryImpl] getAll()")
        return popularMovieResource.query(Unit, false)
    }

}

[이슈]

RemoteMediator 없이 PopularMovieRosource를 통해 단일 데이터 소스를 만들어보려고 했으나, LocalDataSource에서 store를 구현 시 로직을 타지 않아 db에 저장이 되지 않음.

 

[원인]

LocalDataSource에 넘겨주는 flow에서 collect()를 해버리면, 현재 코루틴은 flow가 취소되거나 완료될때까지 suspend 되어 이후 로직이 진행되지 않음. 이를 방지하기 위해서 take(1).collect()로 바꾸게 되면 이후 로직이 진행되어 adapter까지 데이터는 전달되지만 collect된 PagingData는 remoteFetch로 넘겨준 Pager.flow와는 다르고, 아직 왜인지 모르지만 flow.map 로직 안에서 it.map을 건너뛰게 됨.

 

[해결]

AAC에서 제공되는 RemoteMediator를 이용해 단일 데이터 저장소를 적용하자. 

만약, 해당 소스로 진행하고자 한다면, remoteFetch에서 넘겨주는 Pager.flow에 map 로직으로 store 로직을 추가하면 데이터가 db에 저장되어 정상 동작하게 된다.