
TA31 Reinforcement Learning DQN (PyTorch)


These are my machine learning notes. (Most of the content was generated with the help of generative AI.)

The agent "A" moves across a grid field in search of the goal "G", guided by a smell gradient emitted from the goal.

Q-Learning

Q-learning is an algorithm from reinforcement learning, a branch of machine learning. It learns an action value Q(s, a) for each state and action, and updates that estimate from the rewards the agent experiences.
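
For reference, here is a minimal sketch of the tabular Q-learning update that the DQN below approximates with a neural network. The variable names are illustrative and are not part of the sample code that follows.

import numpy as np

# Hypothetical tabular Q-learning update for a small grid world.
# q_table[state, action] holds the current value estimate.
n_states, n_actions = 64, 4
q_table = np.zeros((n_states, n_actions))
alpha, gamma = 0.1, 0.99  # learning rate and discount factor

def q_update(state, action, reward, next_state, done):
    """One Q-learning step: move Q(s, a) toward r + gamma * max_a' Q(s', a')."""
    target = reward if done else reward + gamma * np.max(q_table[next_state])
    q_table[state, action] += alpha * (target - q_table[state, action])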

Below is a sample I put together. It uses the PyTorch library (TensorFlow is not used).

Running the training

  • Specification
  • Parameters
  • memory
  • model_weights.json

Training part

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import os
from collections import deque
from math import sqrt
import json
import matplotlib.pyplot as plt

# -------------------------------------
# Field size and smell strength
FIELD_SIZE = 8  # size of the square field
SMELL_STRENGTH = 100  # base smell strength

# Initial positions of the agent and the goal
ANT_START = (0, 0)  # starting position of the agent
GOAL_POSITION = (FIELD_SIZE - 1, FIELD_SIZE - 4)  # goal position

# Training monitoring
rewards = []
# -------------------------------------

# Neural network used by the DQN
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)
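
# The network maps the flattened 8 x 8 field (64 inputs) through two hidden layers
# of 128 units to 4 outputs, one estimated Q-value per action (up, down, left, right).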


# Agent definition
class Agent:
    def __init__(self, field_size, gamma=0.99, lr = 0.001, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
        # print(lr)
        self.field_size = field_size
        self.state_size = field_size ** 2
        self.action_size = 4  # four actions: up, down, left, right
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

        self.memory = deque(maxlen=2000)
        self.model = DQN(self.state_size, self.action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.criterion = nn.MSELoss()

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)  # random action (exploration)
        state = torch.FloatTensor(state).unsqueeze(0)
        act_values = self.model(state)
        return torch.argmax(act_values[0]).item()  # pick the action with the highest Q-value

    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return
        batch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in batch:
            state = torch.FloatTensor(state)
            next_state = torch.FloatTensor(next_state)
            target = reward
            if not done:
                target += self.gamma * torch.max(self.model(next_state)).item()  # Q-learning target
            target_f = self.model(state).detach().clone()
            target_f[action] = target
            output = self.model(state)
            loss = self.criterion(output[action], target_f[action])
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay  # decay epsilon
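
# Note: replay() samples a minibatch but performs one gradient step per transition,
# and it bootstraps the target from the same online network (no separate target network).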

# Environment definition
class Environment:
    def __init__(self, field_size, ant_start, goal_position, smell_strength):
        self.field_size = field_size
        self.ant_position = ant_start
        self.goal_position = goal_position
        self.smell_strength = 1 / smell_strength  # invert the input value

    def calculate_smell(self, position):
        """ゴールからの匂いの強さを計算"""
        distance = sqrt((position[0] - self.goal_position[0]) ** 2 + (position[1] - self.goal_position[1]) ** 2)
        return self.smell_strength / (1 + distance)
        # return (1 + distance)/self.smell_strength
    
    def reset(self):
        """Reset the environment"""
        self.ant_position = ANT_START
        state = np.zeros((self.field_size, self.field_size))
        state[self.ant_position] = 1  # agent position
        state[self.goal_position] = self.calculate_smell(self.goal_position)  # smell at the goal position
        return state.flatten()

    def step(self, action):
        """Apply an action and return the next state, reward, and done flag"""
        prev_position = self.ant_position
        x, y = prev_position
        if action == 0 and x > 0:  # up
            x -= 1
        elif action == 1 and x < self.field_size - 1:  # down
            x += 1
        elif action == 2 and y > 0:  # left
            y -= 1
        elif action == 3 and y < self.field_size - 1:  # right
            y += 1

        # Position after the move (unchanged if the action ran into a wall)
        self.ant_position = (x, y)

        # Build the next state
        state = np.zeros((self.field_size, self.field_size))
        state[self.ant_position] = 1
        state[self.goal_position] = self.calculate_smell(self.goal_position)

        # Reward and termination check
        reward = 1 if self.ant_position == self.goal_position else -0.1
        # Extra shaping reward when the agent moves closer to the goal
        if self.ant_position != self.goal_position:
            prev_distance = sqrt((prev_position[0] - self.goal_position[0]) ** 2 + (prev_position[1] - self.goal_position[1]) ** 2)
            new_distance = sqrt((self.ant_position[0] - self.goal_position[0]) ** 2 + (self.ant_position[1] - self.goal_position[1]) ** 2)
            reward += 0.1 if new_distance < prev_distance else -0.1
        done = self.ant_position == self.goal_position
        return state.flatten(), reward, done


# Function that renders the field as ASCII art
def render_ascii(env):
    """Render the current environment state as ASCII (smell strength included)"""
    grid = [["-"] * env.field_size for _ in range(env.field_size)]
    ant_x, ant_y = env.ant_position
    goal_x, goal_y = env.goal_position
    grid[ant_x][ant_y] = "A"  # agent position
    grid[goal_x][goal_y] = "G"  # goal position

    # Compute the smell for every cell
    smell_map = np.zeros((env.field_size, env.field_size))
    for x in range(env.field_size):
        for y in range(env.field_size):
            smell_map[x, y] = env.calculate_smell((x, y))

    # Max and min smell values excluding the goal cell
    non_goal_smells = [
        smell_map[x, y]
        for x in range(env.field_size)
        for y in range(env.field_size)
        if (x, y) != (goal_x, goal_y)
    ]
    max_smell = max(non_goal_smells)
    min_smell = min(non_goal_smells)

    # Thresholds for the ASCII characters
    thresholds = {
        "@": max_smell * 0.8,
        "#": max_smell * 0.6,
        "+": max_smell * 0.4,
        ".": max_smell * 0.2,
        "-": min_smell,
    }

    # Assign a character based on smell strength
    for x in range(env.field_size):
        for y in range(env.field_size):
            if (x, y) == (ant_x, ant_y) or (x, y) == (goal_x, goal_y):
                continue
            smell = smell_map[x, y]
            if smell >= thresholds["@"]:
                grid[x][y] = "@"
            elif smell >= thresholds["#"]:
                grid[x][y] = "#"
            elif smell >= thresholds["+"]:
                grid[x][y] = "+"
            elif smell >= thresholds["."]:
                grid[x][y] = "."
            else:
                grid[x][y] = "-"

    # Return the whole field as a single string
    return "\n".join([" ".join(row) for row in grid])

# def log_path(env, episode, step, log_path="path_log.txt"):
#     """エージェントの移動軌跡をログとして保存"""
#     with open(log_path, "a") as f:
#         ant_x, ant_y = env.ant_position
#         goal_x, goal_y = env.goal_position
#         f.write(f"Episode {episode}, Step {step}: Ant at ({ant_x}, {ant_y}), Goal at ({goal_x}, {goal_y})\n")

def log_smell_correlation(env, action, log_file="smell_correlation.txt"):
    """Log the smell strength at the time an action is selected"""
    current_smell = env.calculate_smell(env.ant_position)
    next_position = env.ant_position
    if action == 0 and next_position[0] > 0:  # up
        next_position = (next_position[0] - 1, next_position[1])
    elif action == 1 and next_position[0] < env.field_size - 1:  # down
        next_position = (next_position[0] + 1, next_position[1])
    elif action == 2 and next_position[1] > 0:  # left
        next_position = (next_position[0], next_position[1] - 1)
    elif action == 3 and next_position[1] < env.field_size - 1:  # right
        next_position = (next_position[0], next_position[1] + 1)
    next_smell = env.calculate_smell(next_position)
    with open(log_file, "a") as f:
        f.write(f"Current smell: {current_smell}, Next smell: {next_smell}, Action: {action}\n")

# Training loop
def train_agent(episodes, batch_size, save_path="asciiFig.txt", model_save_path="model_weights.json", summary_path="episode_summary.txt"):
    # Initialize the environment and agent
    env = Environment(FIELD_SIZE, ANT_START, GOAL_POSITION, SMELL_STRENGTH)
    agent = Agent(FIELD_SIZE)

    # Load an existing model if one is available
    if os.path.exists(model_save_path):
        print(f"Loading existing model from {model_save_path}...")

        # Restore the network weights and the saved epsilon value
        agent.epsilon = load_model_from_json(agent.model, model_save_path)

        # Restore the replay memory
        load_memory(agent, "memory.json")

    else:
        print("No existing model found. Starting with a new model.")

    # Open files for the ASCII figures and the episode summary
    with open(save_path, "w") as ascii_file, open(summary_path, "w") as summary_file:
        summary_file.write("Episode, Total Reward, Steps\n")  # summary header

        for episode in range(1, episodes + 1):
            state = env.reset()
            done = False
            total_reward = 0
            step_count = 0

            ascii_file.write(f"Episode {episode}, Step 1\n")
            ascii_file.write(render_ascii(env) + "\n\n")  # write the initial state

            agent_path = []  # record the agent's path
            smell_correlation_data = []  # record (position, smell strength) pairs

            while not done:
                # Choose an action
                action = agent.act(state)

                # Step the environment
                next_state, reward, done = env.step(action)
                total_reward += reward
                step_count += 1

                # Store the experience and learn from a sampled minibatch
                agent.remember(state, action, reward, next_state, done)
                agent.replay(batch_size)
                state = next_state

                # Save an ASCII figure for each step (until the terminal state)
                if not done:
                    ascii_file.write(f"Episode {episode}, Step {step_count + 1}\n")
                    ascii_file.write(render_ascii(env) + "\n\n")

                agent_path.append(env.ant_position)  # append the current position to the path
                smell_correlation_data.append((env.ant_position, env.calculate_smell(env.ant_position)))  # record the smell data


            # Record information at the end of the episode
            ascii_file.write(f"Episode {episode} finished after {step_count} steps\n\n")
            summary_file.write(f"{episode}, {total_reward}, {step_count}\n")  # write to the summary file

            print(f"Episode {episode}: Total reward: {total_reward}, Steps: {step_count}")

            # Save the model (weights and epsilon) and the replay memory after every episode
            save_model_to_json(agent.model, model_save_path, agent.epsilon)
            save_memory(agent, "memory.json")

            rewards.append(total_reward)  # record the total reward of each episode

            # Log additional data at the end of the episode
            # log_path(env, episode, step_count)
            # log_smell_correlation(episode, smell_correlation_data)

        # Plot the reward trend over episodes
        plt.plot(rewards)
        plt.xlabel('Episode')
        plt.ylabel('Total Reward')
        plt.show()

        # Visualize the policy after training
        # print("Visualizing policy...")
        # visualize_policy(agent, env)

# def visualize_policy(env, agent):
#     """各セルにおけるエージェントの方策を可視化"""
#     policy_grid = [["-"] * env.field_size for _ in range(env.field_size)]
#     for x in range(env.field_size):
#         for y in range(env.field_size):
#             position = np.zeros((env.field_size, env.field_size)).flatten()
#             position[x * env.field_size + y] = 1
#             action = agent.act(position)
#             policy_grid[x][y] = ["↑", "↓", "←", "→"][action]
#     return "\n".join([" ".join(row) for row in policy_grid])


# Save the model (text / JSON format)
def save_model_to_json(model, file_path, epsilon):
    # Get the model's state dict
    state_dict = model.state_dict()

    # Convert the parameters to JSON-serializable lists
    model_dict = {}
    for param_tensor in state_dict:
        # Convert each parameter tensor to a nested list
        model_dict[param_tensor] = state_dict[param_tensor].cpu().numpy().tolist()

    # Store epsilon as well
    model_dict["epsilon"] = epsilon

    # Write to the JSON file
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(model_dict, f, indent=4)

    # print(f"Model saved to {file_path}.")


# Load the model (text / JSON format)
def load_model_from_json(model, file_path):
    with open(file_path, "r", encoding="utf-8") as f:
        model_dict = json.load(f)

    # Load the parameters into the model's state_dict
    state_dict = {k: torch.tensor(v) for k, v in model_dict.items() if k != "epsilon"}
    model.load_state_dict(state_dict)

    # Print the weights of the first layer (for load checking)
    # print("Load Initial weights of fc1 (first layer):")
    # print(model.fc1.weight.data)

    # Restore epsilon
    epsilon = model_dict.get("epsilon", 1.0)
    print(f"Model loaded from {file_path}. Epsilon: {epsilon}")
    return epsilon


# Save the replay memory
def save_memory(agent, memory_file_path):
    # Convert the memory contents to JSON-serializable lists
    memory_list = [
        [item.tolist() if isinstance(item, np.ndarray) else item for item in experience]
        for experience in agent.memory
    ]

    # Prepend a comment-style field describing the format
    memory_with_comment = [{"_comment": "Memory format: [state, action, reward, next_state, done]"}]
    memory_with_comment.extend(memory_list)

    # Save to JSON
    with open(memory_file_path, "w", encoding="utf-8") as f:
        json.dump(memory_with_comment, f, indent=4)
    # print(f"Memory saved to {memory_file_path}.")


# Load the replay memory
def load_memory(agent, memory_file_path):
    if os.path.exists(memory_file_path):
        with open(memory_file_path, "r", encoding="utf-8") as f:
            memory_list = json.load(f)

        # Skip the comment entry
        memory_list = [item for item in memory_list if not isinstance(item, dict)]

        # Convert the lists back into a deque, restoring numpy arrays where needed
        agent.memory = deque(
            [
                [np.array(item) if isinstance(item, list) and isinstance(item[0], float) else item for item in experience]
                for experience in memory_list
            ],
            maxlen=2000
        )
        print(f"Memory loaded from {memory_file_path}.")
    else:
        print("No memory file found. Starting with an empty memory.")


# Main
def main():
    # Settings
    episodes = 100  # number of training episodes
    batch_size = 32  # minibatch size
    save_path = "asciiFig.txt"  # where the training progress (ASCII figures) is saved
    model_save_path = "model_weights.json"  # where the trained model is saved

    # Initialize the environment and agent
    print("Initializing environment and agent...")
    env = Environment(FIELD_SIZE, ANT_START, GOAL_POSITION, SMELL_STRENGTH)
    agent = Agent(FIELD_SIZE)

    # Run the training loop
    print(f"Training for {episodes} episodes...")
    train_agent(episodes, batch_size, save_path, model_save_path)

    print("Training complete.")
    print(f"Results saved to {save_path} and model saved to {model_save_path}.")

    # print("Visualizing policy...")
    # visualize_policy(agent, env)
    # print("Policy visualization complete.")

if __name__ == "__main__":
    main()
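
After training, the saved weights can be reloaded and the agent run greedily (epsilon = 0) to inspect what it has learned. The helper below is a minimal sketch, not part of the original script; it assumes the classes and functions defined above are available in the same file and that model_weights.json exists from a previous run.

# Minimal greedy rollout sketch (illustrative, not part of the original script)
def run_greedy_episode(model_save_path="model_weights.json", max_steps=100):
    env = Environment(FIELD_SIZE, ANT_START, GOAL_POSITION, SMELL_STRENGTH)
    agent = Agent(FIELD_SIZE, epsilon=0.0, epsilon_min=0.0)  # always exploit
    if os.path.exists(model_save_path):
        load_model_from_json(agent.model, model_save_path)

    state = env.reset()
    print(render_ascii(env))
    for step in range(max_steps):
        action = agent.act(state)  # greedy action, since epsilon is 0
        state, reward, done = env.step(action)
        print(f"\nStep {step + 1}, reward {reward:.2f}")
        print(render_ascii(env))
        if done:
            print("Goal reached.")
            break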
    

Viewer

import tkinter as tk
from tkinter import ttk

def load_ascii_file(file_path):
    """asciiFig.txtを読み込んでデータを辞書に変換する関数"""
    data = {}
    current_episode = None
    current_step = None
    with open(file_path, 'r') as file:
        lines = file.readlines()
        for line in lines:
            line = line.strip()
            if line.startswith("Episode") and "Step" in line:
                parts = line.replace(",", "").split()
                current_episode = int(parts[1])  # episode number
                current_step = int(parts[3])  # step number
                if current_episode not in data:
                    data[current_episode] = {}
                data[current_episode][current_step] = []
            elif line.startswith("Episode") and "finished" in line:
                # skip the "finished" line
                continue
            elif current_episode is not None and current_step is not None and line != "":
                data[current_episode][current_step].append(line)
    return data


class EpisodeViewerApp(tk.Tk):
    def __init__(self, data):
        super().__init__()
        self.title("Episode Viewer")
        self.geometry("400x300")
        
        self.data = data
        self.current_episode = 1
        self.current_step = 1

        # Episode selector
        tk.Label(self, text="Episode:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.episode_var = tk.IntVar(value=1)
        self.episode_selector = ttk.Spinbox(self, from_=1, to=len(data), textvariable=self.episode_var, command=self.update_steps)
        self.episode_selector.grid(row=0, column=1, padx=5, pady=5)

        # Step selector (seek bar)
        tk.Label(self, text="Step:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
        self.step_var = tk.IntVar(value=1)
        self.step_slider = ttk.Scale(self, from_=1, to=1, orient="horizontal", variable=self.step_var, command=self.update_display)
        self.step_slider.grid(row=1, column=1, padx=5, pady=5, sticky="ew")

        # Display area for the figure
        self.display_area = tk.Text(self, wrap="none", width=30, height=10)
        self.display_area.grid(row=2, column=0, columnspan=2, padx=5, pady=5)

        # Update button
        self.update_button = ttk.Button(self, text="Update", command=self.update_display)
        self.update_button.grid(row=3, column=0, columnspan=2, pady=5)

        # Initial display
        self.update_steps()

    def update_steps(self):
        """エピソード変更時にステップ数を更新"""
        episode = self.episode_var.get()
        if episode in self.data:
            max_steps = len(self.data[episode])
            self.step_slider.config(to=max_steps)
            self.step_var.set(1)
            self.update_display()

    def update_display(self, *_):
        """選択されたエピソードとステップに基づいて図を更新"""
        episode = self.episode_var.get()
        step = int(self.step_var.get())
        if episode in self.data and step in self.data[episode]:
            self.display_area.delete("1.0", tk.END)
            diagram = "\n".join(self.data[episode][step])
            self.display_area.insert("1.0", f"Episode {episode}, Step {step}\n\n{diagram}")
        else:
            self.display_area.delete("1.0", tk.END)
            self.display_area.insert("1.0", "No data available for this step.")

# Main
if __name__ == "__main__":
    # Load the ASCII figure file
    file_path = "asciiFig.txt"
    # file_path = "asciiFig_X.txt"
    data = load_ascii_file(file_path)

    # Launch the application
    app = EpisodeViewerApp(data)
    app.mainloop()
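
The viewer reads asciiFig.txt in the format written by train_agent above. As a quick sanity check of the parser, the snippet below is a hypothetical usage sketch; it assumes asciiFig.txt already exists from a training run.

# Inspect the parsed data without starting the GUI (illustrative usage)
data = load_ascii_file("asciiFig.txt")
print(f"Episodes found: {len(data)}")
first_episode = min(data)
print(f"Steps recorded for episode {first_episode}: {len(data[first_episode])}")
print("\n".join(data[first_episode][1]))  # ASCII grid for step 1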

GitHub

GitHub - WOCae/TA31_up
