プロダクトで安全にDataStore移行する

DataStoreの紹介

DataStoreはローカルのデータストレージのライブラリです。SharedPreferenceの代替ソリューションとしても紹介されています。

DataStoreの特徴として、以下のようなものがあります。

  • Preferences DataStoreProto DataStore の2種類がある。
  • Kotlin Coroutines と Flowをベースに実装されている。

それぞれ詳しくみていきます。

Preferences DataStore と Proto DataStore

DataStoreにはPreferences DataStoreProto DataStoreの2種類があります。

特徴的な違いは「データ格納方法」と「タイプセーフかどうか」の2点です。

Preferences DataStore Proto DataStore
データ格納方法 Key-Valueでデータ格納 Protocol Buffersを利用して型付きオブジェクトを格納
タイプセーフかどうか タイプセーフではない タイプセーフである

Preferences DataStoreの実装イメージ

ユーザの年齢を記録しておくアプリを作るとします。 記録した年齢をDataStoreに読み書きする場合は、以下のような実装になります。

data class UserPreferences(
    val age: Int
)

class UserPreferencesRepository(
    // ① DataStoreのインスタンス作成
    private val dataStore: DataStore<Preferences>
) {

    private object PreferencesKeys {
        // ② キーの定義
        val AGE: Preferences.Key<Int> = intPreferencesKey("age")
    }

    // ③ データの取得
    val userPreferencesFlow: Flow<UserPreferences> = dataStore.data
        .map { preferences ->
            UserPreferences(
                age = preferences[PreferencesKeys.AGE] ?: -1
            )
        }

    // ④ データの更新
    suspend fun updateAge(age: Int) {
        dataStore.edit { preferences ->
            preferences[PreferencesKeys.AGE] = age
        }
    }
}

① DataStoreのインスタンス作成

DataStore 自体はインタフェースです。 データ読み取る時の data、書き込むときのupdateData() だけが公開されています。

そして、ここでは大事なことが2点あります。

  • dataFlowになっていること
  • updateData()がsuspend関数になっていること

dataFlowになっているので、利用側はこのFlowを購読することでPreferences変更を購読することができます。 これによってPreferencesに記録したデータをSingle Source of Truthとして扱いやすくなります。

またupdateData()がsuspend関数になっているということは、利用側はCoroutinesのスコープから利用する必要があります。

public interface DataStore<T> {

    public val data: Flow<T>

    public suspend fun updateData(transform: suspend (t: T) -> T): T
}

DataStore.kt - Android Code Searchから抜粋

また、DataStore<T>ジェネリクスを使っているのがわかります。 Preferences DataStore を利用するはPreferencesを使います。

Preferences自体は抽象クラスになっています。

ここで大事なことは以下の点です。

  • コンストラクタが internal になっていること
  • 内部クラスを2つ持っていること
  • Mapに似たメソッドを持っていること
public abstract class Preferences internal constructor() {

    public class Key<T> internal constructor(public val name: String) {
        ...
    }

    public class Pair<T> internal constructor(internal val key: Key<T>, internal val value: T)

    public abstract operator fun <T> contains(key: Key<T>): Boolean

    public abstract operator fun <T> get(key: Key<T>): T?

    public abstract fun asMap(): Map<Key<*>, Any>

    public fun toMutablePreferences(): MutablePreferences {
        return MutablePreferences(asMap().toMutableMap(), startFrozen = false)
    }

    public fun toPreferences(): Preferences {
        return MutablePreferences(asMap().toMutableMap(), startFrozen = true)
    }
}

Preferences.kt - Android Code Searchから抜粋

そもそもPreferencesが抽象クラスというのもありますが、Preferences() とやってインスタンスを作ることはできません。 ちなみにPreferencesを継承しているMutablePreferencesもコンストラクタがinternalになっています。

DataStore<Preferences>インスタンスを作るにはPreferenceDataStoreFactory#createを利用します。

