Pytorchでtransformer(その2 ネットワーク作成)

書いてる理由

  • NLPやるぞー
  • レビューがポジティブかネガティブかを判断する
  • transformerのネットワークを組んで、classificationするモデル定義をする

参考

pytorchによる発展ディープラーニング
Attention is All You Need

概要

raishi12.hatenablog.com 前回、上の記事でIMBdのデータをfasttextを使ってベクトル表現するデータローダーを作った。
今回はこのデータを使って特徴抽出〜ポジネガのclassificationをするためのネットワークを作っていく。
Transformerは、Attention is All You Needの論文のもので、最初は翻訳タスク向けとして作られていて、英語の文章→エンコーダーデコーダー→日本語の文章とかするけど、今回は文章の特徴量を抽出したいだけなのでエンコーダーの部分だけ使う感じ。
Transformerのネットワークで文章を分類する流れは、以下の通り。

f:id:raishi12:20200329004814p:plain
transformerの流れ

文章の各単語がボキャブラリーのIDで表現されていて、その長さを統一するために前回は256単語で切る or 埋めるをやった。
これがinputになり、①Embedderでベクトル表現の300次元をくっつけ、②PositionalEncoderで単語の位置の情報を加え、③TransformerBlockで単語同士の関係性を考慮した変換をして、④ClassificationHeadで分類する。
なかなかしんどいけど、それぞれ説明する。

コード

https://github.com/y201810/pytorch_work/blob/master/nlp/sentiment_analysis/script/models.pygithub.com

詳細

1. Embedder

Embedderは、文章が単語IDで表現されているデータにword2vecなどの分散表現されているベクトルを付与する処理。
ちなみにこれはdatasetのreturn の直前にやっても良いが、DataLoader側でベクトル情報を持つとメモリ的に苦しいので毎回の処理でこれをやるためにここに置いてある感じ。
inputのidが羅列されたデータに対し、nn.Embedding.from_pretrainedで今回利用するembedding用のvectorを指定して、それに通すだけでOK。
forwardのxのテンソルのサイズは、バッチサイズが24なので、[24, 256] (バッチサイズ, 単語IDの数)となっていて、これを1単語が300次元で表現されているベクトルをくっつけるのでx_vecは[24, 256, 300]のサイズとなる。

class Embedder(nn.Module):
    """ IDで示される単語をベクトルに変換する"""
    def __init__(self, text_embedding_vectors):
        super(Embedder, self).__init__()

        self.embeddings = nn.Embedding.from_pretrained(embeddings=text_embedding_vectors, freeze=True)  # freeze: バックプロップでの更新なし

    def forward(self, x):
        x_vec = self.embeddings(x)
        return x_vec

2. Positional Encoder

Positional Encoderは、ベクトル表現に位置情報を付与するために利用する。PEは以下の式で算出する。

f:id:raishi12:20200329011502p:plain

画像でか!!!!まぁいいや。
posが最初から数えた時の出現順のindex番号で、powの2iが偶数の時、2i+1が奇数の時を示す。偶数の時にsinを奇数の時にcosを使って、よく分からん数字をそれぞれ突っ込む。d_modelは単語の次元数なので今回は300。
ぶっちゃけなぜこの式になっているのか不明。元論文読まないとちょっと分からなさそう。
が、まぁとにかく文章における単語のポディション情報を足す行為らしい。
なぜ位置情報を足すかというと、この後のAttentionに関係する。
Attentionとは、直訳すると注意なのだが、文章におけるAttentionとは、どの単語とどの単語が関係するか的な意味合いになる。
例えば、「俺は昨日、初めてリモートワークをした。だから通勤の必要がなくなって、時間をものすごく有効活用できた気がする。ずっとこれが続けばいいのになぁ。」
という文章があった時に、"ずっとこれが"の"これ"はリモートワークのことを指していて、前後の数単語だけではこの意味が理解できなくなってしまう。
そこで、文章の単語ごとに関係性というAttentionの情報があると、より有意味な特徴量の作成ができるはず、ということで、単語の位置情報をaddすると良いらしい。
for文でpeを作成し、forward関数でself.peを入力のxに足しあげる。xの各要素がpeに比べて小さいので、math.sqrt(300)をかけてスケールを合わせて足してreturn。

