소개

 

강화학습(Reinforcement Learning; RL)은 주어진 보상 함수를 통해 최적 정책을 계산한다.

역강화학습(Inverse Reinforcement Learing; IRL)은 최선의 행동 이력(최적 정책)을 입력으로 보상 함수를 찾는다.

 

모방학습(Imitation Learning; IL)은 전문가의 행동을 모방하는 순차적 작업을 찾는다. 전문가가 최적 정책을 직접적으로 설계하여 전문가가 원하는 행동을 쉽게 발현 시키는데 장점이 있다.

 

 

모방학습

1. 행동복제(Behavior Cloning; BC)

- 전문가를 통해 쌍으로 이뤄진 (상태-행동) 쌍에 대한 시퀀스 궤적(Demonstration Trajectory)를 수집하여 정책을 지도학습한다. 

- 지도학습 개념을 이용하기 때문에 복잡한 시퀀스 궤적을 학습시킬 때에는 많은 데이터가 필요하고, 테스트가 누적될수록 오차가 커지는 문제가 발생한다.

- 전문가에 의해 수동적으로 데이터를 수집하기 때문에 양적, 질적 한계가 존재하며, 누락된 데이터 쌍에 대해서는 성능이 현저하게 저하되는 문제가 발생한다.

2. 견습학습(Apprenticeship Learning, AL)

- 전문가를 통해 쌍으로 이뤄진 (상태-행동) 쌍에 대한 시퀀스 궤적(Demonstration Trajectory)을 수집하여 보상 함수를 만들고 계산된 보상함수를 통해 최적의 정책을 학습하는 알고리즘이다.

- IRL과 연계하면 BC에 비해 적은 데이터로 학습이 가능하고 예상치 못한 환경 대응에 강인하다.

- 전문가와 학습에이전트의 기대치집합으로 부터 계산된 성능 차이를 최소화하는 과정을 통해 보상값을 찾고, 이를 RL에 적용하여 최적 정책을 업데이트한다.

- 성능 차이가 임계치 이하로 수렴하면 학습을 종료한다.

 

 

3. IRL 알고리즘 종류

1. ALIRL

- TODO..

2. MaxEnt IRL

- TODO..

3. GCL

- TODO..

4. GAIL

- TODO..

5. VAIL

- TODO..

6. InfoGAIL

- TODO..

7. TD-GAIL(개인적으로 추가)

- 로봇에 적용한 논문 : Virtual Imitation Learning method based on TD3-GAIL for robot manipulator

- DOI : http://doi.org/10.5370/KIEE.2021.70.1.145

결론

- 최근 강화학습은 가상 시뮬레이션 환경의 연구 단계에서 자율 주행, 자연어 처리, 추천 시스템, 질병 진단 등 광범위한 응용 단계롤 확장되고 있음.

- 하지만 강화학습은 복잡한 실세계 환경에서 활용 가능성이 낮음. (더욱 많은 연구 필요)

- 역강화학습은 전문가의 시연 데이터를 통해 기존 강화학습 보다 좀 더 정확하고 세밀하게 목표 임무를 수행함.

- 특히 역강화학습은 인공 일반 지능(Artificial General Intelligence; AGI)  연구의 주요 핵심 기술이 될 것으로 기대됨.

 

DOI : 10.22648/ETRI.2019.J.340609

 

목표

- 기존 가치 기반 심층 강화학습 (NFQ, DQN, DDQN) 의 문제점을 이해한다.

- Q - function, A - function, V - function의 관계를 이해하고 Dueling DQN/DDQN 강화학습 프레임워크를 이해한다.

- 기존 Replay buffer 에서 수집한 데이터에 대해서 우선순위(priority)를 어떻게 적용하고 데이터를 어떻게 샘플링 하는지 이해한다.

 

기존 가치 기반 심층 강화학습의 문제점(NFQ, DQN, DDQN)

문제점 Q - function을 대상으로 학습 한다.

- Q - function은 현재 State에서 Action을 했을때 기대할 수 있는 Discount factor가 적용된 return 값이다.

- 하지만 Q - function은 아래식처럼 V-function, A-function 들과 관계를 가지고있다. 

- 기존의 DQN/DDQN은 아래의 공식만을 이용해서 학습했다.

- 이 방법의 문제점은 V-function과 A-function의 관계를 고려하지 않고 오직 Q-function 의 값만을 고려하여 학습한다는 의미이다. (결과적으로 정확한 Q-function 값을 찾기 힘들고, Bias가 커질수 있다는 뜻)

- 또한 어떤 행동을 선택해도 괜찮은 상태에서 가지는 Q-function값에 대한 기대치가 틀릴수있다. (비슷한 Q-function 값을 가져야 하는 상황인데에도 노이즈에 의해 편향이 생길 수 있다는 뜻)

 

Dueling 기법

- DNN의 출력을 V-function, A-function 으로 변형 시킨 방법이다.

- DQN, DDQN에 모두 적용 가능하다. (Dueling DQN, Dueling DDQN)

- 아래의 DNN의 구조로 출력하고 학습 시킨다. 

- 강화학습 Agent는 출력 레이어에서 출력된 V-function 값과 A-function 값을 보고 Q-function을 계산한다.

- 주의해야 할 점은 Q-function을  V-function 값과 A-function 값을 이용하여 계산할때 아래 방법 처럼 조금 다른 방법으로 계산한다. (왜 이렇게 하는지는 정확히 이해 못하겠음.. 책에서는 Q-function에 대한 추정치만 이용해서 V-function과 A-function 값을 완전히 복원할 방법이 없기 때문에 Q-function에 average A-function을 뺀다고 설명함)

class FCDuelingQ(nn.Module):
    
    def __init__(self,
                 input_dim,
                 output_dim,
                 hidden_dims=(32,32)
                 activation_fc=F.relu):
        super(FCDuelingQ, self).__init__()
        self.activation_fc = activation_fc

        self.input_layer = nn.Linear(input_dim, hidden_dims[0])
        self.hidden_layer = nn.ModuleList()

        for i in range(len(hidden_dims)-1):
            hidden_layer = nn.Linear(hidden_dims[i], hidden_dims[i+1])
            self.hidden_layers.append(hidden_layer)
        
        #V-function을 위한 출력, 두 출력은 네트워크를 공유함.
        self.output_value = nn.Linear(hidden_dims[-1], 1)

        #A-function을 위한 출력 , 두 출력은 네트워크를 공유함.
        self.output_layer = nn.Linear(hidden_dims[-1], output_dim)

    def forward(self, state):
        x = state

        if not isinstance(x, torch.Tensor):
            x = torch.tensor(x, device=self.device,dtype=torch.float32)
            x = x.unsqueeze(0)

        x = self.activation_fc(self.input_layer(x))

        for hidden_layer in self.hidden_layers:
            x= self.activation_fc(hidden_layer(x))
        
        #출력 레이어에 A-function, V-function 전용으로 레이어를 따로 구성하여 출력함.
        a = self.output_layer(x)
        v = self.output_value(x).expend_as(a)

        #A-function 값과 V-function 값을 이용하여 Q-function 값 계산.
        q = v + a - a.mean(1,keepdim=True).expand_as(a)
        return q

 

PER(Prioritized Experience Replay)

- 기존 Replay buffer에서 경험 튜플 데이터를 효과적으로 샘플링 하는 방법이다.

- 여기서 효과적이란 강화학습 Agent가 학습을 잘하게 한다를 의미한다.

