TensorFlow(PyTorch)

TA21 E121 リカレントニューラルネットワーク

この記事は約14分で読めます。

機械学習の覚書です。(ほとんどが生成AIによるものです)

RNN(リカレントニューラルネットワーク)

以下は、入力文字を予測するモデルです、

「りんご」の文字で「り」を入力して「ん」を予測したところ

リカレントニューラルネットワーク(RNN)は、時系列データシーケンシャルデータ(文章、音声など、順序を持つデータ)を処理するために設計されたニューラルネットワークの一種です。
従来のニューラルネットワークは、各入力データが独立していることを前提としていましたが、文章や音声など、前のデータと次のデータが関連しているようなデータに対しては、その関係性を十分に捉えきることができませんでした。

RNNは、過去の情報を記憶し、それを現在の処理に活かすことができるという特徴を持っています。これにより、文章の文脈を理解したり、音声の連続性を把握したりすることが可能になります。

RNNの仕組み

RNNは、ニューロンが過去の出力に接続されているという特徴を持っています。この自己回帰的な構造が、RNNを他のニューラルネットワークと区別する特徴です。

時系列処理: RNNは、入力データを一つずつ処理し、隠れ状態を更新していきます。これにより、過去の情報が現在の出力に影響を与える仕組みになっています。

隠れ状態: RNNは、過去の情報を蓄える「隠れ状態」を持ちます。隠れ状態は、現在の入力と過去の隠れ状態から計算され、次の隠れ状態と出力に影響を与えます。

Gemini生成

モデル学習・保存部分 (train_model.py)

tensorflowを用いて、RNNを作成します。
学習によってモデルを作成して、保存します。
作成物: char_prediction_model.h5 、 char_mapping.txt

みかん
かぶ
ごぼう
れんこん
やまのいも
にんじん
りんご
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense
from tensorflow.keras.utils import to_categorical

# データの準備
def load_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        sequences = [line.strip() for line in f if line.strip()]
    return sequences

# シーケンスデータの作成
def create_training_data(sequences, char_to_idx):
    X, y = [], []
    for seq in sequences:
        for i in range(len(seq) - 1):
            X.append([char_to_idx[char] for char in seq[i:i+2]])  # 2文字ずつ
            y.append(char_to_idx[seq[i+2]] if i + 2 < len(seq) else char_to_idx[seq[0]])  # 次の文字 or 循環

        # 1文字で次を予測するケース
        for i in range(len(seq)):
            X.append([char_to_idx[seq[i]], char_to_idx[seq[i]]])  # 同じ文字を2回
            y.append(char_to_idx[seq[(i + 1) % len(seq)]])
    return np.array(X), np.array(y)

