목표

- 기존 가치 기반 심층 강화학습 (NFQ, DQN, DDQN) 의 문제점을 이해한다.

- Q - function, A - function, V - function의 관계를 이해하고 Dueling DQN/DDQN 강화학습 프레임워크를 이해한다.

- 기존 Replay buffer 에서 수집한 데이터에 대해서 우선순위(priority)를 어떻게 적용하고 데이터를 어떻게 샘플링 하는지 이해한다.

 

기존 가치 기반 심층 강화학습의 문제점(NFQ, DQN, DDQN)

문제점 Q - function을 대상으로 학습 한다.

- Q - function은 현재 State에서 Action을 했을때 기대할 수 있는 Discount factor가 적용된 return 값이다.

- 하지만 Q - function은 아래식처럼 V-function, A-function 들과 관계를 가지고있다. 

- 기존의 DQN/DDQN은 아래의 공식만을 이용해서 학습했다.

- 이 방법의 문제점은 V-function과 A-function의 관계를 고려하지 않고 오직 Q-function 의 값만을 고려하여 학습한다는 의미이다. (결과적으로 정확한 Q-function 값을 찾기 힘들고, Bias가 커질수 있다는 뜻)

- 또한 어떤 행동을 선택해도 괜찮은 상태에서 가지는 Q-function값에 대한 기대치가 틀릴수있다. (비슷한 Q-function 값을 가져야 하는 상황인데에도 노이즈에 의해 편향이 생길 수 있다는 뜻)

 

Dueling 기법

- DNN의 출력을 V-function, A-function 으로 변형 시킨 방법이다.

- DQN, DDQN에 모두 적용 가능하다. (Dueling DQN, Dueling DDQN)

- 아래의 DNN의 구조로 출력하고 학습 시킨다. 

- 강화학습 Agent는 출력 레이어에서 출력된 V-function 값과 A-function 값을 보고 Q-function을 계산한다.

- 주의해야 할 점은 Q-function을  V-function 값과 A-function 값을 이용하여 계산할때 아래 방법 처럼 조금 다른 방법으로 계산한다. (왜 이렇게 하는지는 정확히 이해 못하겠음.. 책에서는 Q-function에 대한 추정치만 이용해서 V-function과 A-function 값을 완전히 복원할 방법이 없기 때문에 Q-function에 average A-function을 뺀다고 설명함)

class FCDuelingQ(nn.Module):
    
    def __init__(self,
                 input_dim,
                 output_dim,
                 hidden_dims=(32,32)
                 activation_fc=F.relu):
        super(FCDuelingQ, self).__init__()
        self.activation_fc = activation_fc

        self.input_layer = nn.Linear(input_dim, hidden_dims[0])
        self.hidden_layer = nn.ModuleList()

        for i in range(len(hidden_dims)-1):
            hidden_layer = nn.Linear(hidden_dims[i], hidden_dims[i+1])
            self.hidden_layers.append(hidden_layer)
        
        #V-function을 위한 출력, 두 출력은 네트워크를 공유함.
        self.output_value = nn.Linear(hidden_dims[-1], 1)

        #A-function을 위한 출력 , 두 출력은 네트워크를 공유함.
        self.output_layer = nn.Linear(hidden_dims[-1], output_dim)

    def forward(self, state):
        x = state

        if not isinstance(x, torch.Tensor):
            x = torch.tensor(x, device=self.device,dtype=torch.float32)
            x = x.unsqueeze(0)

        x = self.activation_fc(self.input_layer(x))

        for hidden_layer in self.hidden_layers:
            x= self.activation_fc(hidden_layer(x))
        
        #출력 레이어에 A-function, V-function 전용으로 레이어를 따로 구성하여 출력함.
        a = self.output_layer(x)
        v = self.output_value(x).expend_as(a)

        #A-function 값과 V-function 값을 이용하여 Q-function 값 계산.
        q = v + a - a.mean(1,keepdim=True).expand_as(a)
        return q

 

PER(Prioritized Experience Replay)

- 기존 Replay buffer에서 경험 튜플 데이터를 효과적으로 샘플링 하는 방법이다.

- 여기서 효과적이란 강화학습 Agent가 학습을 잘하게 한다를 의미한다.

- 학습을 잘한다는 것은 높은 Return을 받는것도 있지만 안좋은 상태를 인지하는 방법도 효과적인 방법이다.

- PER 기법에서는 TD error 값의 절대값이 큰 것이 중요한 경험이라 한다.

- 이를 개산하고 경험 튜플을 아래와 같이 구성하여 replay buffer에 저장한다.

- 그다음 TD error의 값의 크기 순으로 정렬하여 순차대로 데이터를 가져오는 방법이다.

TD error 에 따르는 우선순위를 적용한 확률적 샘플링 방법

- 위의 방법은 노이즈에 매우 취약함으로 TD error 값에 따르는 확률 특성을 적용하는 방법이다.

- 이때 ε 값은 0 인 상황을 막기 위해 매우 작은 값을 넣는다.