- 학습을 잘한다는 것은 높은 Return을 받는것도 있지만 안좋은 상태를 인지하는 방법도 효과적인 방법이다.

- PER 기법에서는 TD error 값의 절대값이 큰 것이 중요한 경험이라 한다.

- 이를 개산하고 경험 튜플을 아래와 같이 구성하여 replay buffer에 저장한다.

- 그다음 TD error의 값의 크기 순으로 정렬하여 순차대로 데이터를 가져오는 방법이다.

TD error 에 따르는 우선순위를 적용한 확률적 샘플링 방법

- 위의 방법은 노이즈에 매우 취약함으로 TD error 값에 따르는 확률 특성을 적용하는 방법이다.

- 이때 ε 값은 0 인 상황을 막기 위해 매우 작은 값을 넣는다.

- 알파값은 α 값은 우선순위 확률 적용 가중치를 의미한다. 0이면 랜덤 샘플링과 동일하게 수행하고, 1이면 우선순위가 TD error에 따라 비확률적으로 샘플링 한다는 뜻이다. 따라서 0 과 1사이의 값으로 설정해야한다.

 

TD error 에 따르는 Rank를 적용한 확률적 샘플링 방법

- 위의 확률적 샘플링 방법은 특정 TD error 값이 매우 크면 해당 TD error 값에 대한 경험 데이터를 이용할 확률이 높다. 이는 노이즈에 의해 TD error 값이 매우 크게 나올경우 문제가 발생한다.

- 따라서 값에 따라 확률을 나눈다기 보다는 순서라는 값으로 스케일링하여 확률을 적용하는 방법이다.

class PrioritizedReplayBuffer():

    def store(self, sample):
        priority = 1.0
        
        if self.n_entries > 0:
            priority = self.memory[:self.n_entries].max()
            
        self.memory[self.next_index, self.td_error_index] = priority
        self.memory[self.next_index, self.sample_index] = np.array(sample)

        self.n_entries = min(self.n_entries + 1, self.max_samples)

        self.next_index += 1
        self.next_index = self.next_index % self.max_samples

    def update(self, idx, td_errors):
        self.memory[idx, self.td_error_index] = np.abs(td_errors)

        if self.rank_based:
            sorted_arg = self.memory[:self.n_entries, self.td_error_index].argsort()[::-1]
            self.memory[:self.n_entries] = self.memory[sorted_arg]

    def sample(self, batch_size=None):
        
        batch_size = self.batch_size if batch_size == None else batch_size
        self._update_beta()
        entries = self.memory[:self.n_entries]
        if self.rank_based:
            priorities = 1/(np.arange(self.n_entries) + 1)
        else:
            priorities = entries[:, self.td_error_index] + EPS  
        scaled_priorities = priorities**self.alpha

        probs = np.array(scaled_priorities/np.sum(scaled_priorities), dtype=np.float64)

        weights = (self.n_entries * probs)**-self.beta

        normalized_weights = weights/weights.max()

        idxs = np.random.choice(self.n_entries, batch_size, replace=False,p=probs)

        samples = np.array([entries[idx] for idx in idxs])

        samples_stacks = [np.vstack(batch_size) for batch_type in np.vstack(samples[:, self.sample_index]).T]

        idxs_stack = np.vstack(idxs)

        weights_stack = np.vstack(normalized_weights[idxs])

        return idxs_stack, weights_stack, samples_stacks
class PER():
    def optimize_model(self, experiences):
         idxs, weights, (states, actions, rewards, next_states, is_terminals) = experiences
         weights = self.online_model.numpy_float_to_device(weights)
         batch_size = len(is_terminals)

         argmax_a_q_sp = self.onlinde_model(next_states).max(1)[1]
         q_sp = self.target_model(next_states).detach()
         max_a_q_sp = q_sp[np.arange(batch_size), argmax_a_q_sp].unsqueeze(1)

         target_q_sa = rewards + (self.gamma * max_a_q_sp * (1 - is_terminals))

         q_sa = self.onlinde_model(states).gather(1, actions)

         td_error = q_sa - target_q_sa

         value_loss = (weights * td_error).pow(2).mul(0.5).mean()
         self.value_optimizer.zero_grad()
         value_loss.backward()
         torch.nn.utils.clip_grad_norm_(self.onlinde_model.parameters(), self.max_gradient_norm)

         self.value_optimizer.step()

         priorities = np.abs(td_error.detach().cpu().numpy())

         self.replay_buffer.update(idxs, priorities)
    
    def train(self, make_env_fn, make_env_kargs, seed, gamma, max_minutes, max_episodes, goal_mean_100_reward):
        
        for episode in range(1, max_episodes + 1):
            for step in count():
                state, is_terminal = self.interaction_step(state, env)

                if len(self.replay_buffer) > min_samples:
                    experiences = self.replay_buffer.sample()
                    idxs, weights, samples = experiences
                    experiences = self.online_model.load(samples)

                    experiences = (idxs, weights) + (experiences,)

                    self.optimize_model(experiences)
                
                if np.sum(self.episode_timestep) % self.update_target_every_steps == 0:
                    self.update_network()
                
                if is_terminal:
                    break

 

정리

- 기존 DQN, DDQN의 문제점은 Q-function을 V-function과 A-function을 고려하여 추정하지 않았다. 

- Dueling 기법은 강화학습 Agent의 DNN 구조를 V-function, A-function을 추론하여 계산하도록 변경하여 기존의 문제점을 개선하였다.

- 기존 Replay buffer 를 Random 샘플링 방법은 효과적이지 않았다.

- PER 기법은 Replay Buffer에서 TD error 값의 절대치가 큰값에 순위를 적용하여 TD error가 큰 값을 데이터로 추출하여 Agent의 DNN을 학습 시켰다. 

 

 

목표

- MSE Loss, MAE Loss, Huber Loss 함수를 이해한다.

- 강화학습 관점에서 각 Loss 함수의 장단점을 이해한다. 

 

MSE(Mean Square Error) Loss 함수

- 평균 제곱 오차를 의미한다.

- 아래 그림은 강화학습에서 MSE를 적용했을때  TD error 값을 어떻게 처리했는지 알 수 있다.

- MSE Loss 함수 장점

    - Loss 값이 0에 가까울 수록 경사가 줄어드면서 backpropagation 시 이동시키는 거리가 작아짐으로 수렴을 도와준다.(최적화에 용이) 

- 강화학습에서 MSE Loss 함수의 단점

    - TD error 값의 절대값이 클 수록 Loss 값이 기하급수적으로 커진다. -> 모든 TD error 범위의 값에 대해서 평등한 Loss 계산이 안된다는 뜻이다. (모든 error에 대한 평등한 Loss 값 x)

MAE(Mean Absolute Error)  Loss 함수

- 평균 절대값 오차를 의미한다.

- 아래 그림은 강화학습에서 MAE를 적용했을때  TD error 값을 어떻게 처리했는지 알 수 있다.

- MAE Loss 함수 장점

   - 모든 error 값에 대해서 평등한 Loss 값 제공. (모든 error 값에 대해 공평함)

- 지도학습에서 MAE Loss 함수의 단점

   -  backpropagation시 수렴이 힘듬 (최적화가 힘듬)

Huber Loss 함수

- MSE Loss 함수의 장점과 MAE Loss 함수의 장점을 섞었다.

- 최적화에 용이 + 모든 error 값에 대해서 공평