class PositionalEncoder(nn.Module):
    """ 入力された単語の位置を示すベクトル情報を不可する """
    def __init__(self, model_dim=300, max_seq_len=256):
        super().__init__()
        self.model_dim = model_dim
        pe = torch.zeros(max_seq_len, model_dim)

        device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        pe = pe.to(device)

        for pos in range(max_seq_len):
            for i in range(0, model_dim, 2):
                pe[pos, i] = math.sin(pos / (10000 ** ((2 * i) / model_dim)))
                pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * (i + 1)) / model_dim)))

        self.pe = pe.unsqueeze(0)  # バッチサイズの次元を追加
        self.pe.requires_grad = False  # 勾配の計算なし

    def forward(self, x):
        result = math.sqrt(self.model_dim) * x + self.pe
        return result

3 TransformerBlock

TransformerBlockは、AttentionとAttentionで得た結果の線形変換を実施する。これは任意回数繰り返す。今回の場合は2回。

3-1 Attention

2で位置情報がaddされた特徴量を使って、特徴量 * 特徴量の転置での行列積をtorch.matmul(q, k.transpose(1, 2)) / math.sqrt(self.d_k)で計算。
次のmaskはpadingされた箇所を0にするためににマイナスの超大きな数字を入れておく。
理由は、その後にベクトルの各要素に対してsoftmaxをかけて全ての要素を0~1に抑えつつ、全部足して1になるような変換をしており、softmaxで数値がマイナスで大きい場合は出力が0となるので、マイナスの大きい値をここで代入する。
softmaxで整えられたベクトルを、元の入力と行列せきをとってそれを全結合層で変換かけてreturn。
行列と転置行列の行列積をとることで、大きい値と大きい値がかけ合わさるとそれがsoftmaxの出力でも大きな値を返すことになり、これが単語間でのAttentionの表現となっているっぽい。(理解不十分。。解析に意味のある要素は、大きな値を持っているけど、それと別の値を掛け合わせて得られたものが単語の関係性と表現していいのかなぁ。。。なんでやろ??)
まぁとりあえずこれで、単語間の関係性を考慮した特徴抽出をする。

class Attention(nn.Module):
    """ 各単語ごとの関係性を踏まえた特徴量抽出 """
    def __init__(self, model_dim=300):
        super().__init__()

        self.q_linear = nn.Linear(model_dim, model_dim)
        self.v_linear = nn.Linear(model_dim, model_dim)
        self.k_linear = nn.Linear(model_dim, model_dim)

        self.out = nn.Linear(model_dim, model_dim)
        self.d_k = model_dim

    def forward(self, q, k, v, mask):
        k = self.k_linear(k)
        q = self.q_linear(q)
        v = self.v_linear(v)

        # Attentionの計算。 各値をそのまま足すと大きくなりすぎるので、root(model_dim)で割って調整
        weights = torch.matmul(q, k.transpose(1, 2)) / math.sqrt(self.d_k)

        mask = mask.unsqueeze(1)
        weights = weights.masked_fill(mask == 0, -1e9)  # maskが0(<pad>)の箇所にマイナス無限大をセット

        normalized_weights = F.softmax(weights, dim=-1)  # Attentionをsoftmaxで確率的な形に変換
        output = torch.matmul(normalized_weights, v)  # Attentionとvalueの掛け算
        output = self.out(output)

        return output, normalized_weights

3-2 FeedForward

Feed ForwardはAttentionで得た結果を一度次元数を大きく表現して、再度元の次元で表現するための操作。
単純に、300次元で表現されていたものを1024次元に一度膨らませ、その後1024次元を300次元に戻す。

class FeedForward(nn.Module):
    def __init__(self, model_dim, ff_dim=1024, dropout=0.1):
        super().__init__()

        self.linear_1 = nn.Linear(model_dim, ff_dim)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(ff_dim, model_dim)

    def forward(self, x):
        x = self.linear_1(x)
        x = self.dropout(x)
        x = self.linear_2(x)

        return x

3-3 TransformerBlockの構築

3-1で作ったAttentionと3-2のFeedForwardを組み合わせる。
まず、nn.LayerNormで正規化してAttentionにかけ、Attentionで得た結果を元の特徴量に足す。
足した結果を正規化して、FeedForwardに突っ込んでreturn。
dropoutも間にちょいちょい挟む。