# モデルの構築
def build_model(num_classes):
    model = Sequential([
        SimpleRNN(10, input_shape=(2, num_classes), activation='tanh'),
        Dense(num_classes, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

if __name__ == "__main__":
    # 学習データの読み込み
    file_path = 'data.txt'
    sequences = load_data(file_path)

    # ユニークな文字を取得
    unique_chars = sorted(set(''.join(sequences)))
    char_to_idx = {char: idx for idx, char in enumerate(unique_chars)}
    idx_to_char = {idx: char for char, idx in char_to_idx.items()}

    # データ作成
    X, y = create_training_data(sequences, char_to_idx)
    X = to_categorical(X, num_classes=len(char_to_idx))
    y = to_categorical(y, num_classes=len(char_to_idx))

    # モデルの作成と学習
    model = build_model(len(char_to_idx))
    model.fit(X, y, epochs=500, verbose=1)

    # モデル保存
    model.save('char_prediction_model.h5')
    with open('char_mapping.txt', 'w', encoding='utf-8') as f:
        f.write('\n'.join(unique_chars))

モデル実行・追加学習部分 (run_model.py)

import numpy as np
from keras.models import load_model, Sequential
from keras.layers import SimpleRNN, Dense
from keras.utils import to_categorical
import tkinter as tk

# マッピングの読み込み
def load_mappings(mapping_file):
    with open(mapping_file, 'r', encoding='utf-8') as f:
        unique_chars = [line.strip() for line in f]
    char_to_idx = {char: idx for idx, char in enumerate(unique_chars)}
    idx_to_char = {idx: char for char, idx in char_to_idx.items()}
    return unique_chars, char_to_idx, idx_to_char

# マッピングの更新
def update_mappings(new_chars, mapping_file):
    with open(mapping_file, 'a', encoding='utf-8') as f:
        for char in new_chars:
            f.write(f"{char}\n")

# モデルの再構築
def rebuild_model(model, num_classes, added_classes):
    rnn_weights = model.layers[0].get_weights()
    dense_weights = model.layers[1].get_weights()

    # RNN層の重みを拡張
    rnn_weights[0] = np.pad(rnn_weights[0], ((0, added_classes), (0, 0)), mode='constant')
    rnn_weights[1] = np.pad(rnn_weights[1], ((0, 0), (0, added_classes)), mode='constant')
    rnn_weights[2] = np.pad(rnn_weights[2], (0, added_classes), mode='constant')

    # Dense層の重みを拡張
    dense_weights[0] = np.pad(dense_weights[0], ((0, 0), (0, added_classes)), mode='constant')
    dense_weights[1] = np.pad(dense_weights[1], (0, added_classes), mode='constant')

    # 新しいモデルを構築
    new_model = Sequential([
        SimpleRNN(10, input_shape=(2, num_classes), activation='tanh'),
        Dense(num_classes, activation='softmax')
    ])
    new_model.layers[0].set_weights(rnn_weights)
    new_model.layers[1].set_weights(dense_weights)
    new_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return new_model

# データセットの作成
def create_additional_data(sequences, char_to_idx):
    X, y = [], []
    for sequence in sequences:
        for i in range(len(sequence) - 2):
            X.append([char_to_idx[sequence[i]], char_to_idx[sequence[i + 1]]])
            y.append(char_to_idx[sequence[i + 2]])
    return np.array(X), np.array(y)

# 文字予測
def predict_next_char(model, input_seq, char_to_idx, idx_to_char):
    if any(char not in char_to_idx for char in input_seq):
        return "未対応の文字が含まれています"
    if len(input_seq) < 2:
        input_seq = input_seq * 2
    input_encoded = [char_to_idx[char] for char in input_seq[-2:]]
    input_encoded = to_categorical(input_encoded, num_classes=len(char_to_idx))
    input_encoded = input_encoded[np.newaxis, ...]
    prediction = model.predict(input_encoded, verbose=0)
    predicted_idx = np.argmax(prediction)
    return idx_to_char[predicted_idx]

# モデルとマッピングファイルの読み込み
model = load_model('char_prediction_model.h5')
unique_chars, char_to_idx, idx_to_char = load_mappings('char_mapping.txt')

# Tkinter GUI
def on_key_release(event=None):
    input_seq = input_field.get()
    if input_seq:
        prediction = predict_next_char(model, input_seq, char_to_idx, idx_to_char)
        result_label.config(text=f"次の文字予測: {prediction}")
    else:
        result_label.config(text="次の文字予測: ")

def on_add_training_data():
    global model, char_to_idx, idx_to_char
    new_sequence = input_field.get()
    if len(new_sequence) < 2:
        result_label.config(text="2文字以上のシーケンスを入力してください")
        return

    # 入力データを `data.txt` に追記
    with open('data.txt', 'a', encoding='utf-8') as f:
        f.write(new_sequence + '\n')

    # 新しい文字を検出
    new_chars = [char for char in new_sequence if char not in char_to_idx]
    if new_chars:
        result_label.config(text=f"新しい文字を検出: {', '.join(new_chars)}")
        update_mappings(new_chars, 'char_mapping.txt')
        unique_chars, char_to_idx, idx_to_char = load_mappings('char_mapping.txt')
        model = rebuild_model(model, len(char_to_idx), len(new_chars))

    # 追加学習
    X_new, y_new = create_additional_data([new_sequence], char_to_idx)
    X_new = to_categorical(X_new, num_classes=len(char_to_idx))
    y_new = to_categorical(y_new, num_classes=len(char_to_idx))
    model.fit(X_new, y_new, epochs=50, verbose=1)

    # モデルとマッピングの保存
    model.save('char_prediction_model_updated.h5')
    with open('char_mapping.txt', 'w', encoding='utf-8') as f:
        for char in char_to_idx.keys():
            f.write(f"{char}\n")

    result_label.config(text="データ追記と追加学習が完了しました!")

def show_current_files():
    result_label.config(text="現在のモデル: char_prediction_model.h5\nマッピングファイル: char_mapping.txt")

root = tk.Tk()
root.title("文字予測モデル")

input_field = tk.Entry(root, width=30)
input_field.grid(row=0, column=0, padx=10, pady=10)
input_field.bind('<KeyRelease>', on_key_release)  # 入力フィールドに文字が入力されるたびに呼び出し

train_button = tk.Button(root, text="追加学習", command=on_add_training_data)
train_button.grid(row=1, column=0, padx=10, pady=10)

show_button = tk.Button(root, text="現在のファイル", command=show_current_files)
show_button.grid(row=1, column=1, padx=10, pady=10)

result_label = tk.Label(root, text="")
result_label.grid(row=2, column=0, columnspan=2, padx=10, pady=10)

root.mainloop()

githubデータ

GitHub - WOCae/E121
Contribute to WOCae/E121 development by creating an account on GitHub.

コメント

Translate »
タイトルとURLをコピーしました