引数が4つあります。

  • corruptionHandler → ファイルを読み込んでシリアライズする時の例外をキャッチした時に実行される。この引数に渡すReplaceFileCorruptionHandlerでは、その例外が起きた時に新しく書き込むためのデータを指定する。
  • migrationsマイグレーションの指定。
  • scope → IO処理と変換処理を行うコルーチンスコープ。デフォルト引数はCoroutineScope(Dispatchers.IO + SupervisorJob())
  • produceFile → DataStoreが動作するためのファイルを返す関数。
public object PreferenceDataStoreFactory {

    @JvmOverloads
    public fun create(
        corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
        migrations: List<DataMigration<Preferences>> = listOf(),
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
        produceFile: () -> File
    ): DataStore<Preferences> {
        val delegate = DataStoreFactory.create(
            serializer = PreferencesSerializer,
            corruptionHandler = corruptionHandler,
            migrations = migrations,
            scope = scope
        ) {
            val file = produceFile()
            check(file.extension == PreferencesSerializer.fileExtension) { ... }
            file
        }
        return PreferenceDataStore(delegate)
    }
}

create(...)のブロック内では、引数をさらにDataStoreFactory.create(...)に渡しているのがわかります。

PreferenceDataStoreFactory#createの引数と比べるとserializerが増えています。

  • serializerDataStore<T>で使われるT型のSerializer。T型はImmutableでなければならない。PreferencesSerializer固定。

create(...)のブロック内ではSingleProcessDataStoreインスタンスを作成しています。

public object DataStoreFactory {
    ...
    @JvmOverloads // Generate constructors for default params for java users.
    public fun <T> create(
        serializer: Serializer<T>,
        corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
        migrations: List<DataMigration<T>> = listOf(),
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
        produceFile: () -> File
    ): DataStore<T> =
        SingleProcessDataStore(
            produceFile = produceFile,
            serializer = serializer,
            corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
            initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
            scope = scope
        )
}

DataStoreFactory.kt - Android Code Searchから抜粋

SingleProcessDataStoreDataStore<T>の実装クラスです。つまりはDataStoreを利用している時はこのクラスが実体になっています。

/**
 * Single process implementation of DataStore. This is NOT multi-process safe.
 */
internal class SingleProcessDataStore<T>(
    private val produceFile: () -> File,
    private val serializer: Serializer<T>,
    initTasksList: List<suspend (api: InitializerApi<T>) -> Unit> = emptyList(),
    private val corruptionHandler: CorruptionHandler<T> = NoOpCorruptionHandler<T>(),
    private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
) : DataStore<T> {
    ...
}

SingleProcessDataStore.kt - Android Code Searchから抜粋

内部の処理については後述しますが、ここで重要なのは最初のコメント文に書かれています。

Single process implementation of DataStore. This is NOT multi-process safe.

ここでいう"process"は、ActivityServiceが動いているプロセスを指しています。 デフォルトでは同じアプリのActivityなどは同じプロセスで動きますが、独自のプロセスを設定することもできます。 つまりは、もし異なるプロセスからDataStoreを利用するのは安全ではないということが書かれています。

参考:プロセスとスレッドの概要 | Android Developers

ここまででDataStoreのインスタンスが作成されるまでの流れを追うことができました。

② キーの定義

Preference DataStoreはKey-Valueでデータ格納するので、そのためのキーを定義する必要があります。

キーは型はPreferences.Key<T>になります。(Preferencesの内部クラス)

ちなみにPreferences.Key<T>のコンストラクタはinternalになっています。 つまりは利用する側はPreferences.Key<String>(...)のようにインスタンスを作ることができません。

そこで、キーの定義をするためにはintPreferencesKey(...)というような関数が用意されています。

サンプルコードではval AGE: Preferences.Key<Int> = intPreferencesKey("age")というコードでキーを定義しています。 これはInt型の値を格納するためのキーを定義していますが、他の型も定義可能です。

  • Int型 → intPreferencesKey
  • Double型 → doublePreferencesKey
  • String型 → stringPreferencesKey
  • Boolean型 → booleanPreferencesKey
  • Float型 → floatPreferencesKey
  • Long型 → longPreferencesKey
  • Set<String>型 → stringSetPreferencesKey

