カルマンフィルタについて

これなーに

カルマンフィルタのメモだよ。
参考のコードを元に、細かめにメモを取っていくよ。

参考

qiita.com

カルマンフィルタとは

カルマンフィルタは、第1回 カルマンフィルタとは | 地層科学研究所の言葉を借りると、以下。

状態空間モデルと呼ばれる数理モデルにおいて、内部の見えない「状態」を効率的に推定するための計算手法です

観測データから内部の状態を推論し、観測できていないところを推論するようなことができるね。
これが役立つ場面は、例えば動画の物体検出。
動画の物体検出は、動画からパラパラ漫画のように画像フレームを取得して、各フレームで物体検出している。
この時、一瞬ブレてしまって画像がノイズ混じりになった結果、正しく推論できなかったフレームがあった時に、そのフレームの状態を推論して復元できる。
すなわち、物体検出の結果のスムーシングなどに使えるね。

カルマンフィルタの活用は、物体追跡(object tracking)でデファクトっぽくなっていて、以下のように組み込まれて利用されているよ。

これらは、物体検出結果とカルマンフィルタでの推論を組み合わせて同一物体かを判定することに利用していたりするよ。

コード

カルマンフィルタのコードは以下。ほぼ参考にさせてもらった記事のまま。
xとyの二次元座標上を動いていることを想定していて、0.1秒ごとに観測点のデータを得たと想定しているよ。
これをパーツごとにメモしていくよ。

# -*- coding: utf-8 -*-
import numpy as np


def kalman_filter(state, covariance, measurements, transition, external_factor, observation, noise, identity):
    for n in range(len(measurements)):

        # 予測(状態と共分散を更新する。)
        state = transition @ state + external_factor  # 行列の積として掛け算する。
        covariance = transition @ covariance @ transition.T

        # 計測更新
        # ループの中で対象となる観測された値
        measured_value = np.array([measurements[n]])
        # 観測値から、予測した状態を引いて、観測の残差を計算
        observe_difference = measured_value.T - observation @ state
        # 観測対象の共分散を計算(抽出)
        observed_error_covariance = observation @ covariance @ observation.T + noise
        # カルマンゲインの計算
        kalman_gain = covariance @ observation.T @ np.linalg.inv(difference_covariance)
        state = state + kalman_gain @ observe_difference
        covariance = (identity - kalman_gain @ observation) @ covariance

    state = state.tolist()
    covariance = covariance.tolist()
    return state, covariance


def main():
    # 1. 設定
    # x, yの観測値
    measurements = [[7., 15.], [8., 14.], [9., 13.], [10., 12.], [11., 11.], [12., 10.]]
    # x, yの初期値
    initial_xy = [6., 17.]
    # 計測間隔(0.1sごとに観測している場合)
    dt = 0.1

    # 2. カルマンフィルタで利用する変数の設定
    # x, y, x速度, y速度の初期値 (4,1)
    state = np.array([[initial_xy[0]], [initial_xy[1]], [0.], [0.]])  # 初期位置と初期速度を代入した「4次元状態」
    # 外部要素(4,1)
    external_factor = np.array([[0.], [0.], [0.], [0.]])  # 外部要素

    # 共分散行列(4, 4) 各対角要素から、x座標とy座標の分散[不確実性]は低く(=確信度が高く)、x速度とy速度の分散[不確実性]は高く(確信度が低く)している。
    covariance = np.array([[0., 0., 0., 0.], [0., 0., 0., 0.], [0., 0., 100., 0.], [0., 0., 0., 100.]])
    # 状態遷移行列(4, 4)  1行目がx座標、2行目がy座標、3行目がxの加速度、4行目がyの速度で、例えば1行目のx座標は、前のx座標の情報にdt分の速度の影響を受ける。
    # 3行目はxの速度で、速度は等速と仮定しているため、前の速度と等しいようにしている。
    transition = np.array([[1., 0., dt, 0.], [0., 1., 0., dt], [0., 0., 1., 0.], [0., 0., 0., 1.]])
    # 観測行列(2, 4) 観測する対象を指定している意味合い。  1行目はx座標で、1, 0, 0, 0によって、x座標はx座標のみを観測対象とすることを意味する。
    observation = np.array([[1., 0., 0, 0], [0., 1., 0., 0.]])
    # ノイズ(2, 2)
    noise = np.array([[0.1, 0], [0, 0.1]])
    # 単位行列(4, 4)
    identity = np.identity((len(state)))

    state, covariance = kalman_filter(
        state,
        covariance,
        measurements,
        transition,
        external_factor,
        observation,
        noise,
        identity
    )

    print("6回の計測後の位置と速度の予測値:", state)  # stateは、x座標, y座標, x速度, y速度