- delta 라는 임계값을 두고 error 값을 delta 값과 비교하여 MSE Loss 함수와 MAE Loss 함수를 조건에 따라 쓰는 방법이다.

- delta 값이 0 이면 MAE Loss 함수만 쓰고, delta 값이 무한대이면 MSE Loss 함수만 쓰게 된다.

- 보통 delta 값은 1로 두고 사용한다.

 

정리

- 강화학습에서는 Huber Loss가 좋고 delta 값을 1로 두고 사용한다.

- MSE Loss 함수는 모든 error 값에 대해 공평하지 않다.

- MAE Loss 함수는 최적화 기법에 사용하기에는 힘들다.

- Huber Loss 함수는 delta 임계값을 두고 MSE Loss 함수와 MAE Loss 함수를 조건에 따라 바꿔가며 사용한다.

- Pytorch 모듈에는 이미 Huber Loss 함수가 구현되어 있다. [LINK]

목표

- Double Deep Q Network가 Deep Q Network의 어떤 문제를 어떻게 해결했는지 이해한다.

- Double Deep Q Network의 동작 방식을 이해하고 코드를 다룰 줄 안다.

 

DDQN의 특징

- Target network가 Online network의 행동을 평가한다.(max Q(s',a) 값을 평가한다)

- Double Q learning 은 2개의 Q table을 가지고 서로를 평가했지만 DNN 구조가 들어간 DDQN은 이미 target network, online network로 구분되어 있으므로 서로를 평가하는 방법으로 진행한다.

- 핵심은 행동( argmax(Q(s)))과 행동에 대한 평가( Q(s,argmax(Q(s)) ) 값을 분리했다는 것이다. 

def optimize_model(self, experiences):
    state, actions, reward, next_states, is_terminals = experiences
    batch_size = len(is_terminals)

    #online model에서 action index 취득
    argmax_a_q_sp = self.online_model(next_states).max(1)[1]

    
    q_sp = self.target_model(next_states).detach()

    #online model에서 취득한 action index를 target model에서 뽑은 Q(s', action index) 로 값을 평가.
    max_a_q_sp = q_sp[np.arange(batch_size), argmax_a_q_sp].unsqeeze(1)
    target_q_sa = rewards + (self.gamma * max_a_q_sp * (1 - is_terminals))
    q_sa = self.online_model(states).gather(1, actions)

    td_error = q_sa - target_q_sa
    value_loss = td_error.pow(2).mul(0.5).mean()
    self.value_optimizer.zero_grad()
    value_loss.backward()
    self.value_optimizer.step()

 

정리

- DDQN 기법을 통해 DQN의 overestimation 문제를 어느 정도 감소시켰다.

- 핵심은 행동( argmax(Q(s)))과 행동에 대한 평가( Q(s,argmax(Q(s)) ) 값을 분리 한 것이다.

목표

- Deep Q Network가 이전의 심층 강화학습의 어떤 문제를 어떻게 해결했는지 이해한다.

- Deep Q Network의 동작 방식을 이해하고 코드를 다룰 줄 안다.

- Deep Q Network의 한계점을 이해한다.

 

DQN trick 1 - Target Network

- 기존 NFQ의 문제점은 Neural Network 가 update 하면서 next state의 Q function값이 동시에 업데이트 하면서 결론적으로 불안정한 업데이트 Loss 값이 생긴다. 

- 새롭게 제안한 Target network를 도입한 update 식

- 학습할때에는 target network는 k번의 학습 스탭마다 current network의 파라미터를 복사하고, k 번의 학습 스탭 동안 current network의 파라미터 값만 업데이트 하는 방법이다.

- 복잡한 환경일 수록 많은 학습 스탭이 필요하다. (아타리 게임 같이 CNN이 포함되어있는 모델은 10,000스텝 단위, 카트폴 환경 같이 직관적인 환경은 10~20 스텝 단위)

def optimize_model(self, experiences):
    states, actions, rewards, next_states, is_terminals = experiences

    batch_size = len(is_terminals)

    #target model의 Q function 값을 이용
    max_a_q_sp = self.target_model(next_states).detach().max(1)[0].unsqueeze(1)

    target_q_sa = rewards + (self.gamma * max_a_q_sp * (1 - is_terminals))

    #online model의 Q function 값을 이용
    q_sa = self.online_model(states).gather(1, actions)

    td_error = q_sa - target_q_sa
    value_loss = td_error.pow(2).mul(0.5).mean()
    self.value_optimizer.zero_grad()
    value_loss.backward()
    self.value_optmizer.step()
   
def update_network(self):
    # 특정 학습 스텝이 지나면 online model의 파라미터를 target model의 파라미터로 복사함
    for target, online in zip(self.target_model.parameters(), self.online_model.parameters()):
        target.data.copy_(online.data)

DQN trick 2 - Experience replay

- 기존 NFQ에서 update에 사용한 데이터 셋은 IID 가정을 성립시키기 힘들었다.

- 업데이트 하는 미니배치 데이터셋은 모두 시간 관계성(순차적으로 들어오는 데이터)이 있었다.

- experience replay buffer 구조를 이용하여 IID 가정을 어느 정도 성립시키고, 데이터 튜플들간의 시간 관계성을 제거했다.

- 이 문제를 해결 하기위해 experience replay buffer 구조를 이용하여 state, action, reward, next state 튜플을 모두 저장하고 학습 Phase에서 experience replay buffer에서 mini batch 사이즈만큼 튜플을 샘플링하여 학습하는 방법을 사용했다.

 

experience replay buffer에 데이터를 모으는 과정

 

experience replay buffer에서 mini batch 사이즈만큼 샘플링 후 Network 학습

class ReplayBuffer():
    def __init__(self,
                 max_size=50000,
                 batch_size=64):
        self.ss_mem = np.empty(shape=(max_size), dtype=np.ndarray)
        self.as_mem = np.empty(shape=(max_size), dtype=np.ndarray)
        self.rs_mem = np.empty(shape=(max_size), dtype=np.ndarray)
        self.nss_mem = np.empty(shape=(max_size), dtype=np.ndarray)
        self.ds_mem = np.empty(shape=(max_size), dtype=np.ndarray)

        self.max_size = max_size
        self.batch_size = batch_size
        self._idx = 0
        self.size = 0

    def store(self, sample):
        s, a, r, ns, d = sample
        self.ss_mem[self._idx] = s
        self.as_mem[self._idx] = a
        self.rs_mem[self._idx] = r
        self.nss_mem[self._idx] = ns
        self.ds_mem[self._idx] = d
        
        self._idx += 1
        self._idx = self._idx % self.max_size
        self.size += 1
        self.size = min(self.size, self.max_size)
    
    def sample(self, batch_size=None):
        if batch_size == None:
            batch_size = self.batch_size
        
        idxs = np.random.choice(self.size, batch_size, replace=False)

        experiences = np.vstack(self.ss_mem[idxs]), np.vstack(self.as_mem[idxs]), \
                      np.vstack(self.rs_mem[idxs]), np.vstack(self.nss_mem[idxs]), \
                      np.vstack(self.ds_mem[idxs])
    
    def __len__(self):
        return self.size

DNN의 parameter 사이즈에 따르는 팁

- State의 미세한 변화에 따라 다른 action을 해야하는 환경일 수록 DNN의 parameter의 수가 많을 수록 좋다. (너무 많으면 학습 시간이 오래걸리므로 그 중간 단계를 찾을 것)

 

DQN의 한계점

- 기존 Q-learning의 문제 중 Overestimation에 의한 maximization bias 문제를 가지고있다.

- Action space가 discrete 하게 쓰이므로 한계점이 있다. (DNN의 출력이 classfication 과 비슷한 역할을 함으로) 

정리

- DQN은 기존 NFQ의 문제점을 해결하기 위해 2가지 트릭을 이용했다.

- 1. Experience replay : 경험을 매우큰 저장소에 저장하여 학습할때 샘플링하여 학습하는 방법

- 2. Target network : Target network를 고정시키고 Current network만 학습시키고 Current network가 지정한 학습 스탭수만큼 학습했다면 Target network에 복사시키는 방법.

- DQN은 최근 강화학습에서 매우 큰 영향을 끼친 방법이다. (DQN 논문 : Playing Atari with Deep Reinforcement Learning)

- 내 석사학위 논문도 DQN이 도와줬다 하하!!

목표

- 기존의 tabular 기반 MC, TD 방법의 한계점을 이해한다.

- Deep Neural Network를 이용한 Q - function근사화 방법을 이해하고 구현한다. (2 종류의 DNN)

- 기존의 TD 방법을 DNN의 학습 방법으로 매칭 시키며 이해하고 설명 가능해야 한다.

 

기존 MC-control, TD(Q-learning, SARSA) control의 한계점

- 기존의 MC-control, TD control은 table 형태로 저장하여 Value function 또는 Action value function을 업데이트하고 저장했다.

- 하지만 이 방법의 근본적인 문제는 table 형태로 모든 state에 대해서 각 Value function과 Action value function을 저장하고 있어야 한다는 문제점이 있었다.

- 따라서 State가 연속적인 값이거나 많은 State로 구성된 Environment인 경우에는 비현실적으로 매우 많은 state를 기억하고 있어야 하는 문제가 있었다.

 

Deep neural network의 도입

DNN 예시 사진

- 지도 학습에서 많이 쓰이는 Neural network 구조이다.

- 기존의 perceptron 메커니즘을 조합하여 구성한다.

Perceptron 예시

- 이번에 소개하는 NFQ는 2가지 타입의 DNN 구조를 이용하여 Q-function approximation을 한다.

Type1. State input, Q-function output
Type2. State, action input, Q-function output

- 두 타입 모두 추 후 사용하게 될 구조임으로 강화 학습에 DNN을 어떻게 적용했는지 숙지해야 한다.

- DNN Class 정의 코드 (pytorch 이용)

class FCQ(nn.Module):
    def __init__(self,
                input_dim,
                output_dim,
                hidden_dims=(32,32),
                activate_fc=F.relu):
        super(FCQ,self).__init__()

        # 활성화 함수 설정 (보통 ReLu 사용)
        self.activation_fc = activate_fc
        
        # 입력 레이어 정의
        self.input_layer = nn.Linear(input_dim, hidden_dims[0])

        # 은닉 레이어 설정 (여러 Layer 등록을 위해 List로 정의)
        self.hidden_layer =nn.ModuleList()

        # 반복문을 이용하여 여러 Layer를 등록함.
        for i in range(len(hidden_dims) - 1):
            hidden_layer=nn.Linear(hidden_dims[i], hidden_dims[i+1])
            self.hidden_layer.append(hidden_layer)

        # 출력 레이어 정의
        self.output_layer   = nn.Linear(hidden_dims[-1], output_dim)

    # 모델에 입력시 사용할 함수
    def forward(self, state):

        # state 입력
        x = state

        # state 값이 torch.Tensor 타입이 아닐 경우 변환
        if not ininstance(x, torch.Tensor):
            x = torch.Tensor(x, device=self.device, dtype=torch.float32)
            x = x.unsqeeze(0)
        
        # 입력레이어 통과후 활성화 함수 적용
        x = self.activation_fc(self.input_layer(x))
        
        # 은닉레이어 통과후 활성화 함수 적용 반복
        for hidden_layer in self.hidden_layers:
            x = self.activate_fc(hidden_layer(x))
        
        # 출력레이어 통과
        x = self.output_layer(x)

        # 결과값 출력 
        return x

- 다음은 Loss 함수에 대한 설명이다. 간단하게 MSE(mean squared error)를 사용한다.

# 다음 state의 Q function을 모두 가져옴
q_sp = self.online_model(next_states).detach()

# 다음 state의 Q function들 중 최대값을 가져옴
max_a_q_sp = q_sp.max(1)[0].unsqeeze(1)

# Q function target을 계산함.
target_q_s = rewards + self.gamma * max_a_q_sp * (1 - is_terminals)

# 현재 state의 Q function 값을 가져옴. 이때 모델은 입실론 그리디 정책으로 행동을 선택을 기준으로함. 
q_sa = self.online_model(state).gather(1, actions)

# E [ (Q function target - Q function) ^ 2 ] 
MSELoss(target_q_s, q_sa)

- 입실론 그리디 정책을 기반으로 학습시킨다.

class EGreedyStraregy():
    ...

    def select_action(self, model, state):
        self.exploratory_action_take = False
        with torch.no_grad():

            q_values = model(state).cpu().detach().data.numpy().squeeze()
        
        if np.random.rand() > self.epsilon:
            action = np.argmax(q_values)
        else:
            action = np.random.randint(len(q_values))

        return action

- Optimizer 함수는 Adam, RMSProp 등이 있다. 

- 주로 미니 배치 단위로 S, A, R, S'을 모으고 배치 단위로 학습시킨다.

 

정리

NFQ의 한계점(중요)

- 동일한 DNN을 이용하여 학습을 시킨다. 즉 현재 state, action 값에 대해서 학습을 하면서 다른 state, action값의 바뀐다. 

미니 배치 단위로 학습을 하면서 target으로 하는 state, action 짝만 학습시키려고 함으로써 다른 비슷한 state, action 값이 안정적으로 수렴하지 힘들다.

 

- IID(independent and identically distribution) 가정을 지키기 못한다. 미니 배치 단위로 모델을 학습시킨다. 하지만 미니배치 학습을 주기로 매번 Q-function 값이 달라지게된다. 이는 매 주기마다 수집하는 미니배치 샘플들이 서로 다른 분포에서 샘플링된다는 뜻이다. 

 

- 미니배치 데이터들이 시간 관계성이 있다. DNN 모델을 시간 관계가 있는 데이터를 기반으로 학습 시킨다면 오버피팅 된다. (Q-function approximation 기능을 RNN이나 LSTM으로 대체 한다면 괜찮을 지도..?)

Model free 환경에서 Agent를 이용하여 Q(S,A) 값을 구해보자! 

 

알아야 할 개념.

- MC, TD prediction

  현재 Model free 환경에서 주어진 policy를 이용해서  Value function을 평가하는 방법이다.

- Value function - V(S)

  현재 State로 부터 Terminal State 까지 도달 했을때 기대할 수 있는 감가율이 적용된 총 보상값

- Action value function - Q(S,A)

  현재 State에서 Action을 함으로써 Terminal State 까지 도달 했을때 기대할 수 있는 감가율이 적용된 총 보상값

 

MC-control

- MC prediction 방법과 동일하게 episode 를 모두 진행시켜보고 cumulative reward 기반으로 Q(S,A) 값을 update 하는 방법이다.

- policy는 주로 입실론 그리디 방법을 사용한다.

import numpy as np

def decay_schedule(
    init_value, min_value,
    decay_ratio, max_steps,
    log_start=-2, log_base=10):

    decay_steps = int(max_steps * decay_ratio)
    rem_steps = max_steps - decay_steps
    values = np.logspace(log_start, 0, decay_steps, base=log_base, endpoint=True)[::-1]
    values = (values - values.min()) / (values.max() - values.min())
    values = (init_value - min_value) * values + min_value
    values = np.pad(values, (0, rem_steps), 'edge')
    return values
    
def generate_trajectory(
    select_action, Q, epsilon,
    env, max_steps=200):
    
    done, trajectory = False, []
    while not done:
        state = env.reset()
        for t in range(max_steps):
            action = select_action(state, Q, epsilon)
            next_state, reward, done, _ = env.step(action)
            experience = (state, action, reward, next_state, done)
            trajectory.append(experience)
            
            if done:
                break
    
            state = next_state
    return np.array(trajectory)
def mc_control(
    env,
    gamma=1.0,
    init_alpha=0.5,
    min_alpha=0.01,
    alpha_decay_ratio=0.5,
    init_epsilon=1.0,
    min_epsilon=0.1,
    epsilon_decay_ratio=0.9,
    n_episodes=3000,
    max_steps=200,
    first_visit=True):

    nS, nA = env.observation_space.n, env.action_space.n
    discounts = np.logspace(0, max_steps, num=max_steps, base=gamma, endpoint=False)
    alphas = decay_schedule(init_alpha, min_alpha,alpha_decay_ratio, n_episodes)
    epsilons = decay_schedule(init_epsilon, min_epsilon,epsilon_decay_ratio, n_episodes)

    pi_track = []
    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)

    select_action = lambda state, Q, epsilon : np.argmax(Q[state]) if np.random.random() > epsilon else np.random.randint(len(Q[state]))

    for e in tqdm(range(n_episodes), leave=False):
        trajectory = generate_trajectory(select_action=select_action, Q=Q, epsilon=epsilons[e], env, max_steps)

        visited = np.zeros((nS, nA), dtype=np.bool)

        for t, (state, action, reward, next_state, done) in enumerate(trajectory):
            if visited[state][action] and first_visit:
                continue
            visited[state][action] = True

            n_steps = len(trajectory[t:])
            G = np.sum(discounts[:n_steps] * trajectory[t:, 2])
            Q[state][action] = Q[state][action] + alphas[e] * (G - Q[state][action])
        
        Q_track[e] = Q
        pi_track.append(np.argmax(Q, axis=1))
    
    V = np.max(Q, axis=1)
    pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q,axis=1))}[s]
    return Q, V, pi, Q_track, pi_track

 