ちなみにSharedPreferenceではDouble型を読み書きするインタフェースはありませんでした。

③ データの取得

DataStore<T>#dataからデータを取得します。dataの戻り値はFlow<T>になっていて、DataStore<Preferences>を利用している場合はPreferencesが流れてきます。 そして流れてきたPreferencesからキーを使って値を取り出します。

ここでのポイントは以下です。

  • キーに対応する値だけ流れてくるのではなく、Preferencesが流れてくる。

サンプルコードではdataStore.data.map { ... }のブロック内でPreferencesKeys.AGEを使って値を取り出していますが、他のキーの値が変わった場合は変更後のPreferencesが流れてくるので、PreferencesKeys.AGEに対応する値に変更がなくてもそのブロック内の処理は動いています。

次に、DataStore<T>#dataにデータが放出されるまでの流れについてです。

前述の通り、DataStore<T>の実体はSingleProcessDataStore<T>です。

internal class SingleProcessDataStore<T>(
    ...
) : DataStore<T> {

    override val data: Flow<T> = flow {

        val currentDownStreamFlowState = downstreamFlow.value

        if (currentDownStreamFlowState !is Data) {
            // We need to send a read request because we don't have data yet.
            actor.offer(Message.Read(currentDownStreamFlowState))
        }

        emitAll(
            downstreamFlow.dropWhile {
                if (currentDownStreamFlowState is Data<T> ||
                    currentDownStreamFlowState is Final<T>
                ) {
                    // We don't need to drop any Data or Final values.
                    false
                } else {
                    // we need to drop the last seen state since it was either an exception or
                    // wasn't yet initialized. Since we sent a message to actor, we *will* see a
                    // new value.
                    it === currentDownStreamFlowState
                }
            }.map {
                when (it) {
                    is ReadException<T> -> throw it.readException
                    is Final<T> -> throw it.finalException
                    is Data<T> -> it.value
                    is UnInitialized -> error(
                        "This is a bug in DataStore. Please file a bug at: " +
                            "https://issuetracker.google.com/issues/new?" +
                            "component=907884&template=1466542"
                    )
                }
            }
        )
    }

    ...

    @Suppress("UNCHECKED_CAST")
    private val downstreamFlow = MutableStateFlow(UnInitialized as State<T>)

    ...
}

SingleProcessDataStore.kt - Android Code Searchから抜粋

まず、現在のdownStreamFlowの状態がチェックされています。 downStreamFlowSingleProcessDataStoreのメンバになっていて、MutableStateFlow<State<T>>(...)で初期化されています。

State<T>SingleProcessDataStore.ktにファイルプライベートで定義されています。 状態は4つあります。

  • UnInitialized → 初期化前の状態
  • Data<T> → ディスクから読み込みに成功した状態。現在のディスクの値を保持している。
  • ReadException<T> → ディスクからの読み込みに失敗した状態。
  • Final<T> → データの読み書きができなくなった状態。

なお、downStreamFlowの初期値はState.UnInitializedになっています。

private sealed class State<T>

private object UnInitialized : State<Any>()

private class Data<T>(val value: T, val hashCode: Int) : State<T>() {
    fun checkHashCode() {
        check(value.hashCode() == hashCode) {
            "Data in DataStore was mutated but DataStore is only compatible with Immutable types."
        }
    }
}

private class ReadException<T>(val readException: Throwable) : State<T>()

private class Final<T>(val finalException: Throwable) : State<T>()

SingleProcessDataStore.kt - Android Code Searchから抜粋

そしてdownStreamFlowの状態がData<T>ではない時は、actor.offer(Message.Read(currentDownStreamFlowState))という呼出しをしています。

actorSingleProcessDataStoreのメンバになっていて、実体はSimpleActorであることがわかります。

