- Q - function, A - function, V - function의 관계를 이해하고 Dueling DQN/DDQN 강화학습 프레임워크를 이해한다.
- 기존 Replay buffer 에서 수집한 데이터에 대해서 우선순위(priority)를 어떻게 적용하고 데이터를 어떻게 샘플링 하는지 이해한다.
기존 가치 기반 심층 강화학습의 문제점(NFQ, DQN, DDQN)
문제점 Q - function을 대상으로 학습 한다.
- Q - function은 현재 State에서 Action을 했을때 기대할 수 있는 Discount factor가 적용된 return 값이다.
- 하지만 Q - function은 아래식처럼 V-function, A-function 들과 관계를 가지고있다.
- 기존의 DQN/DDQN은 아래의 공식만을 이용해서 학습했다.
- 이 방법의 문제점은 V-function과 A-function의 관계를 고려하지 않고 오직 Q-function 의 값만을 고려하여 학습한다는 의미이다. (결과적으로 정확한 Q-function 값을 찾기 힘들고, Bias가 커질수 있다는 뜻)
- 또한 어떤 행동을 선택해도 괜찮은 상태에서 가지는 Q-function값에 대한 기대치가 틀릴수있다. (비슷한 Q-function 값을 가져야 하는 상황인데에도 노이즈에 의해 편향이 생길 수 있다는 뜻)
Dueling 기법
- DNN의 출력을 V-function, A-function 으로 변형 시킨 방법이다.
- DQN, DDQN에 모두 적용 가능하다. (Dueling DQN, Dueling DDQN)
- 아래의 DNN의 구조로 출력하고 학습 시킨다.
- 강화학습 Agent는 출력 레이어에서 출력된 V-function 값과 A-function 값을 보고 Q-function을 계산한다.
- 주의해야 할 점은 Q-function을 V-function 값과 A-function 값을 이용하여 계산할때 아래 방법 처럼 조금 다른 방법으로 계산한다. (왜 이렇게 하는지는 정확히 이해 못하겠음.. 책에서는 Q-function에 대한 추정치만 이용해서 V-function과 A-function 값을 완전히 복원할 방법이 없기 때문에 Q-function에 average A-function을 뺀다고 설명함)
class FCDuelingQ(nn.Module):
def __init__(self,
input_dim,
output_dim,
hidden_dims=(32,32)
activation_fc=F.relu):
super(FCDuelingQ, self).__init__()
self.activation_fc = activation_fc
self.input_layer = nn.Linear(input_dim, hidden_dims[0])
self.hidden_layer = nn.ModuleList()
for i in range(len(hidden_dims)-1):
hidden_layer = nn.Linear(hidden_dims[i], hidden_dims[i+1])
self.hidden_layers.append(hidden_layer)
#V-function을 위한 출력, 두 출력은 네트워크를 공유함.
self.output_value = nn.Linear(hidden_dims[-1], 1)
#A-function을 위한 출력 , 두 출력은 네트워크를 공유함.
self.output_layer = nn.Linear(hidden_dims[-1], output_dim)
def forward(self, state):
x = state
if not isinstance(x, torch.Tensor):
x = torch.tensor(x, device=self.device,dtype=torch.float32)
x = x.unsqueeze(0)
x = self.activation_fc(self.input_layer(x))
for hidden_layer in self.hidden_layers:
x= self.activation_fc(hidden_layer(x))
#출력 레이어에 A-function, V-function 전용으로 레이어를 따로 구성하여 출력함.
a = self.output_layer(x)
v = self.output_value(x).expend_as(a)
#A-function 값과 V-function 값을 이용하여 Q-function 값 계산.
q = v + a - a.mean(1,keepdim=True).expand_as(a)
return q
PER(Prioritized Experience Replay)
- 기존 Replay buffer에서 경험 튜플 데이터를 효과적으로 샘플링 하는 방법이다.
- 여기서 효과적이란 강화학습 Agent가 학습을 잘하게 한다를 의미한다.
- 학습을 잘한다는 것은 높은 Return을 받는것도 있지만 안좋은 상태를 인지하는 방법도 효과적인 방법이다.
- PER 기법에서는 TD error 값의 절대값이 큰 것이 중요한 경험이라 한다.
- 이를 개산하고 경험 튜플을 아래와 같이 구성하여 replay buffer에 저장한다.
- 그다음 TD error의 값의 크기 순으로 정렬하여 순차대로 데이터를 가져오는 방법이다.
TD error 에 따르는 우선순위를 적용한 확률적 샘플링 방법
- 위의 방법은 노이즈에 매우 취약함으로 TD error 값에 따르는 확률 특성을 적용하는 방법이다.
- 이때 ε 값은 0 인 상황을 막기 위해 매우 작은 값을 넣는다.
- 알파값은 α 값은 우선순위 확률 적용 가중치를 의미한다. 0이면 랜덤 샘플링과 동일하게 수행하고, 1이면 우선순위가 TD error에 따라 비확률적으로 샘플링 한다는 뜻이다. 따라서 0 과 1사이의 값으로 설정해야한다.
TD error 에 따르는 Rank를 적용한 확률적 샘플링 방법
- 위의 확률적 샘플링 방법은 특정 TD error 값이 매우 크면 해당 TD error 값에 대한 경험 데이터를 이용할 확률이 높다. 이는 노이즈에 의해 TD error 값이 매우 크게 나올경우 문제가 발생한다.
- 따라서 값에 따라 확률을 나눈다기 보다는 순서라는 값으로 스케일링하여 확률을 적용하는 방법이다.
- Deep Q Network가 이전의 심층 강화학습의 어떤 문제를 어떻게 해결했는지 이해한다.
- Deep Q Network의 동작 방식을 이해하고 코드를 다룰 줄 안다.
- Deep Q Network의 한계점을 이해한다.
DQN trick 1 - Target Network
- 기존 NFQ의 문제점은 Neural Network 가 update 하면서 next state의 Q function값이 동시에 업데이트 하면서 결론적으로 불안정한 업데이트 Loss 값이 생긴다.
- 새롭게 제안한 Target network를 도입한 update 식
- 학습할때에는 target network는 k번의 학습 스탭마다 current network의 파라미터를 복사하고, k 번의 학습 스탭 동안 current network의 파라미터 값만 업데이트 하는 방법이다.
- 복잡한 환경일 수록 많은 학습 스탭이 필요하다. (아타리 게임 같이 CNN이 포함되어있는 모델은 10,000스텝 단위, 카트폴 환경 같이 직관적인 환경은 10~20 스텝 단위)
def optimize_model(self, experiences):
states, actions, rewards, next_states, is_terminals = experiences
batch_size = len(is_terminals)
#target model의 Q function 값을 이용
max_a_q_sp = self.target_model(next_states).detach().max(1)[0].unsqueeze(1)
target_q_sa = rewards + (self.gamma * max_a_q_sp * (1 - is_terminals))
#online model의 Q function 값을 이용
q_sa = self.online_model(states).gather(1, actions)
td_error = q_sa - target_q_sa
value_loss = td_error.pow(2).mul(0.5).mean()
self.value_optimizer.zero_grad()
value_loss.backward()
self.value_optmizer.step()
def update_network(self):
# 특정 학습 스텝이 지나면 online model의 파라미터를 target model의 파라미터로 복사함
for target, online in zip(self.target_model.parameters(), self.online_model.parameters()):
target.data.copy_(online.data)
DQN trick 2 - Experience replay
- 기존 NFQ에서 update에 사용한 데이터 셋은 IID 가정을 성립시키기 힘들었다.
- 업데이트 하는 미니배치 데이터셋은 모두 시간 관계성(순차적으로 들어오는 데이터)이 있었다.
- experience replay buffer 구조를 이용하여 IID 가정을 어느 정도 성립시키고, 데이터 튜플들간의 시간 관계성을 제거했다.
- 이 문제를 해결 하기위해 experience replay buffer 구조를 이용하여 state, action, reward, next state 튜플을 모두 저장하고 학습 Phase에서 experience replay buffer에서 mini batch 사이즈만큼 튜플을 샘플링하여 학습하는 방법을 사용했다.
experience replay buffer에 데이터를 모으는 과정
experience replay buffer에서 mini batch 사이즈만큼 샘플링 후 Network 학습
- Deep Neural Network를 이용한 Q - function근사화 방법을 이해하고 구현한다. (2 종류의 DNN)
- 기존의 TD 방법을 DNN의 학습 방법으로 매칭 시키며 이해하고 설명 가능해야 한다.
기존 MC-control, TD(Q-learning, SARSA) control의 한계점
- 기존의 MC-control, TD control은 table 형태로 저장하여 Value function 또는 Action value function을 업데이트하고 저장했다.
- 하지만 이 방법의 근본적인 문제는 table 형태로 모든 state에 대해서 각 Value function과 Action value function을 저장하고 있어야 한다는 문제점이 있었다.
- 따라서 State가 연속적인 값이거나 많은 State로 구성된 Environment인 경우에는 비현실적으로 매우 많은 state를 기억하고 있어야 하는 문제가 있었다.
Deep neural network의 도입
DNN 예시 사진
- 지도 학습에서 많이 쓰이는 Neural network 구조이다.
- 기존의 perceptron 메커니즘을 조합하여 구성한다.
Perceptron 예시
- 이번에 소개하는 NFQ는 2가지 타입의 DNN 구조를 이용하여 Q-function approximation을 한다.
Type1. State input, Q-function outputType2. State, action input, Q-function output
- 두 타입 모두 추 후 사용하게 될 구조임으로 강화 학습에 DNN을 어떻게 적용했는지 숙지해야 한다.
- DNN Class 정의 코드 (pytorch 이용)
class FCQ(nn.Module):
def __init__(self,
input_dim,
output_dim,
hidden_dims=(32,32),
activate_fc=F.relu):
super(FCQ,self).__init__()
# 활성화 함수 설정 (보통 ReLu 사용)
self.activation_fc = activate_fc
# 입력 레이어 정의
self.input_layer = nn.Linear(input_dim, hidden_dims[0])
# 은닉 레이어 설정 (여러 Layer 등록을 위해 List로 정의)
self.hidden_layer =nn.ModuleList()
# 반복문을 이용하여 여러 Layer를 등록함.
for i in range(len(hidden_dims) - 1):
hidden_layer=nn.Linear(hidden_dims[i], hidden_dims[i+1])
self.hidden_layer.append(hidden_layer)
# 출력 레이어 정의
self.output_layer = nn.Linear(hidden_dims[-1], output_dim)
# 모델에 입력시 사용할 함수
def forward(self, state):
# state 입력
x = state
# state 값이 torch.Tensor 타입이 아닐 경우 변환
if not ininstance(x, torch.Tensor):
x = torch.Tensor(x, device=self.device, dtype=torch.float32)
x = x.unsqeeze(0)
# 입력레이어 통과후 활성화 함수 적용
x = self.activation_fc(self.input_layer(x))
# 은닉레이어 통과후 활성화 함수 적용 반복
for hidden_layer in self.hidden_layers:
x = self.activate_fc(hidden_layer(x))
# 출력레이어 통과
x = self.output_layer(x)
# 결과값 출력
return x
- 다음은 Loss 함수에 대한 설명이다. 간단하게 MSE(mean squared error)를 사용한다.
# 다음 state의 Q function을 모두 가져옴
q_sp = self.online_model(next_states).detach()
# 다음 state의 Q function들 중 최대값을 가져옴
max_a_q_sp = q_sp.max(1)[0].unsqeeze(1)
# Q function target을 계산함.
target_q_s = rewards + self.gamma * max_a_q_sp * (1 - is_terminals)
# 현재 state의 Q function 값을 가져옴. 이때 모델은 입실론 그리디 정책으로 행동을 선택을 기준으로함.
q_sa = self.online_model(state).gather(1, actions)
# E [ (Q function target - Q function) ^ 2 ]
MSELoss(target_q_s, q_sa)
- 입실론 그리디 정책을 기반으로 학습시킨다.
class EGreedyStraregy():
...
def select_action(self, model, state):
self.exploratory_action_take = False
with torch.no_grad():
q_values = model(state).cpu().detach().data.numpy().squeeze()
if np.random.rand() > self.epsilon:
action = np.argmax(q_values)
else:
action = np.random.randint(len(q_values))
return action
- Optimizer 함수는 Adam, RMSProp 등이 있다.
- 주로 미니 배치 단위로 S, A, R, S'을 모으고 배치 단위로 학습시킨다.
정리
NFQ의 한계점(중요)
- 동일한 DNN을 이용하여 학습을 시킨다. 즉 현재 state, action 값에 대해서 학습을 하면서 다른 state, action값의 바뀐다.
미니 배치 단위로 학습을 하면서 target으로 하는 state, action 짝만 학습시키려고 함으로써 다른 비슷한 state, action 값이 안정적으로 수렴하지 힘들다.
- IID(independent and identically distribution) 가정을 지키기 못한다. 미니 배치 단위로 모델을 학습시킨다. 하지만 미니배치 학습을 주기로 매번 Q-function 값이 달라지게된다. 이는 매 주기마다 수집하는 미니배치 샘플들이 서로 다른 분포에서 샘플링된다는 뜻이다.
- 미니배치 데이터들이 시간 관계성이 있다. DNN 모델을 시간 관계가 있는 데이터를 기반으로 학습 시킨다면 오버피팅 된다. (Q-function approximation 기능을 RNN이나 LSTM으로 대체 한다면 괜찮을 지도..?)
def q_learning(env,
gamma=1.0,
init_alpha=0.5,
min_alpha=0.01,
alpha_decay_ratio=0.5,
init_epsilon=1.0,
min_epsilon=0.1,
epsilon_decay_ratio=0.9,
n_episodes=3000):
nS, nA = env.observation_space.n, env.action_space.n
pi_track = []
Q = np.zeros((nS, nA), dtype=np.float64)
Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)
select_action = lambda state, Q, epsilon : np.argmax(Q[state]) \
if np.random.random() > epsilon \
else np.random.randint(len(Q[state]))
alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
epsilons = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)
for e in tqdm(range(n_episodes), leave=False):
state, done = env.reset(), False
#현재 state는 agent의 policy를 이용해서 선택
action = select_action(state, Q, epsilons[e])
while not done:
next_state, reward, done, _ = env.step(action)
# Q-learning target 계산, 다음 state는 다음 상태의 Q값에서 최대값 선택
q_target = reward + gamma * Q[next_state].max() * (not done)
# Q-learning error 계산
q_error = q_target - Q[state][action]
# Q값 update
Q[state][action] = Q[state][action] + alphas[e] * q_error
state = next_state
Q_track[e] = Q
pi_track.append(np.argmax(Q, axis=1))
V = np.max(Q, axis = 1)
pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]
return Q, V, pi, Q_track, pi_track
Double Q-Learning
- Q-learning의 고질적인 문제인 overestimation 문제가 있다.
- 그 결과 maximazation bias 현상이 일어난다. (Q-learning 로직 특성상 다음 상태의 max 값을 적용하는 경향 때문.)
- 2개 이상의 Q(S,A) 매칭 값을 두고 sum, avg, min 값 들을 고려하여 Q(S,A) 값을 개선하는 방법이다.
- double learning 의 기반이다.
- double deep Q - network 의 기반이다.
import numpy as np
def double_q_learning(env,
gamma=1.0,
init_alpha=0.5,
min_alpha=0.01,
alpha_decay_ratio=0.5,
init_epsilon=1.0,
min_epsilon=0.1,
epsilon_decay_ratio=0.9,
n_episodes=3000):
nS, nA = env.observation_space.n, env.action_space.n
pi_track = []
#2개의 Q 값 생성
Q1 = np.zeros((nS, nA), dtype=np.float64)
Q2 = np.zeros((nS, nA), dtype=np.float64)
Q_track1 = np.zeros((n_episodes, nS, nA), dtype=np.float64)
Q_track2 = np.zeros((n_episodes, nS, nA), dtype=np.float64)
select_action = lambda Q, state, epsilon np.argmax(Q[state]) if np.random.random() > epsilon else np.random.randint(len(Q[state]))
alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
epsilons = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)
for e in tqdm(range(n_episodes), leave=False):
state, done = env.reset(), False
while not done:
# 2개의 Q 값에 대한 평균으로 Q값을 형성하고 선택함.
action = select_action(state, (Q1+Q2)/2, epsilon[e])
next_state, reward, done, _ = env.step(action)
# 2개중 1개의 Q 값을 랜덤으로 선택하여 업데이트함.
if np.random.randint(2):
argmax_Q1 = np.argmax(Q1[state])
#이때 다른 Q 값을 보고 가져와서 업데이트함.
td_target = reward + gamma * Q2[next_state][argmax_Q1] * (not done)
td_error = td_target - Q1[state][action]
Q1[state][action] = Q1[state][action] + alphas[e] * td_error
else:
argmax_Q2 = np.argmax(Q2[state])
#이때 다른 Q 값을 보고 가져와서 업데이트함.
td_target = reward + gamma * Q1[next_state][argmax_Q2] * (not done)
td_error = td_target - Q2[state][action]
Q2[state][action] = Q2[state][action] + alphas[e] * td_error
state = next_state
Q_track1[e] = Q1
Q_track2[e] = Q2
pi_track.append(np.argmax((Q1+Q2)/2), axis=1))
Q = (Q1 + Q2) / 2.0
V = np.max(Q, axis = 1)
pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]
return Q, V, pi, (Q_track1 + Q_track2)/2., pi_track
정리
- Control problem을 위해 Q-function을 prediction 하는 방법을 배웠다.
- Q-function을 prediction 하는 method로 MC-control, SARSA, Q-learning, Double Q-learning을 배웠다.
- MC-control 은 MC 방법 기반으로 variance가 크고 에피소드 끝까지 기다려야 하는 단점이 있지만 bias가 다른 적다.
- SARSA 는 TD 방법 기반으로 on-policy 방법이다. 이유는 현재 policy로 다음 state에 대한 action을 선택하고 평가하고 Q 값을 업데이트 하기때문이다. on-policy 방법은 off-policy 방법에 비해 안정적이다. 하지만 직접 경험해야 하는 문제가 있다.
즉 다른 agent의 평가값을 반영하지 않는다.
- Q-learning은 TD 방법 기반으로 off-policy 방법이다. 이유는 다음 state에 대한 action은 현재 policy를 따르지 않고 max Q 값만을 이용해 학습하기 때문이다. off-policy 방법은 불안정하지만 다른 agent의 평가값을 반영할 수 있기 때문에 학습 데이터를 이용하는데 유연하다.
- Double Q-learning은 TD 방법 기반이고 off-policy 방법이다. Q-learning의 단점인 overestimation, maximization bias 문제를 해결하기 위해 2개 이상의 Q 값을 이용해 평균을 내서 평가하는 방법이다.
- State transition matrix, 모든 State, Reward를 모르는 상황이다.
-> Agent가 직접 탐색을 해야하는 상황임.
- 내가 관심있는 강화 학습은 Model free 환경이다.
-> 현실 세계는 Model free라 생각한다. 현실은 매우 다양한 환경요소 때문에 확률적으로 매우 분산이 큼.
- Model free 환경에서 현재 policy에 따르는 Value Function을 평가해야 한다.
-> Value Function을 구하면 현재 policy를 따랐을 때 얻을 수 있는 expected return 값을 매 state 마다 구할 수 있다.
Model free 환경에서 policy를 평가하는 2가지 방법
Monte Carlo prediction(MC)
- First-visit MC
1. Value function은 현재 state부터 termianl state까지의 감가율을 적용한 expected return 값이다.
2. MC 방법을 통해 무한대에 가까운 시뮬레이션을 할 경우 Value function 값은 현재 policy의 value function에 수렴한다. (하지만 수렴이 힘들고 분산이 클 것임 -> state, action, reward, next state로 구성된 tuple 집단이 유사한 trajectory를 구하는 것은 힘들다는 뜻)
많은 Terminal state, 많은 trajactory, 실제 환경에서는 얼마나 많을까..?
3. 좀 더 효율적으로 계산하기 (모든 trajectory를 기억하지 않고, episode가 끝날 때마다 update 하기)
4. 독립적이면서 동일한 분포(independent and identically distributed, IID)에서 trajectory를 형성함으로써 무한개의 tracjtory value-function이 수렴하는 것을 확인 가능.
- Every-visit MC
1. First-visit MC를 공부하다보면 1개의 trajectory에서 2번 이상 같은 state에 방문한다면 어떻게 해야할까? First-visit MC 방법을 고집할까? 아니면 다시 계산할까?
2. 이런 문제를 다루기 위해 Every-visit MC가 있다.
3. IID 는 아니지만 수렴함을 확인함.
4. trajectory 생성 함수
def generate_trajectory(pi, env, max_steps=200):
done, trajectory = False, []
while not done:
state = env.reset()
for t in range(max_steps)
action = pi(state)
next_state, reward, done, _ = env.step(action)
experience = (state, action, reward, next_state, done)
trajectory.append(experience)
if done:
break
state = next_state
return np.array(trajectory)
5. MC prediction 함수
def mc_prediction(pi,
env,
gamma=1.0,
init_alpha=0.5,
min_alpha=0.01,
alpha_decay_ratio=0.5,
n_episodes=500,
max_steps=200,
first_visit=True):
#모든 state의 갯수를 가져옴
nS = env.observation_space.n
#감가 계수 미리 계산 0.99 ~ 0.3695
discounts = np.logspace(0, max_steps, num=max_steps, base=gamma, endpoint=False)
#decay하는 alpha 값도 미리 계산
alpha = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
#value-function 초기화
V = np.zeros(nS, dtype=np.float64)
#episode 별 value-function을 보기위함.
V_track = np.zeros((n_episodes, nS), dtype=np.float64)
#dictionary로 state 별 value function을 저장함.
targets = {state:[] for state in range(nS)}
for e in tqdm(range(n_episodes), leave=False):
#1개의 trajectory 생성
trajectory = generate_trajectory(pi, env, max_steps)
#방문 여부 판단용 변수
visited = np.zeros(nS, dtype=np.bool)
#1개의 trajectory 순회
for t, (state, _, reward, _, _) in enumerate(trajectory):
#현재 state 방문 여부 & first_visit 모드인 경우 continue
if visited[state] and first_visit:
continue
#현재 state는 첫 방문이거나, every_visit 모드 인 경우 수행
visited[state] = True
#현재 상태부터 종료 상태까지 step 갯수를 샘
n_steps = len(trajectory[t:])
#Discount를 적용한 return 값을 계산함.
G = np.sum(discounts[:n_steps] * trajectory[t:, 2])
#targets 자료구조에 Discount를 적용한 return 값 저장
targets[state].append(G)
#mc error 계산 (현재 tracjtory에서 나온 value function - 기존의 value function)
mc_error = G - V[state]
#alpha 감가율을 적용하여 mc_error 적용
V[state] = V[state] + alphas[e] * mc_error
#trajectory 별 value function의 변화를 확인하기 위해 저장
V_track[e] = V
#최종 Value function, trajectory 별 value function, state 별 추론한 모든 기대값 반환
return V.copy(), V_track, targets
6. MC prediction의 단점은 에피소드가 끝날 때까지 기다려야함. (그렇다면 무한 에피소드인 경우??)
7. 장점은 bias에 강함
Temporal-Difference (TD)
1. 단일 step 에서 얻은 Reward 만을 이용하여 Value-function을 추론한다.
2. MC는 에피소드가 끝날때까지 기다려야 하지만 TD는 즉각적으로 계산한다.(bootstrapping)
- bootstrapping
현재의 값을 이용해 미래의 값을 추정하는 방법.
3. TD의 Value-function 구하기 공식
4. Value-function이 현재 state로 부터 기대할 수 있는 return 값이라 하는 것은 변함없고 다음 step의 reward와 감가율을 적용한 공식을 재귀적으로 문제를 해결하였다.
5. TD target과 TD error
- TD target 은 현재 Value-function이 근사 해야하는 값이다.
- TD error 는 현재 Value-function과 TD target의 오차를 의미한다.
def td(pi,
env,
gamma=1.0,
init_alpha=0.01,
min_alpha=0.01,
alpha_decay_ratio=0.5,
n_episodes=500):
#state space 취득
nS = env.observation_space.n
#Value funcion 초기화
V = np.zeros(nS, dtype=np.float64)
V_track = np.zeros((n_episodes, nS), dtype=np.float64)
#TD target 초기화
targets = {state:[] for state in range(nS)}
#감소하는 alpha값 미리 계산
alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
for e in tqdm(range(n_episodes), leave=False):
state, done = env.reset(), False
while not done:
#현재 state을 policy에 따라 action
action = pi(state)
#다음 state, reward, done 여부 취득
next_state, reward, done, _ = env.step(action)
#td target 계산
td_target = reward + gamma * V[next_state] * (not done)
#state에 따르는 td_target 저장
targets[state].append(td_target)
#td_error 계산
td_error = td_target - V[state]
#td_error에 alpha 값 적용후 value-function 없데이트
V[state] = V[state] + alphas[e] * td_error
#다음 state로 update
state = next_state
#V-function의 변화 양상 저장
V_track[e] = V
return V, V_track
- n-step TD
- MC 방법과 TD 방법은 서로 극단적인 성향을 가지고있다.
- MC는 episode가 끝나야하고, TD는 1단계의 step만 수행하고 가치를 평가한다.
- 결과적으로 MC는 low bias, high variance, TD는 high bias, low variance 하게 가치를 평가한다. (무한한 샘플링이 가능하다면 둘다 결국 수렴함.)
- MC와 TD 사이의 방법중 하나로 n-step TD가 있다.
간단하게 현재 state의 value function 값을 2개 step 이상의 discount factor가 적용된 reward로 적용하는 것이다.
def n_step_td(
pi,
env,
gamma=1.0,
init_alpha=0.5,
min_alpha=0.01,
alpha_decay_ratio=0.5,
n_step=3,
n_episodes=500):
#state 공간
nS = env.observation_space.n
#Value function 초기화
V = np.zeros(nS, dtype=np.float64)
#episode 별 value function 히스토리 저장
V_track = np.zeros((n_episodes, nS), dtype=np.float64)
#에피소드별 감소하는 alpha 값 미리 계산
alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_epsidoes)
#Discount factor 미리 계산
discount = np.logspace(0, n_step + 1, num=n_step + 1, base=gamma, endpoint=False)
for e in tqdm(range(n_episodes), leave=False):
#state, done 여부, n-step 저장을 위한 path 초기화
state, done, path = env.reset(), False, []
#episode가 끝날때까지 실행
while not done or path is not None:
path = path[1:]
#n-step만큼 step 할때까지 반복 실행
while not done and len(path) < n_step:
action = pi(state)
next_state, reward, done, _ = env.step(action)
experience = (state, reward, next_state, done)
path.append(experience)
state = next_state
if done:
break
# 현재 path에 저장한 경험의 수
n = len(path)
# 평가할 state
est_state = path[0][0]
# path에 저장해왔던 reward만 추출
rewards = np.array(path)[:,1]
# 모든 reward에 discount factor 적용
partial_return = discount[:n] * rewards
# 다음 state의 value function값
bs_val = discounts[-1] * V[next_state] * (not done)
# td target 구하기, discount factor를 적용한 reward에 다음 state의 value function 합하기
ntd_target = np.sum(np.append(partial_return, bs_val))
# td error 구하기
ntd_error = ntd_target - V[est_state]
# td error에 현재 episode에 맞는 alpha 값 적용 후 저장
V[est_state] = V[est_state] + alphas[e] * ntd_error
if len(path) == 1 and path[0][3] == True:
path = None
V_track[e] = V
return V, V_track
- forward-view TD(λ)
- MC 방법과 TD 방법을 섞은 방법이다.
- λ 값을 0 보다 크고 1 보다 작은 사이의 값을 두면 MC 방법과 TD 방법을 동시에 가중치를 두고 쓰는 방법이다.
- λ 값이 0 이면 MC의 방법을, 1 이면 TD의 방법을 사용하는 것이다.
- 하지만 이 방법은 MC 방법과 동일한 문제점인 value function을 추정하는데 있어서 에피소드가 끝날때까지 기다려야하는 문제가 있다.
- backward-view TD(λ)
- forward-view TD 와 달리 eligibility trace 를 이용하여 MC와 TD의 개념을 섞어서 사용함
def td_lambda(
pi,
env,
gamma=1.0,
init_alpha=0.5,
min_alpha=0.01,
alpha_decay_ratio=0.5,
lambda=0.3,
n_epsidoes=500)
# 모든 state 취득
nS = env.observation_space.n
# Value function 초기화
V = np.zeros(nS, dtype=np.float64)
# episode 별 변하는 Value function 초기화
V_track = np.zeros((n_episodes, nS), dtype=np.float64)
# eligibility trace 배열 초기화
E = np.zeros(nS, dtype=np.float64)
# episode 별 감소하는 alphas 미리 계산
alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
for e in tqdm(range(n_episodes), leave=False):
# episode를 재시작 할때마다 eligibility trace를 0으로 초기화
E.fill(0)
# env reset
state, done = env.reset(), False
while not done:
action = pi(state)
next_state, reward, done, _ = env.step(action)
#td target 계산
td_target = reward + gamma * V[next_state] * (not done)
#td error 계산
td_error = td_target - V[state]
#현재 방문한 state에 대해서 eligibility trace 배열에 1을 더함
E[state] += 1
#현재 state에서 계산한 td error와 eligibility trace 배열을 모두 곱하고
#모든 state의 value-function에 적용
V = V + alphas[e] * td_error * E
# eligibility trace 배열 감가
E = gamma * lambda * E
state = next_state
V_track[e] = V
return V, V_track
- state를 재방문하는 횟수가 많을 수록 eligibility trace에서 배열의 값은 증가한다.
- 같은 state에 많이 방문 할 수록 큰 가중치로 업데이트 됨.
정리
- Model free 환경에서 value function을 prediction 하는 방법을 공부했다.
- 구하는 과정에서는 필연적으로 주어진 policy를 사용해서 trajectory를 형성해야 한다.
- 형성한 trajectory를 이용해서 MC 또는 TD 방법을 통해 value function을 prediction 했다.
- MC는 평가 하고자하는 state에서 시작하는 trajectory 를 여러개 형성하여 평균값을 통해 value function을 구했다. (bias 가 적은 대신 variance 가 큼)
- MC는 First visit, every visit 방법으로 나뉘고 두 방법은 모두 결론적으로 value-function이 수렴함을 볼 수 있다.
- TD는 평가 하고자 하는 state에서 바로 다음 state의 value function과 reward 만을 이용해서 value function을 구했다.(bias 가 큰 대신 variance 가 적음)
- TD는 n-step TD, forward-view TD(λ), backward-view TD(λ) 방법이 있는데 이 방법들은 모두 MC 방법의 특징과 TD 방법의 특징을 혼합하여 사용하기 위해서 사용되었다.