SARSA

- MC-control 방법과 달리 episode를 끝까지 기다릴 필요가 없다.

- on-policy 방법이다(현재 agent의 policy로 sample을 수집하고 평가한다.)

- next state에 대해서 자기 자신 agent가 직접 선택한다.

def sarsa(env,
          gamma=1.0,
          init_alpha=0.5,
          min_alpha=0.01,
          alpha_decay_ratio=0.5,
          init_epsilon=1.0,
          min_epsilon=0.1,
          epsilon_decay_ratio=0.9,
          n_episodes=3000):

    nS, nA = env.observation_space.n, env.action_space.n
    pi_track = []

    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)

    
    select_action = lambda state, Q, epsilon : np.argmax(Q[state]) \
        if np.random.random() > epsilon \
        else np.random.randint(len(Q[state]))
    
    alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
    epsilons = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)

    for e in tqdm(range(n_episodes), leave=False):
        state, done = env.reset(), False
        action = select_action(state, Q, epsilons[e])

        while not done:
            next_state, reward, done, _ = env.step(action)

            # 다음 상태에 대한 action을 현재 agent의 policy로 선택함. (epsilon greedy)
            next_action = select_action(next_state, Q, epsilons[e])

            # sarsa target 계산
            sarsa_target = reward + gamma * Q[next_state][next_action] * (not done)

            # sarsa error 계산
            sarsa_error = sarsa_target - Q[state][action]

            # Q값 update
            Q[state][action] = Q[state][action] + alphas[e] * sarsa_error
            
            state, action = next_state, next_action
        
        Q_track[e] = Q
        pi_track.append(np.argmax(Q, axis=1))
    V = np.max(Q, axis = 1)
    pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]

    return Q, V, pi, Q_track, pi_track