class TransformerBlock(nn.Module):
    def __init__(self, model_dim, dropout=0.1):
        super().__init__()

        self.norm_1 = nn.LayerNorm(model_dim)
        self.norm_2 = nn.LayerNorm(model_dim)

        self.attention = Attention(model_dim)

        self.feadforward = FeedForward(model_dim)

        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        x_normalized = self.norm_1(x)
        attention_weights, normalized_weights = self.attention(x_normalized, x_normalized, x_normalized, mask)

        attention_add_weights = x + self.dropout_1(attention_weights)

        x_normalized_2 = self.norm_2(attention_add_weights)
        x_normalized_2 = self.feadforward(x_normalized_2)
        output = attention_add_weights + self.dropout_2(x_normalized_2)

        return output, normalized_weights

4 ClassificationHead

ClassificationHeadは、ここまでの特徴量を使って、ポジティブ or ネガティブの2値を出力する。
特筆すべきは、forwardでのx0 = x[:, 0, :]で、これは256文字の最初の1文字<cls>だけを分類に使うことを意味している。
256文字全ての特徴量を使って分類しても良いが、文章ごとにpaddingが多く入ってたり少なかったりするので、共通して使える特徴量ということで全ての文章に同じように入れている<cls> の部分を分類に使っている。

class ClassificationHead(nn.Module):
    """ Transformer Blockの出力を用いて、クラス分類をする"""
    def __init__(self, model_dim=300, output_dim=2):
        super().__init__()

        self.linear = nn.Linear(model_dim, output_dim)

        nn.init.normal_(self.linear.weight, std=0.02)  # 重みの初期化
        nn.init.normal_(self.linear.bias, 0)  # バイアスの初期化

    def forward(self, x):
        x0 = x[:, 0, :]  # 各ミニバッチの各文章の先頭の単語の特徴量のみを取り出す
        out = self.linear(x0)

        return out

5 全体を組み合わせる

まとめ的になるが、まず文章を単語で分割して単語をIDで持つデータに対し、Embedderで300次元のベクトル情報を付与する。
次にPositionalEncoderで単語の位置情報を謎の式の出力をaddすることで付与する。
位置情報を持った状態で、Attentionにかけることで、単語同士の関係性を考慮した特徴量抽出をする。
そこで得られた特徴量を一度大きな次元で表現して再度小さい次元に戻す。
最終的に得られた特徴量でポジティブ or ネガティブを判断できるような出力をする。

んー、位置情報を付与するところも不明だったけど、Attentionの計算がなぜあれでAttentionの役割を果たすかの根本的な理解がしたいなぁ。
次は学習かな

class TransformerClassification(nn.Module):
    def __init__(self, text_embedding_vectors, model_dim=300, max_seq_len=256, output_dim=2):
        super().__init__()

        self.net1 = Embedder(text_embedding_vectors)
        self.net2 = PositionalEncoder(model_dim=model_dim, max_seq_len=max_seq_len)
        self.net3_1 = TransformerBlock(model_dim=model_dim)
        self.net3_2 = TransformerBlock(model_dim=model_dim)
        self.net4 = ClassificationHead(output_dim=output_dim, model_dim=model_dim)

    def forward(self, x, mask):
        x1 = self.net1(x)
        x2 = self.net2(x1)
        x3_1, normalized_weights_1 = self.net3_1(x2, mask)
        x3_2, normalized_weights_2 = self.net3_2(x3_1, mask)
        x4 = self.net4(x3_2)

        return x4, normalized_weights_1, normalized_weights_2

コード全体

class Embedder(nn.Module):
    """ IDで示される単語をベクトルに変換する"""
    def __init__(self, text_embedding_vectors):
        super(Embedder, self).__init__()

        self.embeddings = nn.Embedding.from_pretrained(embeddings=text_embedding_vectors, freeze=True)  # freeze: バックプロップでの更新なし

    def forward(self, x):
        x_vec = self.embeddings(x)
        return x_vec