SimpleActorの内部の役割は以下です。(メッセージのキューを管理しているだけなので、内部実装についての詳細は特に述べない)

  • 引数のscopeに与えたCoroutineScopeが持っているJobが完了したらonCompleteをコールバックする。
  • 上記と同じタイミングでメッセージキューに残っているものがあれば、全てonUndeliveredElementでコールバックする。
  • offer(...)でメッセージをキューに追加する。
  • メッセージのキューが消費されるたびにconsumeMessageをコールバックする。

そして、SingleProcessDataStoreSimpleActorのコールバック時に実行されている処理は以下の通りです。

  • onCompleteのコールバック時 → downstreamの状態をFinal<T>にする。activeFilesからfile絶対パスを削除する。
  • onUndeliveredElementのコールバック時 → Message.Update型のメッセージが保持している遅延処理ackを完了させる。
  • consumeMessageのコールバック時 → handleRead(...)またはhandleUpdate(...)を呼び出す。
    private val actor = SimpleActor<Message<T>>(
        scope = scope,
        onComplete = {
            it?.let {
                downstreamFlow.value = Final(it)
            }

            synchronized(activeFilesLock) {
                activeFiles.remove(file.absolutePath)
            }
        },
        onUndeliveredElement = { msg, ex ->
            if (msg is Message.Update) {
                msg.ack.completeExceptionally(
                    ex ?: CancellationException(
                        "DataStore scope was cancelled before updateData could complete"
                    )
                )
            }
        }
    ) { msg ->
        when (msg) {
            is Message.Read -> {
                handleRead(msg)
            }
            is Message.Update -> {
                handleUpdate(msg)
            }
        }
    }

つまりは、actor.offer(Message.Read(currentDownStreamFlowState))を呼び出して、そのメッセージが処理されるとhandleRead(...)が呼ばれることになります。

handleRead()では改めてdownstreamの状態がチェックされています。そしてそれぞれの状態ごとに処理が分岐しています。

  • Data → 何もしない
  • ReadException → メッセージがキューに追加された時点から状態が変わっていなければreadAndInitOrPropagateFailure()を呼び出す。
  • UnInitializedreadAndInitOrPropagateFailure()を呼び出す
  • Final → 例外を投げる
    private suspend fun handleRead(read: Message.Read<T>) {
        when (val currentState = downstreamFlow.value) {
            is Data -> {
                // We already have data so just return...
            }
            is ReadException -> {
                if (currentState === read.lastState) {
                    readAndInitOrPropagateFailure()
                }

                // Someone else beat us but also failed. The collector has already
                // been signalled so we don't need to do anything.
            }
            UnInitialized -> {
                readAndInitOrPropagateFailure()
            }
            is Final -> error("Can't read in final state.") // won't happen
        }
    }

SingleProcessDataStore.kt - Android Code Searchから抜粋

readAndInitOrPropagateFailure()ではさらにreadAndInit()を呼び出しています。もしそこで例外が起きたら、キャッチしてdownstreamの状態をReadExceptionに更新しています。

    private suspend fun readAndInitOrPropagateFailure() {
        try {
            readAndInit()
        } catch (throwable: Throwable) {
            downstreamFlow.value = ReadException(throwable)
        }
    }

readAndInit()ではdownstreamFlowの状態がUnInitializedReadExceptionの初期化ができていない状態かどうかチェックされています。

そのチェックが通ると、readDataOrHandleCorruption()が呼ばれて初期データの読み込みがされます。

private suspend fun readAndInit() {
        // This should only be called if we don't already have cached data.
        check(downstreamFlow.value == UnInitialized || downstreamFlow.value is ReadException)

        val updateLock = Mutex()
        var initData = readDataOrHandleCorruption()

        var initializationComplete: Boolean = false

        // TODO(b/151635324): Consider using Context Element to throw an error on re-entrance.
        val api = object : InitializerApi<T> {
            override suspend fun updateData(transform: suspend (t: T) -> T): T {
                return updateLock.withLock() {
                    if (initializationComplete) {
                        throw IllegalStateException(
                            "InitializerApi.updateData should not be " +
                                "called after initialization is complete."
                        )
                    }

                    val newData = transform(initData)
                    if (newData != initData) {
                        writeData(newData)
                        initData = newData
                    }

                    initData
                }
            }
        }

        initTasks?.forEach { it(api) }
        initTasks = null // Init tasks have run successfully, we don't need them anymore.
        updateLock.withLock {
            initializationComplete = true
        }

        downstreamFlow.value = Data(initData, initData.hashCode())
    }