Q-learning

- MC-control 방법과 달리 episode를 끝까지 기다릴 필요가 없다.

- off-policy 방법이다(현재 policy로 sample을 수집하지만 평가 할때는 optimal policy로만 평가한다.)

- next state에 대해서 optimal policy로 선택한다.

def q_learning(env,
          gamma=1.0,
          init_alpha=0.5,
          min_alpha=0.01,
          alpha_decay_ratio=0.5,
          init_epsilon=1.0,
          min_epsilon=0.1,
          epsilon_decay_ratio=0.9,
          n_episodes=3000):

    nS, nA = env.observation_space.n, env.action_space.n
    pi_track = []

    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)

    
    select_action = lambda state, Q, epsilon : np.argmax(Q[state]) \
        if np.random.random() > epsilon \
        else np.random.randint(len(Q[state]))
    
    alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
    epsilons = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)

    for e in tqdm(range(n_episodes), leave=False):
        state, done = env.reset(), False

        #현재 state는 agent의 policy를 이용해서 선택
        action = select_action(state, Q, epsilons[e])

        while not done:
            next_state, reward, done, _ = env.step(action)

            # Q-learning target 계산, 다음 state는 다음 상태의 Q값에서 최대값 선택
            q_target = reward + gamma * Q[next_state].max() * (not done)

            # Q-learning error 계산
            q_error = q_target - Q[state][action]

            # Q값 update
            Q[state][action] = Q[state][action] + alphas[e] * q_error

            state = next_state
        
        Q_track[e] = Q
        pi_track.append(np.argmax(Q, axis=1))
    V = np.max(Q, axis = 1)
    pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]

    return Q, V, pi, Q_track, pi_track

Double  Q-Learning

- Q-learning의 고질적인 문제인 overestimation 문제가 있다.

- 그 결과 maximazation bias 현상이 일어난다. (Q-learning 로직 특성상 다음 상태의 max 값을 적용하는 경향 때문.)

- 2개 이상의 Q(S,A) 매칭 값을 두고 sum, avg, min 값 들을 고려하여 Q(S,A) 값을 개선하는 방법이다.

- double learning 의 기반이다.

- double deep Q - network 의 기반이다.

import numpy as np