if __name__ == '__main__':
    main()

コードメモ

1. 設定

    # 1. 設定
    # x, yの観測値
    measurements = [[7., 15.], [8., 14.], [9., 13.], [10., 12.], [11., 11.], [12., 10.]]
    # x, yの初期値
    initial_xy = [6., 17.]
    # 計測間隔(0.1sごとに観測している場合)
    dt = 0.1

ここはシンプル。
measurementsは、観測間隔ごとの[x, y]座標を6回観測したリスト。
initial_xyは、観測開始前の[x, y]座標。
dtは、観測感覚の秒数。(動画なら1/FPSになるかな)

2. カルマンフィルタで利用する変数設定

見やすくするため、コメントを削除してます。

    # 2. カルマンフィルタで利用する変数の設定
    state = np.array([[initial_xy[0]], [initial_xy[1]], [0.], [0.]])    # 初期状態
    external_factor = np.array([[0.], [0.], [0.], [0.]])    # 外部要素
    covariance = np.array([[0., 0., 0., 0.], [0., 0., 0., 0.], [0., 0., 100., 0.], [0., 0., 0., 100.]])  # 共分散行列
    transition = np.array([[1., 0., dt, 0.], [0., 1., 0., dt], [0., 0., 1., 0.], [0., 0., 0., 1.]])  # 状態遷移行列
    observation = np.array([[1., 0., 0, 0], [0., 1., 0., 0.]])  # 観測行列
    noise = np.array([[0.1, 0], [0, 0.1]])  # ノイズ
    identity = np.identity((len(state)))  # 単位行列

state

推論対象の状態。すなわち、x座標・y座標・x速度・y速度。
これがカルマンフィルタで推定する対象となり、stateは内部の処理で変化していく。
初期状態は、initial_xyからx座標とy座標をセットして、速度はどっちに向かっているかは不明なので0をセットしている。
 {(6.0, 17.0, 0.0, 0.0)}^\top

external_factor

外部要素。
今回は外部要素はなしとしているが、例えば何らかの力が一定かかり続ける場合などに、それを考慮した値をセットすることになると思う。
 {(0.0, 0.0, 0.0, 0.0)}^\top

covariance

共分散行列。対角要素以外は、x座標・y座標・x速度・y速度の共分散(関連性)で、対角要素は各分散(不確実性)を表す。
初期状態では各要素の関連性は不明なので、対角要素以外は0。
対角要素は、xとy座標は既知なので、確信度が高くて0(分散0)をセットし、x, y速度は不明なので確信度が低くて100(分散100)を置いている。
covarianceも、カルマンフィルタの処理の中で更新されていく。

 \begin{pmatrix}
0.0 & 0.0 & 0.0 & 0.0 \\
0.0 & 0.0 & 0.0 & 0.0 \\
0.0 & 0.0 & 100.0 & 0.0 \\
0.0 & 0.0 & 0.0 & 100.0 \\
\end{pmatrix}

transition