SingleProcessDataStore.kt - Android Code Searchから抜粋

readDataOrHandleCorruption()ではreadData()が呼ばれています。

    private suspend fun readDataOrHandleCorruption(): T {
        try {
            return readData()
        } catch (ex: CorruptionException) {

            val newData: T = corruptionHandler.handleCorruption(ex)

            try {
                writeData(newData)
            } catch (writeEx: IOException) {
                // If we fail to write the handled data, add the new exception as a suppressed
                // exception.
                ex.addSuppressed(writeEx)
                throw ex
            }

            // If we reach this point, we've successfully replaced the data on disk with newData.
            return newData
        }
    }

SingleProcessDataStore.kt - Android Code Searchから抜粋

readData()ではFileInputStreamが使われて、Preferencesのデータが書き込まれているファイルを読み込みしています。

serializer.readFrom() が呼ばれていますが、Preferences DataStoreではserializerPreferencesSerializerになっています。

    private suspend fun readData(): T {
        try {
            FileInputStream(file).use { stream ->
                return serializer.readFrom(stream)
            }
        } catch (ex: FileNotFoundException) {
            if (file.exists()) {
                throw ex
            }
            return serializer.defaultValue
        }
    }

serializer.readFrom()の内部では、まずPreferencesMapCompat.readFrom(...)が呼ばれています。

internal object PreferencesSerializer : Serializer<Preferences> {
    ...

    @Throws(IOException::class, CorruptionException::class)
    override suspend fun readFrom(input: InputStream): Preferences {
        val preferencesProto = PreferencesMapCompat.readFrom(input)

        val mutablePreferences = mutablePreferencesOf()

        preferencesProto.preferencesMap.forEach { (name, value) ->
            addProtoEntryToPreferences(name, value, mutablePreferences)
        }

        return mutablePreferences.toPreferences()
    }

    ...

    private fun addProtoEntryToPreferences(
        name: String,
        value: Value,
        mutablePreferences: MutablePreferences
    ) {
        return when (value.valueCase) {
            Value.ValueCase.BOOLEAN ->
                mutablePreferences[booleanPreferencesKey(name)] =
                    value.boolean
            ...
        }
    }
}

PreferencesSerializer.kt - Android Code Searchから抜粋

PreferencesMapCompat.readFrom(...)の内部ではPreferencesProto.PreferenceMap.parseFrom(...)が呼ばれています。

class PreferencesMapCompat {
    companion object {
        fun readFrom(input: InputStream): PreferencesProto.PreferenceMap {
            return try {
                PreferencesProto.PreferenceMap.parseFrom(input)
            } catch (ipbe: InvalidProtocolBufferException) {
                throw CorruptionException("Unable to parse preferences proto.", ipbe)
            }
        }
    }

PreferencesMapCompat.kt - Android Code Searchから抜粋

PreferencesProto.PreferenceMap.parseFrom(...)の内部実装をさらに読み進めても、protobufのファイルをパースする処理が続いてあまり述べることがないのでここでは言及しませんが、ここで重要なのはPreferencesProto.PreferenceMapインスタンスを作っていることです。なお、フィールドにpreferences_というMapも保持しています。

    public static final class PreferenceMap extends GeneratedMessageLite<PreferencesProto.PreferenceMap, PreferencesProto.PreferenceMap.Builder> implements PreferencesProto.PreferenceMapOrBuilder {
        public static final int PREFERENCES_FIELD_NUMBER = 1;
        private MapFieldLite<String, PreferencesProto.Value> preferences_ = MapFieldLite.emptyMapField();
        ...