def double_q_learning(env,
                      gamma=1.0,
                      init_alpha=0.5,
                      min_alpha=0.01,
                      alpha_decay_ratio=0.5,
                      init_epsilon=1.0,
                      min_epsilon=0.1,
                      epsilon_decay_ratio=0.9,
                      n_episodes=3000):

    nS, nA = env.observation_space.n, env.action_space.n
    pi_track = []

    #2개의 Q 값 생성
    Q1 = np.zeros((nS, nA), dtype=np.float64)
    Q2 = np.zeros((nS, nA), dtype=np.float64)
    Q_track1 = np.zeros((n_episodes, nS, nA), dtype=np.float64)
    Q_track2 = np.zeros((n_episodes, nS, nA), dtype=np.float64)
    
    select_action = lambda Q, state, epsilon np.argmax(Q[state]) if np.random.random() > epsilon else np.random.randint(len(Q[state]))

    alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
    epsilons = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)

    for e in tqdm(range(n_episodes), leave=False):
        state, done = env.reset(), False
        while not done:

            # 2개의 Q 값에 대한 평균으로 Q값을 형성하고 선택함.
            action = select_action(state, (Q1+Q2)/2, epsilon[e])

            next_state, reward, done, _ = env.step(action)

            # 2개중 1개의 Q 값을 랜덤으로 선택하여 업데이트함.
            if np.random.randint(2):
                argmax_Q1 = np.argmax(Q1[state])

                #이때 다른 Q 값을 보고 가져와서 업데이트함.
                td_target = reward + gamma * Q2[next_state][argmax_Q1] * (not done)
                td_error = td_target - Q1[state][action]
                Q1[state][action] = Q1[state][action] + alphas[e] * td_error
            else:
                argmax_Q2 = np.argmax(Q2[state])

                #이때 다른 Q 값을 보고 가져와서 업데이트함.
                td_target = reward + gamma * Q1[next_state][argmax_Q2] * (not done)
                td_error = td_target - Q2[state][action]
                Q2[state][action] = Q2[state][action] + alphas[e] * td_error
        state = next_state
    Q_track1[e] = Q1
    Q_track2[e] = Q2
    pi_track.append(np.argmax((Q1+Q2)/2), axis=1))

Q = (Q1 + Q2) / 2.0
V = np.max(Q, axis = 1)
pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]

return Q, V, pi, (Q_track1 + Q_track2)/2., pi_track

 

정리

- Control problem을 위해 Q-function을 prediction 하는 방법을 배웠다.

- Q-function을 prediction  하는 method로  MC-control, SARSA, Q-learning, Double Q-learning을 배웠다.

- MC-control 은 MC 방법 기반으로 variance가 크고 에피소드 끝까지 기다려야 하는 단점이 있지만 bias가 다른 적다.

- SARSA 는 TD 방법 기반으로 on-policy 방법이다. 이유는 현재 policy로 다음 state에 대한 action을 선택하고 평가하고 Q 값을 업데이트 하기때문이다. on-policy 방법은 off-policy 방법에 비해 안정적이다. 하지만 직접 경험해야 하는 문제가 있다.

즉 다른 agent의 평가값을 반영하지 않는다.

- Q-learning은 TD 방법 기반으로 off-policy 방법이다. 이유는 다음 state에 대한 action은 현재 policy를 따르지 않고 max Q 값만을 이용해 학습하기 때문이다. off-policy 방법은 불안정하지만 다른 agent의 평가값을 반영할 수 있기 때문에 학습 데이터를 이용하는데 유연하다.

- Double Q-learning은 TD 방법 기반이고 off-policy 방법이다. Q-learning의 단점인 overestimation, maximization bias 문제를 해결하기 위해 2개 이상의 Q 값을 이용해 평균을 내서 평가하는 방법이다. 

Model free 환경

- MDP 가 주어져 있지 않은 상태를 의미한다.

   -> 모든 정보가 수학적으로 기술되어 있지 않음.

- State transition matrix, 모든 State, Reward를 모르는 상황이다.

   -> Agent가 직접 탐색을 해야하는 상황임.

- 내가 관심있는 강화 학습은 Model free 환경이다.

   -> 현실 세계는 Model free라 생각한다. 현실은 매우 다양한 환경요소 때문에 확률적으로 매우 분산이 큼.

- Model free 환경에서 현재 policy에 따르는 Value Function을 평가해야 한다.

   -> Value Function을 구하면 현재 policy를 따랐을 때 얻을 수 있는 expected return 값을 매 state 마다 구할 수 있다.

 

Model free 환경에서 policy를 평가하는 2가지 방법

Monte Carlo prediction(MC) 

- First-visit MC 

1. Value function은 현재 state부터 termianl state까지의 감가율을 적용한 expected return 값이다.

 

2. MC 방법을 통해 무한대에 가까운 시뮬레이션을 할 경우 Value function 값은 현재 policy의 value function에 수렴한다. (하지만 수렴이 힘들고 분산이 클 것임 -> state, action, reward, next state로 구성된 tuple 집단이 유사한 trajectory를 구하는 것은 힘들다는 뜻) 

많은 Terminal state, 많은 trajactory, 실제 환경에서는 얼마나 많을까..?

3. 좀 더 효율적으로 계산하기 (모든 trajectory를 기억하지 않고, episode가 끝날 때마다 update 하기)

4. 독립적이면서 동일한 분포(independent and identically distributed, IID)에서 trajectory를 형성함으로써 무한개의 tracjtory value-function이 수렴하는 것을 확인 가능.

- Every-visit MC

1. First-visit MC를 공부하다보면 1개의  trajectory에서 2번 이상 같은 state에 방문한다면 어떻게 해야할까? First-visit MC 방법을 고집할까? 아니면 다시 계산할까?

2. 이런 문제를 다루기 위해 Every-visit MC가 있다.

3. IID 는 아니지만 수렴함을 확인함.

 

4. trajectory 생성 함수

def generate_trajectory(pi, env, max_steps=200):
	
    done, trajectory = False, []
    
    while not done:
    	state = env.reset()
        for t in range(max_steps)
            action = pi(state)
            next_state, reward, done, _ = env.step(action)
            experience = (state, action, reward, next_state, done)
            trajectory.append(experience)
            if done:
                break
        	state = next_state
            
	return np.array(trajectory)

5. MC prediction 함수

def mc_prediction(pi, 
				  env, 
                  gamma=1.0, 
                  init_alpha=0.5, 
                  min_alpha=0.01, 
                  alpha_decay_ratio=0.5, 
                  n_episodes=500, 
                  max_steps=200, 
                  first_visit=True):
                  
    #모든 state의 갯수를 가져옴
	nS = env.observation_space.n
    
    #감가 계수 미리 계산 0.99 ~ 0.3695
    discounts = np.logspace(0, max_steps, num=max_steps, base=gamma, endpoint=False)
    
    #decay하는 alpha 값도 미리 계산
    alpha = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
    
    #value-function 초기화
    V = np.zeros(nS, dtype=np.float64)
    
    #episode 별 value-function을 보기위함.
    V_track = np.zeros((n_episodes, nS), dtype=np.float64)
    
    #dictionary로 state 별 value function을 저장함.
    targets = {state:[] for state in range(nS)}
    
    for e in tqdm(range(n_episodes), leave=False):
    
    	#1개의 trajectory 생성
    	trajectory = generate_trajectory(pi, env, max_steps)
        
        #방문 여부 판단용 변수
        visited = np.zeros(nS, dtype=np.bool)
        
        #1개의 trajectory 순회
        for t, (state, _, reward, _, _) in enumerate(trajectory):
        
        	#현재 state 방문 여부 & first_visit 모드인 경우 continue
        	if visited[state] and first_visit:
            	continue
                
            #현재 state는 첫 방문이거나, every_visit 모드 인 경우 수행
            visited[state] = True
            
            #현재 상태부터 종료 상태까지 step 갯수를 샘
            n_steps = len(trajectory[t:])
            
            #Discount를 적용한 return 값을 계산함.
            G = np.sum(discounts[:n_steps] * trajectory[t:, 2])
            
            #targets 자료구조에 Discount를 적용한 return 값 저장
            targets[state].append(G)
            
            #mc error 계산 (현재 tracjtory에서 나온 value function - 기존의 value function)
            mc_error = G - V[state]
            
            #alpha 감가율을 적용하여 mc_error 적용
            V[state] = V[state] + alphas[e] * mc_error
            
        #trajectory 별 value function의 변화를 확인하기 위해 저장
        V_track[e] = V
        
    #최종 Value function, trajectory 별 value function, state 별 추론한 모든 기대값 반환
    return V.copy(), V_track, targets