class PositionalEncoder(nn.Module):
    """ 入力された単語の位置を示すベクトル情報を不可する """
    def __init__(self, model_dim=300, max_seq_len=256):
        super().__init__()
        self.model_dim = model_dim
        pe = torch.zeros(max_seq_len, model_dim)

        device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        pe = pe.to(device)

        for pos in range(max_seq_len):
            for i in range(0, model_dim, 2):
                pe[pos, i] = math.sin(pos / (10000 ** ((2 * i) / model_dim)))
                pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * (i + 1)) / model_dim)))

        self.pe = pe.unsqueeze(0)  # バッチサイズの次元を追加
        self.pe.requires_grad = False  # 勾配の計算なし

    def forward(self, x):
        result = math.sqrt(self.model_dim) * x + self.pe
        return result


class Attention(nn.Module):
    """ 各単語ごとの関係性を踏まえた特徴量抽出 """
    def __init__(self, model_dim=300):
        super().__init__()

        self.q_linear = nn.Linear(model_dim, model_dim)
        self.v_linear = nn.Linear(model_dim, model_dim)
        self.k_linear = nn.Linear(model_dim, model_dim)

        self.out = nn.Linear(model_dim, model_dim)
        self.d_k = model_dim

    def forward(self, q, k, v, mask):
        k = self.k_linear(k)
        q = self.q_linear(q)
        v = self.v_linear(v)

        # Attentionの計算。 各値をそのまま足すと大きくなりすぎるので、root(model_dim)で割って調整
        weights = torch.matmul(q, k.transpose(1, 2)) / math.sqrt(self.d_k)

        mask = mask.unsqueeze(1)
        weights = weights.masked_fill(mask == 0, -1e9)  # maskが0(<pad>)の箇所にマイナス無限大をセット

        normalized_weights = F.softmax(weights, dim=-1)  # Attentionをsoftmaxで確率的な形に変換
        output = torch.matmul(normalized_weights, v)  # Attentionとvalueの掛け算
        output = self.out(output)

        return output, normalized_weights


class FeedForward(nn.Module):
    def __init__(self, model_dim, ff_dim=1024, dropout=0.1):
        super().__init__()

        self.linear_1 = nn.Linear(model_dim, ff_dim)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(ff_dim, model_dim)

    def forward(self, x):
        x = self.linear_1(x)
        x = self.dropout(x)
        x = self.linear_2(x)

        return x


class TransformerBlock(nn.Module):
    def __init__(self, model_dim, dropout=0.1):
        super().__init__()

        self.norm_1 = nn.LayerNorm(model_dim)
        self.norm_2 = nn.LayerNorm(model_dim)

        self.attention = Attention(model_dim)

        self.feadforward = FeedForward(model_dim)

        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        x_normalized = self.norm_1(x)
        attention_weights, normalized_weights = self.attention(x_normalized, x_normalized, x_normalized, mask)

        attention_add_weights = x + self.dropout_1(attention_weights)

        x_normalized_2 = self.norm_2(attention_add_weights)
        x_normalized_2 = self.feadforward(x_normalized_2)
        output = attention_add_weights + self.dropout_2(x_normalized_2)

        return output, normalized_weights


class ClassificationHead(nn.Module):
    """ Transformer Blockの出力を用いて、クラス分類をする"""
    def __init__(self, model_dim=300, output_dim=2):
        super().__init__()

        self.linear = nn.Linear(model_dim, output_dim)

        nn.init.normal_(self.linear.weight, std=0.02)  # 重みの初期化
        nn.init.normal_(self.linear.bias, 0)  # バイアスの初期化

    def forward(self, x):
        x0 = x[:, 0, :]  # 各ミニバッチの各文章の先頭の単語の特徴量のみを取り出す
        out = self.linear(x0)

        return out


class TransformerClassification(nn.Module):
    def __init__(self, text_embedding_vectors, model_dim=300, max_seq_len=256, output_dim=2):
        super().__init__()

        self.net1 = Embedder(text_embedding_vectors)
        self.net2 = PositionalEncoder(model_dim=model_dim, max_seq_len=max_seq_len)
        self.net3_1 = TransformerBlock(model_dim=model_dim)
        self.net3_2 = TransformerBlock(model_dim=model_dim)
        self.net4 = ClassificationHead(output_dim=output_dim, model_dim=model_dim)

    def forward(self, x, mask):
        x1 = self.net1(x)
        x2 = self.net2(x1)
        x3_1, normalized_weights_1 = self.net3_1(x2, mask)
        x3_2, normalized_weights_2 = self.net3_2(x3_1, mask)
        x4 = self.net4(x3_2)

        return x4, normalized_weights_1, normalized_weights_2