Compy's Blog
2975 words
15 minutes
[RL] Q-Learning 구현
2025-10-08

Q-Learning의 기초적인 설명은 여기에서 확인할 수 있습니다.

Q-learning에 대해서 배워봤으니, 이제 눈과 코드로 어떻게 된건지 볼 차례입니다. 예제부터 쭉 한번 작성해봅시다.

먼저 Q-learning으로 저는 maze를 탈출하는 방법을 가르쳐볼겁니다.

일단 먼저 시각화를 하기 위해서 pygame으로 틀을 짜볼까요??

class Grid:

    def __init__(self, x: int, y: int, grid_type: str = "basic",
                 text: str = "", color: tuple = (255, 255, 255)):
        self.x = x
        self.y = y
        self.grid_type = grid_type
        self.text = text
        self.color = color

        return


    def __str__(self):
        return f"현재 위치 ({self.x}, {self.y}) [{self.color}]"

하나의 격자 블럭을 이렇게 정의해봅시다.

이거를 Block처럼 하나씩 쌓아서 Board라는 클래스로 미로를 한번 만들어봅시다. 위에서 설명한 Q-learning처럼 너무 큰 상태 공간은 정말 말도 안되게 학습 오래 걸리기 때문에 5*5로 해봅시다.

Board Class
class Board:
    def __init__(self, width: int, height: int, block_size: int):
        pygame.init()
        pygame.font.init()

        self.width = width
        self.height = height

        self.font = pygame.font.SysFont('Arial', 30)

        self.board = [[Grid(x, y) for x in range(width)] for y in range(height)]
        self.block_size = block_size
        self.padding = 20

        self.background = (255, 255, 255)

        # pygame section
        self.screen = pygame.display.set_mode(
            (block_size * width + self.padding * 2, block_size * height + self.padding * 2))

        self.update()
        return

    def set_grid(self, new: Grid):
        self.board[new.y][new.x] = new
        return

    def update(self):
        self.screen.fill(self.background)

        for y in range(self.height):
            for x in range(self.width):
                self.grid_update(x, y)

        self.draw_grid_lines()

        pygame.display.flip()
        # pygame.display.update()

        return

    def draw_grid_lines(self):
        line_color = (0, 0, 0)
        line_width = 1

        for x in range(self.width + 1):
            start_pos = (self.padding + x * self.block_size, self.padding)
            end_pos = (self.padding + x * self.block_size,
                       self.padding + self.height * self.block_size)
            pygame.draw.line(self.screen, line_color, start_pos, end_pos, line_width)

        for y in range(self.height + 1):
            start_pos = (self.padding, self.padding + y * self.block_size)
            end_pos = (self.padding + self.width * self.block_size,
                       self.padding + y * self.block_size)
            pygame.draw.line(self.screen, line_color, start_pos, end_pos, line_width)
        return

    def grid_update(self, x: int, y: int):
        block = self.board[y][x]
        if block is None: return
        # print(x, y, block)
        rect = pygame.draw.rect(self.screen, block.color,
                                (
                                    self.padding + x * self.block_size, self.padding + y * self.block_size,
                                    self.block_size,
                                    self.block_size))

        text = self.font.render(block.text, True, (255, 255, 255))
        text_rect = text.get_rect()
        text_rect.center = (self.padding + x * self.block_size + self.block_size // 2,
                            self.padding + y * self.block_size + self.block_size // 2)
        self.screen.blit(text, text_rect)

        return

    def valid_position(self, y, x):
        if 0 <= y < self.height and 0 <= x < self.width:
            return self.board[y][x].grid_type != "wall"
        return False

    def __copy__(self):
        result = Board(self.width, self.height, self.block_size)

        for y in range(self.height):
            for x in range(self.width):
                result.set_grid(self.board[y][x])
        return result

일단 위와 같이 클래스 Board를 만들고, 이걸 통해서 시각화를 진행해 봅시다.

간단한 maze를 만들어 보았습니다. 아래의 코드를 실행시키면!

main 초기 코드
from CompyGrid import *
from Algorithm import *
import pygame


def initialize() -> Board:
    x = 5
    y = 5

    board = Board(x, y, 150)

    maze = [
        [1, 1, 1, 0, 0],
        [1, 0, 1, 1, 0],
        [1, 1, 0, 1, 1],
        [1, 0, 0, 1, 0],
        [1, 1, 1, 1, 1]
    ]

    for i in range(y):
        for j in range(x):
            state = maze[i][j]

            block = Grid(j, i, "empty" if state else "wall", str(state), (0, 170, 200) if state else (175, 0, 0))
            board.set_grid(block)


    return board



board = initialize()
board.update()

while True:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            break;
maze = [
        [1, 1, 1, 0, 0],
        [1, 0, 1, 1, 0],
        [1, 1, 0, 1, 1],
        [1, 0, 0, 1, 0],
        [1, 1, 1, 1, 1]
    ]

상태 처럼 나오게 됩니다.



일단 Q-learning으로 진짜되나? 얼마나 잘 되는 것일까?를 보기 위해서 bruteforce 즉 랜덤으로 탈출하는 알고리즘을 작성해서 비교해봅시다!

Bruteforce 알고리즘

import pygame
import random
import imageio
from CompyGrid.Grid import Grid


class Bruteforce:
    def __init__(self, output_path="bruteforce_path"):
        self.output_path= output_path
        pass

    def bruteforce_escape(self, board, max_steps=100000, record=True):
        agent_pos = [0, 0]
        goal_pos = [board.width - 1, board.height - 1]
        steps = 0
        path = [agent_pos.copy()]

        clock = pygame.time.Clock()

        while agent_pos != goal_pos and steps < max_steps:
            action = random.randint(0, 3)  # 0=right, 1=left, 2=down, 3=up

            x, y = agent_pos[0], agent_pos[1]
            new_x, new_y = x, y

            if action == 0:  # right
                new_x = x + 1
            elif action == 1:  # left
                new_x = x - 1
            elif action == 2:  # down
                new_y = y + 1
            elif action == 3:  # up
                new_y = y - 1

            if board.valid_position(new_y, new_x):
                agent_pos = [new_x, new_y]
                path.append(agent_pos.copy())

            steps += 1

            if steps % 10 == 0:
                board.screen.fill(board.background)

                for grid_y in range(board.height):
                    for grid_x in range(board.width):
                        board.grid_update(grid_x, grid_y)

                pre = board.board[agent_pos[1]][agent_pos[0]]


                agent_grid = Grid(agent_pos[0], agent_pos[1],
                                  text="B", color=(255, 165, 0))
                board.set_grid(agent_grid)
                board.grid_update(agent_pos[0], agent_pos[1])

                goal_grid = Grid(goal_pos[0], goal_pos[1],
                                 grid_type="goal",
                                 text="G", color=(0, 255, 0))
                board.set_grid(goal_grid)
                board.grid_update(goal_pos[0], goal_pos[1])

                font = pygame.font.SysFont('Arial', 20)
                text = font.render(f"Brute-force | Steps: {steps}", True, (255, 255, 255))
                board.screen.blit(text, (10, 10))

                pygame.display.flip()
                clock.tick(60)
                board.set_grid(pre)
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        return steps, path
            print(agent_pos, goal_pos)

        if agent_pos == goal_pos:
            print(f"Bruteforce: 총 {steps} 스텝")
        else:
            print(f"Bruteforce 실패")

        if record and agent_pos == goal_pos:
            self.save_path_as_gif(board, path, goal_pos, f"{self.output_path}.gif", fps=30)

        return steps, path

    def save_path_as_gif(self, board, path, goal_pos, filename, fps=10):
        frames = []

        sample_rate = max(1, len(path) // 1000)
        sampled_path = path[::sample_rate]

        print(f"총 {len(path)} 스텝 중 {len(sampled_path)} 프레임 녹화...")

        for i, pos in enumerate(sampled_path):
            board.screen.fill(board.background)

            for y in range(board.height):
                for x in range(board.width):
                    board.grid_update(x, y)

            for j in range(max(0, i - 50), i):
                if j < len(sampled_path):
                    trail_pos = sampled_path[j]
                    trail_grid = Grid(trail_pos[0], trail_pos[1],
                                      grid_type="trail",
                                      text="", color=(200, 200, 100))
                    board.set_grid(trail_grid)
                    board.grid_update(trail_pos[0], trail_pos[1])

            agent_grid = Grid(pos[0], pos[1],
                              grid_type="agent",
                              text="B", color=(255, 165, 0))
            board.set_grid(agent_grid)
            board.grid_update(pos[0], pos[1])

            goal_grid = Grid(goal_pos[0], goal_pos[1],
                             grid_type="goal",
                             text="G", color=(0, 255, 0))
            board.set_grid(goal_grid)
            board.grid_update(goal_pos[0], goal_pos[1])

            font = pygame.font.SysFont('Arial', 30)
            original_step = i * sample_rate
            text = font.render(f"Brute-force | Step: {original_step}/{len(path)}",
                               True, (255, 255, 255))
            board.screen.blit(text, (10, 10))

            pygame.display.flip()

            frame = pygame.surfarray.array3d(board.screen)
            frame = frame.transpose([1, 0, 2])
            frames.append(frame)

        duration = 1000 / fps
        imageio.mimsave(filename, frames, duration=duration, loop=0)
        

이렇게 작성하여서, gif로 어떻게 되었는지 결과도 볼 수 있게 만들어봤습니다.

이제 Q-learning을 작성해봅시다!

import copy
import pygame
from CompyGrid.Grid import Grid
import os
from PIL import Image
import imageio

import numpy as np


class QLearning:

    def __init__(self, n_states, n_actions, learning_rate=0.1, discount_factor=0.9, epsilon=0.2):
        """
        n_states: 상태의 개수
        n_actions: 행동의 개수
        learning_rate: 학습률 (alpha)
        discount_factor: 할인율 (gamma)
        epsilon: 탐험 확률 (epsilon-greedy)
        """

        self.n_states = n_states
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon

        self.q_table = np.zeros((n_states, n_actions))  # (상태 * 행동)



        self.directions = ["right", "left", "down", "up"]

        return

    def get_action(self, state):  # epsilon greedy로 탐험과 활용
        if np.random.random() < self.epsilon:
            return np.random.randint(0, self.n_actions)
        else:
            return np.argmax(self.q_table[state, :])
    def update_q_table(self, state, action, reward, next_state):
        """
        Q-table (Q-Learning의 핵심입니다)

        ------------

        state: 현재 상태
        action: 수행한 행동
        reward: 받은 보상
        next_state: 다음 상태
        """
        # Q-Learning 업데이트 공식
        # Q(s,a) = Q(s,a) + α[r + γ*max(Q(s',a')) - Q(s,a)]

        current_q = self.q_table[state, action]
        max_next_q = np.max(self.q_table[next_state, :])

        new_q = current_q + self.lr * (reward + self.gamma * max_next_q - current_q)
        self.q_table[state, action] = new_q

    def theta_to_pi(self, theta):  # 현재 행동 확률 분포 반환
        # Softmax 함수 적용
        exp_theta = np.exp(theta)
        pi = exp_theta / np.sum(exp_theta, axis=1, keepdims=True)
        return pi
    def get_policy(self):  # 행동 반환해주는거 argmax Q(s, a)
        return np.argmax(self.q_table, axis=1)

    def train(self, board, episodes=1000, render_interval=100):
        for episode in range(episodes):
            agent_pos = [0, 0]
            goal_pos = [board.width - 1, board.height - 1]
            state = agent_pos[1] * board.width + agent_pos[0]

            done = False
            total_reward = 0
            steps = 0
            max_steps = board.width * board.height * 4
            path = [agent_pos.copy()] if render_interval > 0 and episode % render_interval == 0 else None

            while not done and steps < max_steps:
                action = self.get_action(state)


                x, y = agent_pos[0], agent_pos[1]
                new_x, new_y = x, y

                if action == 0:  # right
                    new_x = x + 1
                elif action == 1:  # left
                    new_x = x - 1
                elif action == 2:  # down
                    new_y = y + 1
                elif action == 3:  # up
                    new_y = y - 1


                if board.valid_position(new_y, new_x):
                    agent_pos = [new_x, new_y]
                    reward = 0
                else:
                    reward = 0

                next_state = agent_pos[1] * board.width + agent_pos[0]


                if agent_pos == goal_pos:
                    reward = 10
                    done = True


                self.update_q_table(state, action, reward, next_state)

                state = next_state
                total_reward += reward
                steps += 1


                if path is not None:
                    path.append(agent_pos.copy())


            if path is not None:
                self._render_path(board, path, goal_pos)

            if (episode + 1) % 100 == 0:
                print(f"Episode {episode + 1}/{episodes}, "
                      f"Total Reward: {total_reward:.2f}, "
                      f"Steps: {steps}")

        return self.q_table



    def _render_path(self, board, path, goal_pos):
        for i, pos in enumerate(path):
            board.screen.fill(board.background)
            for y in range(board.height):
                for x in range(board.width):
                    board.grid_update(x, y)
            for prev_pos in path[:i]:
                trail_grid = Grid(prev_pos[0], prev_pos[1],
                                  grid_type="trail",
                                  text="", color=(150, 150, 255))
                board.set_grid(trail_grid)
                board.grid_update(prev_pos[0], prev_pos[1])


            agent_grid = Grid(pos[0], pos[1],
                              grid_type="agent",
                              text="A", color=(255, 0, 0))
            board.set_grid(agent_grid)
            board.grid_update(pos[0], pos[1])


            goal_grid = Grid(goal_pos[0], goal_pos[1],
                             grid_type="goal",
                             text="G", color=(0, 255, 0))
            board.set_grid(goal_grid)
            board.grid_update(goal_pos[0], goal_pos[1])

            pygame.display.flip()
            pygame.time.wait(100)

    def record_episode(self, board, output_path="agent_path", format="gif", fps=10):
        frames = []
        agent_pos = [0, 0]
        goal_pos = [board.width - 1, board.height - 1]
        state = agent_pos[1] * board.width + agent_pos[0]

        done = False
        steps = 0
        max_steps = board.width * board.height * 4
        path = [agent_pos.copy()]
        while not done and steps < max_steps:
            action = int(self.get_policy()[state])

            x, y = agent_pos[0], agent_pos[1]
            new_x, new_y = x, y

            if action == 0:  # right
                new_x = x + 1
            elif action == 1:  # left
                new_x = x - 1
            elif action == 2:  # down
                new_y = y + 1
            elif action == 3:  # up
                new_y = y - 1

            if board.valid_position(new_y, new_x):
                agent_pos = [new_x, new_y]
                path.append(agent_pos.copy())

            next_state = agent_pos[1] * board.width + agent_pos[0]
            done = (agent_pos == goal_pos)
            state = next_state
            steps += 1

        for i, pos in enumerate(path):
            board.screen.fill(board.background)
            for y in range(board.height):
                for x in range(board.width):
                    board.grid_update(x, y)


            for prev_pos in path[:i]:
                trail_grid = Grid(prev_pos[0], prev_pos[1],
                                  grid_type="trail",
                                  text="", color=(150, 150, 255))
                board.set_grid(trail_grid)
                board.grid_update(prev_pos[0], prev_pos[1])


            agent_grid = Grid(pos[0], pos[1],
                              grid_type="agent",
                              text="A", color=(255, 0, 0))
            board.set_grid(agent_grid)
            board.grid_update(pos[0], pos[1])


            goal_grid = Grid(goal_pos[0], goal_pos[1],
                             grid_type="goal",
                             text="G", color=(0, 255, 0))
            board.set_grid(goal_grid)
            board.grid_update(goal_pos[0], goal_pos[1])

            pygame.display.flip()


            frame = pygame.surfarray.array3d(board.screen)
            frame = frame.transpose([1, 0, 2])
            frames.append(frame)


        if format == "gif":
            self._save_as_gif(frames, f"{output_path}.gif", fps)
        return len(path)
    def _save_as_gif(self, frames, filepath, fps):
        duration = 1000 / fps  # ms
        imageio.mimsave(filepath, frames, duration=duration, loop=0)

사실 블로그에 이론적 설명은 전 포스트에서 했었고, 여기서 또 설명하기 보다는 코드에 조금 설명을 적어 놓았습니다.

이 흐름대로 쭉 보시면 이해하기 편할 것이라 생각했구요. 시각화 코드는 그냥 무시하셔도 전혀 지장이 없습니다. 왜냐면.. 코드가 좋지 않아요 ㅠ 그냥 어떻게든 시각화하고, 결과 뽑으려고 짠 코드라 그렇게 좋은 코드는 아닌것 같습니다.

하튼 이렇게 완성된 코드를 가지고 이제 main에 가져와서 둘 다 결과를 뽑아봅시다.




from CompyGrid import *
from Algorithm import *
import pygame


def initialize() -> Board:
    x = 5
    y = 5

    board = Board(x, y, 150)

    maze = [
        [1, 1, 1, 0, 0],
        [1, 0, 1, 1, 0],
        [1, 1, 0, 1, 1],
        [1, 0, 0, 1, 0],
        [1, 1, 1, 1, 1]
    ]

    for i in range(y):
        for j in range(x):
            state = maze[i][j]

            block = Grid(j, i, "empty" if state else "wall", str(state), (0, 170, 200) if state else (175, 0, 0))
            board.set_grid(block)


    return board



board = initialize()
board.update()


bruteforce = Bruteforce()
bruteforce_steps, bruteforce_path = bruteforce.bruteforce_escape(
    board, max_steps=100000, record=True
)

board = initialize()
board.update()
n_states = 5 * 5
n_actions = 4
q_learning = QLearning(n_states, n_actions,
                       learning_rate=0.1,
                       discount_factor=0.95,
                       epsilon=0.15)

q_learning.train(board, episodes=10000, render_interval=0)
q_learning.record_episode(board, "qlearning_path", format="gif", fps=5)

이렇게 실행을 시켜보면?!

어떤 결과가 나올까요?

먼저 브포의 결과를 살펴보겠습니다.

bruteforce_path_finding

이렇게 총 129 step을 밟아서 도착점에 도달했습니다. 음.. 정말 비효율적으로 도착했구요.


이제는 Q-learning은 어떻게 했는지.. (학습은 코드와 같이 10000번 시켜서 다 보여주긴 힘들고, 마지막으로 학습된 상태에 어떻게 길을 찾았나만 보도록 하겠습니다.)

자 그럼 결과를 보도록 하겠습니다.

Q_learning_path_finding

총 8 step만에 도착했습니다.

그래서 결과적으로 이런 결과가 나왔는데용?

알고리즘소요 Step
Bruteforce129
Q-Learning8

이렇게 Q-learning이 어떻게 동작하는지(코드로) 알아보았고요, 그 결과를 bruteforce와 비교해보았습니다.

하지만 저번 포스팅에서도 말했지만, 이 board의 크기가 조금만 커져도 적용이 힘들다는 큰 단점이 존재하구요..

현재 코드에서 되게 비효율적인 부분이 있을겁니다. 혼자 고쳐보셔도 되고, 아니면 아래 detail 열어서 확인해보시면 좋겠습니다.

비효율적인 부분
                if board.valid_position(new_y, new_x):
                    agent_pos = [new_x, new_y]
                    reward = 0
                else:
                    reward = 0

                next_state = agent_pos[1] * board.width + agent_pos[0]


                if agent_pos == goal_pos:
                    reward = 10
                    done = True

지금보면, 이동이 불가능한 곳에 갔을 때 -1을 주고, 이동가능하면 +1을 주고 도착하면 엄청 큰 보상을 주면 더욱 빠르게 학습하겠죠?

하지만 저는 그렇게 작성하지 않았습니다. 왜 그럴까요? Q-learning이 처음 도입되었을 때는 이론적으로 목표에 도달을 해야지, 그 목표에 대한 가치가 역으로 전파되면서 로 전파되면서 가치가 업데이트 됩니다. 그걸 정확히 구현하기 위해서 저렇게 했는데요. 저게 사실 조금만 생각해도 문제가 많아 보입니다. 애초에 목표에 도달을 못하면 Q table이 업데이트가 안돼요. 즉, 목표에 도달하지 못하면 Q-table에 의미있는 업데이트가 발생하지 않습니다. 또 이상한 뻘짓하다가 결국에는 목표에 도달했는데, Q-learning에서는 그러한 뻘짓도 좋은거로 착각하는 문제도 있습니다.

이런 문제들을 우리는

  1. 희소 보상 문제 (Sparse Reward Problem): 보상이 드물게 주어져 학습 신호가 부족한 문제
  2. 신용 할당 문제 (Credit Assignment Problem): 어떤 과거 행동이 현재 보상에 기여했는지 판단하기 어려운 문제

라고 합니다.

그래서 즉각적 보상을 주는 것으로 더 빨리 학습시킬 수 있는 방법이 있는 겁니당. 이게 현대에는 거의 당연시되게 적용되구요.

[RL] Q-Learning 구현
https://compy07.github.io/Blog/posts/ai/reinforcementlearning/theory/q_learning/implementation/
Author
뒹굴뒹굴 이정훈 공부방
Published at
2025-10-08