状態遷移行列。更新しない固定値。
各行がx座標・y座標・x速度・y速度を意味しており、例えば1行目は、x座標は(前の)x座標(1.0)とx速度(0.1)から影響を受けると定義している。
y座標も同様に(前の)y座標とy速度から影響を受け、x,y速度は、x,y速度自身からしか変化の影響を受けないように設定している。

 \begin{pmatrix}
1.0 & 0.0 & 0.1 & 0.0 \\
0.0 & 1.0 & 0.0 & 0.1 \\
0.0 & 0.0 & 1.0 & 0.0 \\
0.0 & 0.0 & 0.0 & 1.0 \\
\end{pmatrix}

observation

観測行列。更新しない固定値。
x座標とy座標を意味していて、x座標はx座標のみを観測対象とし、y座標はy座標のみを観測対象とすることを指定している。

 \begin{pmatrix}
1.0 & 0.0 & 0.0 & 0.0 \\
0.0 & 1.0 & 0.0 & 0.0 
\end{pmatrix}

noise

ノイズ行列。推定するにあたってどれくらいのブレが発生するかの度合いを示す。

 \begin{pmatrix}
0.1 & 0.0 \\
0.0 & 0.1
\end{pmatrix}

identity

単位行列。共分散行列の更新時に利用する。

 \begin{pmatrix}
1.0 & 0.0 & 0.0 & 0.0 \\
0.0 & 1.0 & 0.0 & 0.0 \\
0.0 & 0.0 & 1.0 & 0.0 \\
0.0 & 0.0 & 0.0 & 1.0 \\
\end{pmatrix}

簡単にまとめると、

変数 変数/定数   次元   説明
state 変数 (4, 1) x座標, y座標, x速度, y速度の推定したい対象の状態の列ベクトル。
external_factor 定数 (4, 1) 外部からの影響の列ベクトル。今回は利用無し。
covariance 変数 (4, 4) x座標, y座標, x速度, y速度の共分散行列。対角要素は不確定度で、非対角要素は正なら片方が増えるともう片方も増える、負なら逆という関係。
transition 定数 (4, 4) 予測ステップでのみ利用。x座標, y座標, x速度, y速度がそれぞれどの要素から影響を受けるかを定義。
observation 定数 (2, 4) 状態の更新ステップでx座標, y座標がどの要素を観測対象にするかを定義。
noise 定数 (2, 2) x座標, y座標がどの程度の不確実さを持つかを定義。
identity 定数 (4, 4) 状態の更新ステップで共分散を更新する際に利用。

3. カルマンフィルタ処理

カルマンフィルタは、1)各観測点に対し、2)予測ステップと3)更新ステップを実施する。
各ステップごとに説明。

3-1: 各観測点でのループ

def kalman_filter(state, covariance, measurements, transition, external_factor, observation, noise, identity):
    for n in range(len(measurements)):

measurementsに観測データ(x,y座標のリスト)が入っており、これをforループで繰り返す。

3-2: 予測ステップ

        state = transition @ state + external_factor  # 行列の積として掛け算する。
        covariance = transition @ covariance @ transition.T

状態予測

state = transition @ state + external_factor

状態遷移行列と状態の列ベクトル(と外部要素)を用いて、次のステップの状態を予測する。
初回処理では、初期状態と定義された遷移の方法で、次の状態を予測できる。
今回の場合は、x,y座標は前回のx,y座標の値とx,yの速度で更新するが、速度は初期値で0なのでそのまま。

共分散予測

covariance = transition @ covariance @ transition.T

状態遷移行列と共分散行列を用いて、共分散を予測する。
初回処理では、仮定した共分散と定義された遷移の方法で、共分散を予測できる。(何がどうなるとどういう状態になるか?が分かるため、共分散も計算できる)
ちなみにA @ B @ A^Tの形は、BをAの空間に写像して元に戻すことをしているイメージになる。

この二つで、状態と共分散をまず予測する。

