M.S/Reinforcement learning

강화학습 Agent를 이용한 MC control, SARSA, Q-Learning

Health&Program 2022. 8. 5. 15:13

Model free 환경에서 Agent를 이용하여 Q(S,A) 값을 구해보자! 

 

알아야 할 개념.

- MC, TD prediction

  현재 Model free 환경에서 주어진 policy를 이용해서  Value function을 평가하는 방법이다.

- Value function - V(S)

  현재 State로 부터 Terminal State 까지 도달 했을때 기대할 수 있는 감가율이 적용된 총 보상값

- Action value function - Q(S,A)

  현재 State에서 Action을 함으로써 Terminal State 까지 도달 했을때 기대할 수 있는 감가율이 적용된 총 보상값

 

MC-control

- MC prediction 방법과 동일하게 episode 를 모두 진행시켜보고 cumulative reward 기반으로 Q(S,A) 값을 update 하는 방법이다.

- policy는 주로 입실론 그리디 방법을 사용한다.

import numpy as np

def decay_schedule(
    init_value, min_value,
    decay_ratio, max_steps,
    log_start=-2, log_base=10):

    decay_steps = int(max_steps * decay_ratio)
    rem_steps = max_steps - decay_steps
    values = np.logspace(log_start, 0, decay_steps, base=log_base, endpoint=True)[::-1]
    values = (values - values.min()) / (values.max() - values.min())
    values = (init_value - min_value) * values + min_value
    values = np.pad(values, (0, rem_steps), 'edge')
    return values
    
def generate_trajectory(
    select_action, Q, epsilon,
    env, max_steps=200):
    
    done, trajectory = False, []
    while not done:
        state = env.reset()
        for t in range(max_steps):
            action = select_action(state, Q, epsilon)
            next_state, reward, done, _ = env.step(action)
            experience = (state, action, reward, next_state, done)
            trajectory.append(experience)
            
            if done:
                break
    
            state = next_state
    return np.array(trajectory)
def mc_control(
    env,
    gamma=1.0,
    init_alpha=0.5,
    min_alpha=0.01,
    alpha_decay_ratio=0.5,
    init_epsilon=1.0,
    min_epsilon=0.1,
    epsilon_decay_ratio=0.9,
    n_episodes=3000,
    max_steps=200,
    first_visit=True):

    nS, nA = env.observation_space.n, env.action_space.n
    discounts = np.logspace(0, max_steps, num=max_steps, base=gamma, endpoint=False)
    alphas = decay_schedule(init_alpha, min_alpha,alpha_decay_ratio, n_episodes)
    epsilons = decay_schedule(init_epsilon, min_epsilon,epsilon_decay_ratio, n_episodes)

    pi_track = []
    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)

    select_action = lambda state, Q, epsilon : np.argmax(Q[state]) if np.random.random() > epsilon else np.random.randint(len(Q[state]))

    for e in tqdm(range(n_episodes), leave=False):
        trajectory = generate_trajectory(select_action=select_action, Q=Q, epsilon=epsilons[e], env, max_steps)

        visited = np.zeros((nS, nA), dtype=np.bool)

        for t, (state, action, reward, next_state, done) in enumerate(trajectory):
            if visited[state][action] and first_visit:
                continue
            visited[state][action] = True

            n_steps = len(trajectory[t:])
            G = np.sum(discounts[:n_steps] * trajectory[t:, 2])
            Q[state][action] = Q[state][action] + alphas[e] * (G - Q[state][action])
        
        Q_track[e] = Q
        pi_track.append(np.argmax(Q, axis=1))
    
    V = np.max(Q, axis=1)
    pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q,axis=1))}[s]
    return Q, V, pi, Q_track, pi_track

 

SARSA

- MC-control 방법과 달리 episode를 끝까지 기다릴 필요가 없다.

- on-policy 방법이다(현재 agent의 policy로 sample을 수집하고 평가한다.)

- next state에 대해서 자기 자신 agent가 직접 선택한다.

def sarsa(env,
          gamma=1.0,
          init_alpha=0.5,
          min_alpha=0.01,
          alpha_decay_ratio=0.5,
          init_epsilon=1.0,
          min_epsilon=0.1,
          epsilon_decay_ratio=0.9,
          n_episodes=3000):

    nS, nA = env.observation_space.n, env.action_space.n
    pi_track = []

    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)

    
    select_action = lambda state, Q, epsilon : np.argmax(Q[state]) \
        if np.random.random() > epsilon \
        else np.random.randint(len(Q[state]))
    
    alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
    epsilons = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)

    for e in tqdm(range(n_episodes), leave=False):
        state, done = env.reset(), False
        action = select_action(state, Q, epsilons[e])

        while not done:
            next_state, reward, done, _ = env.step(action)

            # 다음 상태에 대한 action을 현재 agent의 policy로 선택함. (epsilon greedy)
            next_action = select_action(next_state, Q, epsilons[e])

            # sarsa target 계산
            sarsa_target = reward + gamma * Q[next_state][next_action] * (not done)

            # sarsa error 계산
            sarsa_error = sarsa_target - Q[state][action]

            # Q값 update
            Q[state][action] = Q[state][action] + alphas[e] * sarsa_error
            
            state, action = next_state, next_action
        
        Q_track[e] = Q
        pi_track.append(np.argmax(Q, axis=1))
    V = np.max(Q, axis = 1)
    pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]

    return Q, V, pi, Q_track, pi_track

Q-learning

- MC-control 방법과 달리 episode를 끝까지 기다릴 필요가 없다.

- off-policy 방법이다(현재 policy로 sample을 수집하지만 평가 할때는 optimal policy로만 평가한다.)

- next state에 대해서 optimal policy로 선택한다.