6. MC prediction의 단점은 에피소드가 끝날 때까지 기다려야함. (그렇다면 무한 에피소드인 경우??)

7. 장점은 bias에 강함

Temporal-Difference (TD)

1. 단일 step 에서 얻은 Reward 만을 이용하여 Value-function을 추론한다. 

2. MC는 에피소드가 끝날때까지 기다려야 하지만 TD는 즉각적으로 계산한다.(bootstrapping)

 

- bootstrapping

현재의 값을 이용해 미래의 값을 추정하는 방법.

 

3. TD의 Value-function 구하기 공식

4. Value-function이 현재 state로 부터 기대할 수 있는 return 값이라 하는 것은 변함없고 다음 step의 reward와 감가율을 적용한 공식을 재귀적으로 문제를 해결하였다.

5. TD target과 TD error

- TD target 은 현재 Value-function이 근사 해야하는 값이다.

- TD error 는 현재 Value-function과 TD target의 오차를 의미한다.

def td(pi,
       env,
       gamma=1.0,
       init_alpha=0.01,
       min_alpha=0.01,
       alpha_decay_ratio=0.5,
       n_episodes=500):
       
       #state space 취득
       nS = env.observation_space.n
       
       #Value funcion 초기화
       V = np.zeros(nS, dtype=np.float64)
       V_track = np.zeros((n_episodes, nS), dtype=np.float64)
       
       #TD target 초기화
       targets = {state:[] for state in range(nS)}
       
       #감소하는 alpha값 미리 계산
       alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
       
       for e in tqdm(range(n_episodes), leave=False):
           state, done = env.reset(), False
           
           while not done:
           
           	   #현재 state을 policy에 따라 action
               action = pi(state)
               
               #다음 state, reward, done 여부 취득
               next_state, reward, done, _ = env.step(action)
               
               #td target 계산
               td_target = reward + gamma * V[next_state] * (not done)
               
               #state에 따르는 td_target 저장
               targets[state].append(td_target)
               
               #td_error 계산
               td_error = td_target - V[state]
               
               #td_error에 alpha 값 적용후 value-function 없데이트
               V[state] = V[state] + alphas[e] * td_error
               
               #다음 state로 update
               state = next_state
           
           #V-function의 변화 양상 저장
           V_track[e] = V
       
       return V, V_track

- n-step TD

- MC 방법과 TD 방법은 서로 극단적인 성향을 가지고있다. 

- MC는 episode가 끝나야하고, TD는 1단계의 step만 수행하고 가치를 평가한다.

- 결과적으로 MC는 low bias, high variance, TD는 high bias, low variance 하게 가치를 평가한다. (무한한 샘플링이 가능하다면 둘다 결국 수렴함.)

- MC와 TD 사이의 방법중 하나로 n-step TD가 있다.

간단하게 현재 state의 value function 값을 2개 step 이상의 discount factor가 적용된 reward로 적용하는 것이다.

def n_step_td(
			pi,
			env,
			gamma=1.0,
			init_alpha=0.5,
			min_alpha=0.01,
			alpha_decay_ratio=0.5,
			n_step=3,
			n_episodes=500):
              
	#state 공간
    nS = env.observation_space.n
    
    #Value function 초기화
    V = np.zeros(nS, dtype=np.float64)
    
    #episode 별 value function 히스토리 저장
    V_track = np.zeros((n_episodes, nS), dtype=np.float64)
    
    #에피소드별 감소하는 alpha 값 미리 계산
    alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_epsidoes)
    
    #Discount factor 미리 계산
    discount = np.logspace(0, n_step + 1, num=n_step + 1, base=gamma, endpoint=False)
    
    for e in tqdm(range(n_episodes), leave=False):
    
        #state, done 여부, n-step 저장을 위한 path 초기화
    	state, done, path = env.reset(), False, []
        
        #episode가 끝날때까지 실행
        while not done or path is not None:
        	
            path = path[1:]
            
            #n-step만큼 step 할때까지 반복 실행 
            while not done and len(path) < n_step:
            	action = pi(state)
                next_state, reward, done, _ = env.step(action)
                experience = (state, reward, next_state, done)
                path.append(experience)
                state = next_state
                if done:
                	break
            
            # 현재 path에 저장한 경험의 수
            n = len(path)
            
            # 평가할 state
            est_state = path[0][0]
            
            # path에 저장해왔던 reward만 추출
            rewards = np.array(path)[:,1]
            
            # 모든 reward에 discount factor 적용
            partial_return = discount[:n] * rewards
            
            # 다음 state의 value function값
            bs_val = discounts[-1] * V[next_state] * (not done)
            
            # td target 구하기, discount factor를 적용한 reward에 다음 state의 value function 합하기
            ntd_target = np.sum(np.append(partial_return, bs_val))
            
            # td error 구하기
            ntd_error = ntd_target - V[est_state]
            
            # td error에 현재 episode에 맞는 alpha 값 적용 후 저장
            V[est_state] = V[est_state] + alphas[e] * ntd_error
            
            if len(path) == 1 and path[0][3] == True:
				path = None
                
 		V_track[e] = V
        
return V, V_track

- forward-view TD(λ)

- MC 방법과 TD 방법을 섞은 방법이다.

- λ 값을 0 보다 크고 1 보다 작은 사이의 값을 두면 MC 방법과 TD 방법을 동시에 가중치를 두고 쓰는 방법이다.

- λ 값이 0 이면 MC의 방법을, 1 이면 TD의 방법을 사용하는 것이다.

- 하지만 이 방법은 MC 방법과 동일한 문제점인 value function을 추정하는데 있어서 에피소드가 끝날때까지 기다려야하는 문제가 있다.

 

- backward-view TD(λ)

- forward-view TD 와 달리 eligibility trace 를 이용하여 MC와 TD의 개념을 섞어서 사용함

def td_lambda(
			  pi,
              env,
              gamma=1.0,
              init_alpha=0.5,
              min_alpha=0.01,
              alpha_decay_ratio=0.5,
              lambda=0.3,
              n_epsidoes=500)
              
    # 모든 state 취득
	nS = env.observation_space.n
    
    # Value function 초기화
    V = np.zeros(nS, dtype=np.float64)
    
    # episode 별 변하는 Value function 초기화
    V_track = np.zeros((n_episodes, nS), dtype=np.float64)
    
    # eligibility trace 배열 초기화
    E = np.zeros(nS, dtype=np.float64)
    
    # episode 별 감소하는 alphas 미리 계산
    alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
    
    for e in tqdm(range(n_episodes), leave=False):
    
    	# episode를 재시작 할때마다 eligibility trace를 0으로 초기화
    	E.fill(0)
        
        # env reset
        state, done = env.reset(), False
        
        while not done:
        
        	action = pi(state)
            next_state, reward, done, _ = env.step(action)
            
            #td target 계산
            td_target = reward + gamma * V[next_state] * (not done)
            
            #td error 계산
            td_error = td_target - V[state]
        
        	#현재 방문한 state에 대해서 eligibility trace 배열에 1을 더함
        	E[state] += 1
            
            #현재 state에서 계산한 td error와  eligibility trace 배열을 모두 곱하고 
            #모든 state의 value-function에 적용
            V = V + alphas[e] * td_error * E
            
            # eligibility trace 배열 감가
            E = gamma * lambda * E
            
            state = next_state
         
        V_track[e] = V
        
    return V, V_track