3-3. 更新ステップ

        measured_value = np.array([measurements[n]])
        observe_difference = measured_value.T - observation @ state
        difference_covariance = observation @ covariance @ observation.T + noise
        kalman_gain = covariance @ observation.T @ np.linalg.inv(difference_covariance)
        state = state + kalman_gain @ observe_difference
        covariance = (identity - kalman_gain @ observation) @ covariance

観測値の取得

measured_value = np.array([measurements[n]])

実際の観測されたデータを取得する。

誤差の計算

observe_difference = measured_value.T - observation @ state

実際の観測値と、予測した状態で、誤差を計算する。
observationはマスクのような役割で、観測値から内部の状態を更新する時に利用する対象(x座標とy座標)をstateから取得するために利用される。
この場合は、stateはx座標, y座標, x速度, y速度を持っているが、observationと行列積を取ることで、x座標とy座標だけが取得でき、
実際に観測したx座標とy座標から、予測したx座標とy座標を引き算して、その誤差計算ができる。

観測対象の共分散の計算

observed_error_covariance = observation @ covariance @ observation.T + noise

観測対象(x座標とy座標)の予測した共分散を取得(抽出)し、そこにノイズを加える。
抽出と表現しているのは、以下のように、共分散行列の左上のみを取得するような処理となるため。

 
\begin{pmatrix}
1.0 & 0.0 & 0.0 & 0.0 \\
0.0 & 1.0 & 0.0 & 0.0 \\
\end{pmatrix}

@

\begin{pmatrix}
0.36363636 & 0.0 & 1.81818182 & 0.0 \\
0.0 & 0.36363636 & 0.0 & 1.81818182 \\
1.81818182 & 0.0 & 9.09090909 & 0.0 \\
0.0 & 1.81818182 & 0.0 & 9.09090909 \\
\end{pmatrix}

@

\begin{pmatrix}
1.0 & 0.0 \\
0.0 & 1.0 \\
0.0 & 0.0 \\
0.0 & 0.0 \\
\end{pmatrix}

=

\begin{pmatrix}
0.36363636 & 0.0 \\
0.0 & 0.36363636 \\
\end{pmatrix}

カルマンゲインの計算

kalman_gain = covariance @ observation.T @ np.linalg.inv(observed_error_covariance)

カルマンゲインは、状態の更新時に、観測誤差や共分散をどれくらい反映させるかの重みの役割を持つ。
covariance @ observation.Tは、共分散行列の観測対象箇所を取得している。
これに、observed_error_covariance(観測対象の共分散)の逆数で行列積をとるのは、
観測対象の共分散が大きい(誤差が大きくなる)ほど予測結果への反映を小さくし、逆なら大きく反映させたいというイメージだと思う。
これにより、推定した状態と共分散を更新していく。

状態の更新

state = state + kalman_gain @ observe_difference

予測した状態(x座標とy座標)に、実際の観測値と予測値の誤差(observe_difference)にカルマンゲインとして更新すべき重みをかけて更新する。
kalman_gainが大きいと不確実性が低いので、observe_differenceを強く反映して更新する。
逆にkalman_gainが小さいと不確実性が高いので、observe_differenceを弱く反映して更新する。

共分散の更新

covariance = (identity - kalman_gain @ observation) @ covariance

予測した共分散を、どれくらいの確信度で更新するかを単位行列から引くことで確定して共分散を更新する。
covarianceは、状態の推定をするためのkalman_gainで利用される。
よって、次の処理でどれくらいのcovarianceの影響度を残すかを(identity - kalman_gain @ observation)で計算している感じだと思われる。
あんまり最後の部分自身ない・・・。。

ループ

3-3の最後で更新した状態と共分散を用いて、再度3-2: 予測ステップと3-3. 更新ステップを繰り返すことで、予測と観測からの更新で内部状態を更新していく。
これにより、未知の値がある場合にそれをそれまでの過去の観測から推論することができる。

以上で、終わり。

ちなみに、これは観測が途切れてからの次の推論だけでなく、それを重ねてさらに未来の状態も予測できる。
未来を推定するコードは以下にあります。
github.com