def q_learning(env,
          gamma=1.0,
          init_alpha=0.5,
          min_alpha=0.01,
          alpha_decay_ratio=0.5,
          init_epsilon=1.0,
          min_epsilon=0.1,
          epsilon_decay_ratio=0.9,
          n_episodes=3000):

    nS, nA = env.observation_space.n, env.action_space.n
    pi_track = []

    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)

    
    select_action = lambda state, Q, epsilon : np.argmax(Q[state]) \
        if np.random.random() > epsilon \
        else np.random.randint(len(Q[state]))
    
    alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
    epsilons = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)

    for e in tqdm(range(n_episodes), leave=False):
        state, done = env.reset(), False

        #현재 state는 agent의 policy를 이용해서 선택
        action = select_action(state, Q, epsilons[e])

        while not done:
            next_state, reward, done, _ = env.step(action)

            # Q-learning target 계산, 다음 state는 다음 상태의 Q값에서 최대값 선택
            q_target = reward + gamma * Q[next_state].max() * (not done)

            # Q-learning error 계산
            q_error = q_target - Q[state][action]

            # Q값 update
            Q[state][action] = Q[state][action] + alphas[e] * q_error

            state = next_state
        
        Q_track[e] = Q
        pi_track.append(np.argmax(Q, axis=1))
    V = np.max(Q, axis = 1)
    pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]

    return Q, V, pi, Q_track, pi_track

Double  Q-Learning

- Q-learning의 고질적인 문제인 overestimation 문제가 있다.

- 그 결과 maximazation bias 현상이 일어난다. (Q-learning 로직 특성상 다음 상태의 max 값을 적용하는 경향 때문.)

- 2개 이상의 Q(S,A) 매칭 값을 두고 sum, avg, min 값 들을 고려하여 Q(S,A) 값을 개선하는 방법이다.

- double learning 의 기반이다.

- double deep Q - network 의 기반이다.

import numpy as np

def double_q_learning(env,
                      gamma=1.0,
                      init_alpha=0.5,
                      min_alpha=0.01,
                      alpha_decay_ratio=0.5,
                      init_epsilon=1.0,
                      min_epsilon=0.1,
                      epsilon_decay_ratio=0.9,
                      n_episodes=3000):

    nS, nA = env.observation_space.n, env.action_space.n
    pi_track = []

    #2개의 Q 값 생성
    Q1 = np.zeros((nS, nA), dtype=np.float64)
    Q2 = np.zeros((nS, nA), dtype=np.float64)
    Q_track1 = np.zeros((n_episodes, nS, nA), dtype=np.float64)
    Q_track2 = np.zeros((n_episodes, nS, nA), dtype=np.float64)
    
    select_action = lambda Q, state, epsilon np.argmax(Q[state]) if np.random.random() > epsilon else np.random.randint(len(Q[state]))

    alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
    epsilons = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)

    for e in tqdm(range(n_episodes), leave=False):
        state, done = env.reset(), False
        while not done:

            # 2개의 Q 값에 대한 평균으로 Q값을 형성하고 선택함.
            action = select_action(state, (Q1+Q2)/2, epsilon[e])

            next_state, reward, done, _ = env.step(action)

            # 2개중 1개의 Q 값을 랜덤으로 선택하여 업데이트함.
            if np.random.randint(2):
                argmax_Q1 = np.argmax(Q1[state])

                #이때 다른 Q 값을 보고 가져와서 업데이트함.
                td_target = reward + gamma * Q2[next_state][argmax_Q1] * (not done)
                td_error = td_target - Q1[state][action]
                Q1[state][action] = Q1[state][action] + alphas[e] * td_error
            else:
                argmax_Q2 = np.argmax(Q2[state])

                #이때 다른 Q 값을 보고 가져와서 업데이트함.
                td_target = reward + gamma * Q1[next_state][argmax_Q2] * (not done)
                td_error = td_target - Q2[state][action]
                Q2[state][action] = Q2[state][action] + alphas[e] * td_error
        state = next_state
    Q_track1[e] = Q1
    Q_track2[e] = Q2
    pi_track.append(np.argmax((Q1+Q2)/2), axis=1))

Q = (Q1 + Q2) / 2.0
V = np.max(Q, axis = 1)
pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]

return Q, V, pi, (Q_track1 + Q_track2)/2., pi_track

 

정리

- Control problem을 위해 Q-function을 prediction 하는 방법을 배웠다.

- Q-function을 prediction  하는 method로  MC-control, SARSA, Q-learning, Double Q-learning을 배웠다.

- MC-control 은 MC 방법 기반으로 variance가 크고 에피소드 끝까지 기다려야 하는 단점이 있지만 bias가 다른 적다.

- SARSA 는 TD 방법 기반으로 on-policy 방법이다. 이유는 현재 policy로 다음 state에 대한 action을 선택하고 평가하고 Q 값을 업데이트 하기때문이다. on-policy 방법은 off-policy 방법에 비해 안정적이다. 하지만 직접 경험해야 하는 문제가 있다.

즉 다른 agent의 평가값을 반영하지 않는다.

- Q-learning은 TD 방법 기반으로 off-policy 방법이다. 이유는 다음 state에 대한 action은 현재 policy를 따르지 않고 max Q 값만을 이용해 학습하기 때문이다. off-policy 방법은 불안정하지만 다른 agent의 평가값을 반영할 수 있기 때문에 학습 데이터를 이용하는데 유연하다.

- Double Q-learning은 TD 방법 기반이고 off-policy 방법이다. Q-learning의 단점인 overestimation, maximization bias 문제를 해결하기 위해 2개 이상의 Q 값을 이용해 평균을 내서 평가하는 방법이다.