        private PreferenceMap() {
        }

PreferencesSerializer#readFrom(...)の処理に戻ると、preferencesProto.preferencesMap.forEach {...の処理があります。

preferencesProto.preferencesMapの内部処理を見ると、PreferencesProto.PreferenceMapのフィールドで保持していたpreferences_を返却しているのがわかります。

そして、値を取り出してaddProtoEntryToPreferences(...)mutablePreferencesにデータを追加していっています。

そうしてSingleProcessDataStore#readDataの処理にserializer.readFrom(...)の戻り値が来たら、readDataOrHandleCorruption()の呼び出し元のreadAndInit()まで戻ることができます。

readAndInit()内でinitDataが作られた後は、val api = object : InitializerApi<T> {という処理があります。次の行ではinitTasks?.forEach { it(api) }という処理もあり、initTasksマイグレーションの処理のリストなので、apiマイグレーション時に生じる書き込み処理を実装しているのがわかります。

マイグレーションも完了したら、downstreamの状態をDataにしてreadAndInit()の処理は終わっています。ここまで完了するとhandleRead()の呼び出し元まで戻ることができます。

さて、SingleProcessDataStore#dataのFlowビルダーのブロックまで戻ってきました。

次にemitAll(downstreamFlow.dropWhile {...の処理があります。 これはdownstreamの状態がDataまたはFinalになるまで待機して、今の状態から変わるたびにデータを放出します。なおPreferencesが放出されるのは状態がDataになった時だけです。

そしてPreferencesが放出されたら、ようやくDataStore<T>#dataの呼び出し元UserPreferencesRepositoryにデータが流れてきます。

そうしたら② キーの定義で定義したkeyを使ってPreferencesからデータを取り出しています。そしてサンプルコードではUserPreferencesというクラスでラップして扱いやすくしています。

④ データの更新

DataStore<T>のインタフェースにはupdateData()というメソッドがありますが、Preferencesを扱う場合はDataStore<Preferences>#editという拡張関数が用意されています。

この拡張関数を利用しますが、内部的にはDataStore<T>#updateData()を呼び出しています。

PreferenceDataStoreFactoryDataStoreを生成しているので、実体はPreferenceDataStoreです。DataStore<T>.dataを呼び出した時はSingleProcessDataStoreに完全にdelegateしていましたが、updateData()はオーバーライドしています。 ここでは引数のtransformを使ってPreferencesの値を書き換えています。最終的にはdelegate#updateDataを呼び出しています。

internal class PreferenceDataStore(private val delegate: DataStore<Preferences>) :
    DataStore<Preferences> by delegate {
    override suspend fun updateData(transform: suspend (t: Preferences) -> Preferences):
        Preferences {
            return delegate.updateData {
                val transformed = transform(it)
                ...
                (transformed as MutablePreferences).freeze()
                transformed
            }
        }
}

PreferenceDataStoreFactory.kt - Android Code Searchから抜粋

delegate先のSingleProcessDataStore#updateDataを見ると、actor.offer(...)を呼び出してメッセージのキューにMessage.Updateを追加しているのがわかります。

そしてactorで追加したMessage.Updateが処理されるとhandleUpdate(...)が呼び出されます。

ここで気をつけたいのはack.await()があることで、ここでMessage.Updateに渡しているack(Deferred)が完了するまで待機しています。

internal class SingleProcessDataStore<T>(
    ...
    override suspend fun updateData(transform: suspend (t: T) -> T): T {
        val ack = CompletableDeferred<T>()
        val currentDownStreamFlowState = downstreamFlow.value

        val updateMsg =
            Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)

        actor.offer(updateMsg)

        return ack.await()
    }
    ...
}

handleUpdate()の内部ではまず、downstreamの状態をチェックしています。

状態がDataの場合はtransformAndWrite(...)が呼ばれています。

    private suspend fun handleUpdate(update: Message.Update<T>) {
        ...
        update.ack.completeWith(
            runCatching {

                when (val currentState = downstreamFlow.value) {
                    is Data -> {
                        ...
                        transformAndWrite(update.transform, update.callerContext)
                    }
                    is ReadException, is UnInitialized -> {
                        if (currentState === update.lastState) {
                            ...
                            readAndInitOrPropagateAndThrowFailure()

                            ...
                            transformAndWrite(update.transform, update.callerContext)
                        } else {
                            ...
                            throw (currentState as ReadException).readException
                        }
                    }

                    is Final -> throw currentState.finalException // won't happen
                }
            }
        )
    }

SingleProcessDataStore.kt - Android Code Searchから抜粋

transformAndWrite(...)では現在のデータと新しいデータの比較がされています。同じデータだった場合は現在のデータcurDataを返しますが、異なるデータだった場合はwriteData(...)が呼ばれています。

    private suspend fun transformAndWrite(
        transform: suspend (t: T) -> T,
        callerContext: CoroutineContext
    ): T {
        // value is not null or an exception because we must have the value set by now so this cast
        // is safe.
        val curDataAndHash = downstreamFlow.value as Data<T>
        curDataAndHash.checkHashCode()

        val curData = curDataAndHash.value
        val newData = withContext(callerContext) { transform(curData) }

        // Check that curData has not changed...
        curDataAndHash.checkHashCode()

        return if (curData == newData) {
            curData
        } else {
            writeData(newData)
            downstreamFlow.value = Data(newData, newData.hashCode())
            newData
        }
    }

SingleProcessDataStore.kt - Android Code Searchから抜粋

writeData(...)では新しいデータをファイルに書き込んでいます。

まずfile.createParentDirectories()が呼ばれて、ファイルの親ディレクトリが作られます。

その後はFileOutputStreamを使用してファイルの書き込みがされています。.use { ...の中ではserializer.writeTo(...)が呼ばれていて、serializerPreferencesSerializerなのでそこでPreferences用のファイルの書き込みがされています。またこのserializer.writeTo(...)はsuspned関数になっていて、書き込み完了するまで中断しているのがわかります。

    internal suspend fun writeData(newData: T) {
        file.createParentDirectories()

        val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX)
        try {
            FileOutputStream(scratchFile).use { stream ->
                serializer.writeTo(newData, UncloseableOutputStream(stream))
                stream.fd.sync()
                // TODO(b/151635324): fsync the directory, otherwise a badly timed crash could
                //  result in reverting to a previous state.
            }

            if (!scratchFile.renameTo(file)) {
                throw IOException(
                    ...
                )
            }
        } catch (ex: IOException) {
            if (scratchFile.exists()) {
                scratchFile.delete() // Swallow failure to delete
            }
            throw ex
        }
    }

SingleProcessDataStore.kt - Android Code Searchから抜粋

書き込みの処理が終わってtransformAndWrite()に処理が戻るとdownstreamの状態を新しいデータで更新しています。

handleUpdateの処理に戻って、downstreamの状態がReadExceptionUnInitializedの時の処理を見ると、まずreadAndInitOrPropagateAndThrowFailure()が呼ばれています。

downstreamの状態がReadExceptionUnInitializedだとDataStoreの初期化ができていないので、readAndInitOrPropagateAndThrowFailure()ではreadAndInit()が呼ばれています。そこで初期が完了するとtransformAndWrite(...)が呼ばれて新しいデータの書き込みがされます。

    private suspend fun readAndInitOrPropagateAndThrowFailure() {
        try {
            readAndInit()
        } catch (throwable: Throwable) {
            downstreamFlow.value = ReadException(throwable)
            throw throwable
        }
    }

handleUpdateの処理に戻って、downstreamの状態がFinalの時の処理を見ると、例外が投げられています。

これで値を更新する一連の処理が完了しました。

Preferences Storeをプロダクトへ導入する

導入自体はとっても簡単です。

gradleに依存関係を追加して、DataStoreインスタンスを作って使い始めることができます。

    dependencies {
        implementation("androidx.datastore:datastore-preferences:1.0.0")
    }
private val Context.dataStore by preferencesDataStore(
    name = USER_PREFERENCES_NAME,
)

アプリ アーキテクチャとの関係

最近のアプリではアプリ アーキテクチャ ガイドを手本にしたアーキテクチャが多いと思います。またガイドの中でも述べられているようにDataStoreはアーキテクチャにおけるDataSourceの役割を果たします。

https://developer.android.com/topic/architecture/data-layer?hl=ja#architecture

したがって、DataStoreへのアクセスは、UIレイヤーから直接アクセスするのではなく、Repositoryを経由してアクセスするパターンが多くなると思います。

DataStoreのインスタンスの管理

プロダクトのアーキテクチャに合わせてDataStoreインスタンスを持ち方を考える必要があります。

サンプルコードではRepositoryのコンストラクタにDataStoreを渡していますが、注意するべき点があります。

  • ファイル( xxx.preferences_pb のようなデータを書き込む)に対して、DataStoreのインスタンスが1つだけになっていること(PreferenceDataStoreFactory#createのコメントにもあり)

これは1つのファイルを複数から操作するとデータの整合性が取れない場合があるからだと思われます。

class UserPreferencesRepository(
    private val dataStore: DataStore<Preferences>
) {

1つのファイルに対してDataStoreのインスタンスも1つにする方法としては、HiltでDataStoreインスタンスをDIする方法が紹介されています。

https://medium.com/androiddevelopers/datastore-and-dependency-injection-ea32b95704e3

ただ、個人的にはRepositoryに直接DataStoreをInjectしない方が良いと考えています。理由は以下です。

  • Repositoryのテストを書くときにinstrumentationテストのセットアップが必要になる
  • DataStoreにテストデータをセットするのが手間になる

Repositoryのロジックをテストしたいだけなのに、都度DataStoreのセットアップが必要になるのは少々手間なので、RepositoryにはFakeのクラスを差し込めるようにした方が良いと思います。

そのため、DataStoreにアクセスするためのInterfaceを作り、そのinterfaceを実装しているクラスにDataStoreインスタンスを持たせ、そのクラスをシングルトンで管理する方法にすると良いと思います。

キーの管理

DataStoreとSharedPreferenceのどちらにも言えることですが、keyの定義の重複を避ける必要があります。そのため、DataStoreの導入と併せてキーの重複をチェックできる仕組みを入れておくのが良いと思います。

例えば、sealedクラスでkeyを定義しておいて、重複がないかの単体テストを追加しておく方法があります。

class UserPreferencesDataStoreImpl @Inject constructor(
    ...
) : UserPreferencesDataStore {

  sealed class UserPreferencesKeys<T>(
    val preferencesKey: Preferences.Key<T>
  ) {

    object Age : UserPreferencesKeys<Int>(intPreferencesKey("age"))
  }

ここまでできればDataStoreを使い始める準備が出来ました。

参考URL

ブログ

  1. https://android-developers.googleblog.com/2022/03/jetpack-datastore-wrap-up.html
  2. https://medium.com/androiddevelopers/introduction-to-jetpack-datastore-3dc8d74139e7
  3. https://medium.com/androiddevelopers/all-about-preferences-datastore-cc7995679334
  4. https://medium.com/androiddevelopers/all-about-proto-datastore-1b1af6cd2879
  5. https://medium.com/androiddevelopers/datastore-and-dependency-injection-ea32b95704e3
  6. https://medium.com/androiddevelopers/datastore-and-kotlin-serialization-8b25bf0be66c
  7. https://medium.com/androiddevelopers/datastore-and-synchronous-work-576f3869ec4c
  8. https://medium.com/androiddevelopers/datastore-and-data-migration-fdca806eb1aa
  9. https://medium.com/androiddevelopers/datastore-and-testing-edf7ae8df3d8

CodeLab

  1. https://developer.android.com/codelabs/android-preferences-datastore#0

ドキュメント

  1. https://developer.android.com/guide/topics/data?hl=ja
  2. https://developer.android.com/training/data-storage/shared-preferences