- state를 재방문하는 횟수가 많을 수록 eligibility trace에서 배열의 값은 증가한다.

- 같은 state에 많이 방문 할 수록 큰 가중치로 업데이트 됨.

 

 

정리

- Model free 환경에서 value function을 prediction 하는 방법을 공부했다.

- 구하는 과정에서는 필연적으로 주어진 policy를 사용해서 trajectory를 형성해야 한다.

- 형성한 trajectory를 이용해서 MC 또는 TD 방법을 통해 value function을 prediction 했다.

- MC는 평가 하고자하는 state에서 시작하는 trajectory 를 여러개 형성하여 평균값을 통해 value function을 구했다. (bias 가 적은 대신 variance 가 큼)

- MC는 First visit, every visit 방법으로 나뉘고 두 방법은 모두 결론적으로 value-function이 수렴함을 볼 수 있다.

- TD는 평가 하고자 하는 state에서 바로 다음 state의 value function과 reward 만을 이용해서 value function을 구했다.(bias 가 큰 대신 variance 가 적음)

- TD는 n-step TD, forward-view TD(λ), backward-view TD(λ) 방법이 있는데 이 방법들은 모두 MC 방법의 특징과 TD 방법의 특징을 혼합하여 사용하기 위해서 사용되었다.

간단 설명

1. 다른 강화학습 기법(Q-learning, Sarsa 등..)과 달리 Next state가 없고 오직 Action, Reward 의 형태를 다룬다.

2. Markov Decision Process(MDP) 환경에서 동작시키는 것이 아니다.

3. MAB는 Exploration을 어떻게 할 것인지에 대해 다양한 방법을 제시했다.

 

용어 설명

1. Regret

- v_*는 현재 state에서 얻을 수 있는 최대 보상값이다.

- v는 현재 state에서 MAB Agent가 action 함으로써 얻는 보상값이다.

- 현재 상태에서 수행한 action에 대한 평가값이라 생각한다.

 

2. Total regret

- 왼쪽의 Summation 은 시작지점(Start) t = 1 에서 끝지점(Terminal) T 까지 모두 더한다는 의미이다.

- E[v_*t - v_t]는 t 시점에서 얻었던 Regret 값에 대한 기대값이다. 기대값인 이유는 MAB를 적용할 실제 환경은 확률적이라 가정하기 때문이다.

- 시작지점 -> 끝지점 까지의 Expected Regret값을 Total regret 이라한다. MAB Agent는 이 값을 낮추기 위해서 학습한다. 

Total regret을 낮춘다는 것은 Total reward (Return)을 높인다는 것과 같다.

 

3. Q - function 

- MAB 프레임워크에서는 단순하게 action a를 했을때 즉각적으로 얻을 수 있는 expected reward 값이다. expected 가 있는 이유는 확률적인 환경에서 MAB를 구동시키기 때문이다.

 

Exploration 전략

1. Epsilon-greedy strategy

요약 설명

- epsilon 이라는 Threshold 값을 두고 확률적으로 Random 선택을 하는 방법이다.

- episode가 끝날때 마다 epsilon 값을 지정한 decay_value 만큼 감소 시킨다.

- 많은 episode가 지나면 epsilon 값이 0에 수렴하게 되는데 이때부터는 greedy 하게 선택한다.

내 생각

- 이 방법은 강화학습 프레임워크에서 많이 쓰인다. 최근에 논문을 구현하면서 사용했다.(DQN, SAC, DDPG 등등)

start episode
...
    if np.random.uniform() > epsilon:
        action = np.argmax(Q)
    else:
        action = np.random.randint(len(Q))
    
    reward = env.step(action)
    N[action] += 1
    Q[action] = Q[action] + (reward - Q[action])/N[action]
...
end episode

epsilon -= decay_value

2. Optimistic intialization strategy

요약 설명

- Q 값을 높게 잡고 greedy 하게 여러 episode를 거친다. 여기서 높게라는 의미는 기대하는 expected reward 값보다 높게 잡는 것이다.

- 여러 episode를 거치면서 자동으로 Q값들이 낮아지며 결국 모든 action에 대해 expected reward 값을 알게될 것이다.

 

내 생각

- epsilon-greedy 처럼 if 분기점이 필요없어서 깔끔한 코드가 될 것 같다. 

- 단점은 MAB 뿐만아니라 강화학습의 고질적인 문제는 너무 많은 Sampling을 해야한다는 것이다.

Q = np.full(env.action_space.n, optimistic_estimate)
N = np.full(env.action_space.n, initical_count)

...
#only
action = np.argmax(Q)
reward = env.step(action)
N[action] += 1
Q[action] = Q[action] + (reward - Q[action])/N[action]

3. Softmax strategy 

요약 설명

- B 는 action space 집합이다.

- tao 값은 episode가 끝날때마다 감소한다.

- tao 값이 크면 Softmax 의 확률 분포가 균등하게 나타난다. tao 값이 작으면 Q-function의 값에 따른 확률 분포가 나온다.

- 결국 처음에는 exploration, 후반에는 greedy 하게 선택하는 원리이다. 

- 딥러닝 Output fuction 계열에서 확률 분포로 쓰이는 Softmax와 똑같은 원리로 쓰인다.

scaled_Q = Q / tao
norm_Q = scaled_Q - np.max(scaled_Q)
exp_Q = np.exp(norm_Q)
probs = exp_Q / np.sum(exp_Q)

action = np.random.choice(np.arange(len(probs)), size=1, p=probs)[0]

reward = env.step(action)

tao -= decay_unit

4. Upper confidence bound(UCB) strategy

요약 설명

- Q값의 추정치가 불확실한 경우 해당 Q 값을 exploration 할 수 있도록 장려하는 방법이다.

- argmax 대괄호 안에 오른쪽 텀을 U_t 라 함.

- U_t 의 e 는 현재까지 모든 action을 한 횟수, N_e(a) 는 현재까지 action a를 수행한 횟수.

- 모든 action 대비 현재 action a를 적게할 경우 U_t 값이 증가한다. 

- 같은 action을 많이 할 경우 U_t 텀이 작아진다.

- c는 U_t 텀에 대한 가중치 이다.

U_t = np.sqrt(c * np.log(e)/N)
action = np.argmax(Q + U_t)

reward = env.step(action)

5. Thompsom sampling strategy

요약 설명

- Q 값이 가우시안 분포를 따르는 것으로 시작한다. (평균, 표준편차)

- Agent가 계속 action을 수행 할 수록 평균값은 명확해지고 표준편차는 줄어든다.

- alpha, beta 파라미터를 이용한다.

- alpha와 beta는 action space의 사이즈만큼 존재한다.

- alpha는 Q값의 초기 표준편차 값을 의미한다. alpha는 episode를 반복할수록 줄어드는데 beta는 alhpa값이 감소하는 속도를 조절한다.

samples = np.random.normal(loc=Q, scale=alpha/(np.sqrt(N) + beta))
action = np.argmax(samples)

reward = env.step(action)

+ Recent posts