- 알파값은 α 값은 우선순위 확률 적용 가중치를 의미한다. 0이면 랜덤 샘플링과 동일하게 수행하고, 1이면 우선순위가 TD error에 따라 비확률적으로 샘플링 한다는 뜻이다. 따라서 0 과 1사이의 값으로 설정해야한다.

 

TD error 에 따르는 Rank를 적용한 확률적 샘플링 방법

- 위의 확률적 샘플링 방법은 특정 TD error 값이 매우 크면 해당 TD error 값에 대한 경험 데이터를 이용할 확률이 높다. 이는 노이즈에 의해 TD error 값이 매우 크게 나올경우 문제가 발생한다.

- 따라서 값에 따라 확률을 나눈다기 보다는 순서라는 값으로 스케일링하여 확률을 적용하는 방법이다.

class PrioritizedReplayBuffer():

    def store(self, sample):
        priority = 1.0
        
        if self.n_entries > 0:
            priority = self.memory[:self.n_entries].max()
            
        self.memory[self.next_index, self.td_error_index] = priority
        self.memory[self.next_index, self.sample_index] = np.array(sample)

        self.n_entries = min(self.n_entries + 1, self.max_samples)

        self.next_index += 1
        self.next_index = self.next_index % self.max_samples

    def update(self, idx, td_errors):
        self.memory[idx, self.td_error_index] = np.abs(td_errors)

        if self.rank_based:
            sorted_arg = self.memory[:self.n_entries, self.td_error_index].argsort()[::-1]
            self.memory[:self.n_entries] = self.memory[sorted_arg]

    def sample(self, batch_size=None):
        
        batch_size = self.batch_size if batch_size == None else batch_size
        self._update_beta()
        entries = self.memory[:self.n_entries]
        if self.rank_based:
            priorities = 1/(np.arange(self.n_entries) + 1)
        else:
            priorities = entries[:, self.td_error_index] + EPS  
        scaled_priorities = priorities**self.alpha

        probs = np.array(scaled_priorities/np.sum(scaled_priorities), dtype=np.float64)

        weights = (self.n_entries * probs)**-self.beta

        normalized_weights = weights/weights.max()

        idxs = np.random.choice(self.n_entries, batch_size, replace=False,p=probs)

        samples = np.array([entries[idx] for idx in idxs])

        samples_stacks = [np.vstack(batch_size) for batch_type in np.vstack(samples[:, self.sample_index]).T]

        idxs_stack = np.vstack(idxs)

        weights_stack = np.vstack(normalized_weights[idxs])

        return idxs_stack, weights_stack, samples_stacks
class PER():
    def optimize_model(self, experiences):
         idxs, weights, (states, actions, rewards, next_states, is_terminals) = experiences
         weights = self.online_model.numpy_float_to_device(weights)
         batch_size = len(is_terminals)

         argmax_a_q_sp = self.onlinde_model(next_states).max(1)[1]
         q_sp = self.target_model(next_states).detach()
         max_a_q_sp = q_sp[np.arange(batch_size), argmax_a_q_sp].unsqueeze(1)

         target_q_sa = rewards + (self.gamma * max_a_q_sp * (1 - is_terminals))

         q_sa = self.onlinde_model(states).gather(1, actions)

         td_error = q_sa - target_q_sa

         value_loss = (weights * td_error).pow(2).mul(0.5).mean()
         self.value_optimizer.zero_grad()
         value_loss.backward()
         torch.nn.utils.clip_grad_norm_(self.onlinde_model.parameters(), self.max_gradient_norm)

         self.value_optimizer.step()

         priorities = np.abs(td_error.detach().cpu().numpy())

         self.replay_buffer.update(idxs, priorities)
    
    def train(self, make_env_fn, make_env_kargs, seed, gamma, max_minutes, max_episodes, goal_mean_100_reward):
        
        for episode in range(1, max_episodes + 1):
            for step in count():
                state, is_terminal = self.interaction_step(state, env)

                if len(self.replay_buffer) > min_samples:
                    experiences = self.replay_buffer.sample()
                    idxs, weights, samples = experiences
                    experiences = self.online_model.load(samples)

                    experiences = (idxs, weights) + (experiences,)

                    self.optimize_model(experiences)
                
                if np.sum(self.episode_timestep) % self.update_target_every_steps == 0:
                    self.update_network()
                
                if is_terminal:
                    break

 

정리

- 기존 DQN, DDQN의 문제점은 Q-function을 V-function과 A-function을 고려하여 추정하지 않았다. 

- Dueling 기법은 강화학습 Agent의 DNN 구조를 V-function, A-function을 추론하여 계산하도록 변경하여 기존의 문제점을 개선하였다.

- 기존 Replay buffer 를 Random 샘플링 방법은 효과적이지 않았다.

- PER 기법은 Replay Buffer에서 TD error 값의 절대치가 큰값에 순위를 적용하여 TD error가 큰 값을 데이터로 추출하여 Agent의 DNN을 학습 시켰다. 

 

 

+ Recent posts