Halo
发布于 2023-10-19 / 158 阅读 / 0 评论 / 0 点赞

强化学习DQN

概念及定义

强化学习是一种通过交互的目标导向学习方法,旨在找到连续时间序列的最优策略。
强化学习(Reinforcement learning,RL)讨论的问题是一个主体(agent) 怎么在一个复杂不确定的 环境(environment) 里面去极大化它能获得的奖励。通过感知所处环境的状态(state)对动作(action)的奖励(reward), 来指导更好的动作,从而获得最多的奖励。

学习过程

基本的强化学习被建模为马尔可夫决策过程:

  1. 创建环境状态的集合S
  2. 定义动作的集合A
  3. 制定在状态之间转换的规则(转移概率矩阵)P
  4. 规定转换后“即时奖励”的规则(奖励函数)R
  5. 结束判断C。

强化学习的主体与环境基于离散的时间步作用。
在每一个时间t, 主体接收到一个环境的状态S_t、奖励R_t、是否完成C。然后,主体选择一个动作A_t, 送出到环境中去。环境则变化到一个新的状态S_{t+1},然后决定了和这个变化(s_t, a_t, s_{t+1})相关联的奖励r_{t+1}。强化学习主体的目标,是得到尽可能多的奖励。

环境S

环境状态是一个有限的离散数据集合,是时间t的一个函数返回集合f(t)

动作A

动作集合是有限的,例如,在棋盘中棋子只能上、下、左、右移动。
主体选择动作时可以是随机的,也可以是历史动作的函数

状态转换P

环境的新状态是上一状态和时间的函数f(s_{t-1}, t)

奖励R

奖励是规定好的一个和状态相关的函数f(s_t)

结束C

由于奖励的上限是无限的,因此需要一个结束标志。此结束标志一般是环境状态的函数f(s_t)

实例

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import namedtuple, deque
import pandas as pd
import random
import math
import copy
    

# One experience record kept in the replay memory: the state the agent was
# in, the action it took, the state that followed, and the reward received.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward'],
)


class Action:
    """A trading action: ``BUY`` (index 0) or ``SELL`` (index 1).

    An instance can be built from an integer index or from a
    case-insensitive name. Any other input (out-of-range index, unknown
    name, unsupported type) yields an *invalid* action, for which
    ``name()`` returns ``None``, ``value()`` returns ``-1`` and both
    ``is_buy()`` and ``is_sell()`` return ``False``.
    """
    __data = ['BUY', 'SELL']
    __size = len(__data)

    def __init__(self, action) -> None:
        """Create an action from an index (``int``) or a name (``str``)."""
        self.__val = None
        if isinstance(action, int):
            if 0 <= action < self.__size:
                self.__val = self.__data[action]
        elif isinstance(action, str):
            temp = action.upper()
            if temp in self.__data:
                self.__val = temp

    def name(self) -> str:
        """Return the action name, or ``None`` for an invalid action."""
        return self.__val

    def value(self) -> int:
        """Return the action index, or ``-1`` for an invalid action."""
        if self.__val is None:
            return -1
        return self.__data.index(self.__val)

    def to_tensor(self) -> torch.Tensor:
        """Return the action index wrapped as a tensor of shape ``(1, 1)``."""
        return torch.tensor([[self.value()]])

    def is_buy(self) -> bool:
        """Return ``True`` only for a valid ``BUY`` action."""
        # value() is -1 for an invalid action, so no separate None check
        # is needed.
        return self.value() == 0

    def is_sell(self) -> bool:
        """Return ``True`` only for a valid ``SELL`` action."""
        return self.value() == 1

    def __str__(self) -> str:
        # __str__ must return a str; an invalid action used to return None
        # here, which made str()/repr()/print() raise TypeError.
        return str(self.__val)

    def __repr__(self) -> str:
        return str(self)

    @classmethod
    def size(cls) -> int:
        """Return the number of distinct valid actions."""
        return cls.__size

    @classmethod
    def rand(cls):
        """Return a uniformly random valid action."""
        return cls(random.randint(0, cls.__size - 1))
    
    
class Info:
    """Track open positions, realised profit and peak exposure.

    Attributes are plain public fields:
    ``long`` / ``short`` hold the entry prices of the open positions,
    ``profit`` is the realised profit so far, ``max_cost`` is the largest
    summed entry price ever held on one side, and ``can_short`` controls
    whether ``sell`` may open a short position.
    """

    def __init__(self, can_short: bool = False) -> None:
        self.long = []
        self.short = []
        self.profit = 0
        self.max_cost = 0
        self.can_short = can_short

    def buy(self, price: float) -> float:
        """Close all shorts at ``price`` or, if none are open, open a long.

        Returns ``0`` when shorts were closed, otherwise ``price`` (the cost
        of the newly opened long position).
        """
        # Closing: every open short contributes (entry - price). The former
        # loop overwrote its accumulator, so only the last short counted.
        if self.short:
            self.profit += sum(entry - price for entry in self.short)
            self.short.clear()
            return 0

        # Opening: record the entry and update the peak exposure.
        self.long.append(price)
        self.max_cost = max(self.max_cost, sum(self.long))
        return price

    def sell(self, price: float) -> float:
        """Close all longs at ``price`` or, if none are open, open a short.

        Returns ``0`` when longs were closed or when shorting is disabled,
        otherwise ``price`` (the entry price of the newly opened short).
        """
        # Closing: every open long contributes (price - entry). The former
        # loop overwrote its accumulator, so only the last long counted.
        if self.long:
            self.profit += sum(price - entry for entry in self.long)
            self.long.clear()
            return 0

        # Opening a short is only allowed when explicitly enabled.
        if not self.can_short:
            return 0
        self.short.append(price)
        self.max_cost = max(self.max_cost, sum(self.short))
        return price

    def __str__(self) -> str:
        """Return a brace-wrapped, CRLF-separated summary of the fields.

        The ``"short"`` and ``"long"`` entries are omitted when the
        corresponding list is empty.
        """
        parts = []
        # The former implementation re-appended the accumulated text on
        # every iteration, duplicating entries for two or more positions.
        if self.short:
            shorts = ', '.join(f'{item}' for item in self.short)
            parts.append(f'"short": [{shorts}]')
        if self.long:
            longs = ', '.join(f'{item}' for item in self.long)
            parts.append(f'"long": [{longs}]')
        parts.append(f'"profit": {self.profit}')
        parts.append(f'"max_cost": {self.max_cost}')
        return '{' + '\r\n'.join(parts) + '}'

    def __repr__(self) -> str:
        return str(self)
    

class TradeEnv:
    """Trading environment with a ``reset``/``step`` API like gymnasium.Env.

    ``data`` is a price sequence. An observation is a window of
    ``n_observations`` consecutive values; each step trades at the value
    right after the current window and then moves the window forward by
    ``n_observations`` positions.
    """

    def __init__(self, data: list, n_observations: int):
        self.__data = data
        self.__n_observations = n_observations
        self.reset()

    def reset(self):
        """Rewind to the first window and return ``(state, info)``."""
        self.__cur_index = self.__n_observations
        self.__state = self.__data[0:self.__cur_index]
        self.__info = Info()
        return self.__state, self.__info

    def step(self, act: Action):
        """Apply ``act`` at the current price and advance the window.

        Returns ``(state, reward, terminated, truncated, info)``.
        ``reward`` is clipped to ``-1``, ``0`` or ``1``; ``truncated`` is
        always ``False``. When no price is left to trade at, nothing is
        executed and ``terminated`` is ``True`` with the last state and a
        reward of ``0``.
        """
        if self.__cur_index >= len(self.__data):
            return self.__state, 0, True, False, self.__info

        price = self.__data[self.__cur_index]
        raw = 0
        if act.is_buy():
            raw = self.__info.buy(price)
        elif act.is_sell():
            raw = -1 * self.__info.sell(price)

        # Clip the reward to its sign. Always an int so that rewards of
        # different transitions share one dtype.
        if raw > 0:
            reward = 1
        elif raw < 0:
            reward = -1
        else:
            reward = 0

        # Advance first, then build the observation, so the returned state
        # is the window that immediately precedes the NEXT trade price.
        # Previously the window was taken before advancing, which made the
        # agent act on an observation one step out of date (the first step
        # even returned the same window as reset()).
        self.__cur_index += self.__n_observations
        if self.__cur_index <= len(self.__data):
            start = self.__cur_index - self.__n_observations
            self.__state = self.__data[start:self.__cur_index]
        # Otherwise a full window is no longer available: keep the last
        # full one; the next call reports termination.

        return self.__state, reward, False, False, self.__info


class DQN(nn.Module):
    """Fully connected Q-network: two ReLU hidden layers and a linear head.

    Maps an input with ``n_observations`` features to ``n_actions`` output
    values (one per action).
    """

    def __init__(self, n_observations, n_actions, hidden_size=128):
        super().__init__()
        self.layer1 = nn.Linear(n_observations, hidden_size)
        self.layer2 = nn.Linear(hidden_size, hidden_size)
        self.layer3 = nn.Linear(hidden_size, n_actions)

    def forward(self, x):
        """Return the per-action output values for ``x``.

        Works for a single observation (to choose the next action) as well
        as for a batch of observations (during optimisation).
        """
        hidden = F.relu(self.layer1(x))
        hidden = F.relu(self.layer2(hidden))
        q_values = self.layer3(hidden)
        return q_values

    
class ReplayMemory:
    """Bounded FIFO store of ``Transition`` records for experience replay.

    Once ``capacity`` records are held, adding another one drops the
    oldest.
    """

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, *args) -> None:
        """Build a ``Transition`` from ``args`` and store it."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size) -> list:
        """Return ``batch_size`` distinct stored records chosen at random."""
        picked = random.sample(self.memory, batch_size)
        return picked

    def __len__(self):
        """Return the number of records currently stored."""
        return len(self.memory)


def select_action(policy_net: DQN, index_epoch: int, state: torch.Tensor,
                  eps_start: float = 0.9, eps_end: float = 0.05,
                  eps_decay: float = 1000) -> Action:
    """Choose an action with an epsilon-greedy policy.

    The exploration threshold decays exponentially from ``eps_start``
    towards ``eps_end`` as ``index_epoch`` grows, with ``eps_decay`` as
    the decay constant. With probability ``1 - threshold`` the action with
    the largest network output for ``state`` is returned; otherwise a
    uniformly random action is returned.

    Args:
        policy_net: network that scores the actions for ``state``.
        index_epoch: progress counter that drives the epsilon decay.
        state: a single observation; the network output must contain
            exactly one row (assumes shape ``(1, n_observations)`` -
            TODO confirm against callers).
        eps_start: threshold at ``index_epoch == 0``.
        eps_end: asymptotic threshold.
        eps_decay: decay constant; larger values decay more slowly.
    """
    eps_threshold = eps_end + (eps_start - eps_end) * math.exp(-1. * index_epoch / eps_decay)
    if random.random() > eps_threshold:
        # Pure inference: no autograd graph is needed to pick an action.
        with torch.no_grad():
            q_values = policy_net(state)
        # max(1) reduces over the action dimension; ``indices`` holds the
        # position of the largest value in each row.
        best = q_values.max(1).indices.view(1, 1)
        return Action(best.item())
    return Action.rand()
    
    
def optimize_model(policy_net: DQN, target_net: DQN, memory: ReplayMemory, optimizer:optim.AdamW, BATCH_SIZE=128):
    """Run one optimisation step of ``policy_net`` on a sampled batch.

    Does nothing until ``memory`` holds at least ``BATCH_SIZE`` records.
    Otherwise samples ``BATCH_SIZE`` transitions, regresses
    ``Q(s, a)`` from ``policy_net`` towards
    ``reward + GAMMA * max_a' Q_target(s', a')`` (the bootstrap term is 0
    where ``next_state`` is ``None``) with a Huber loss, clips the
    gradient values to ``[-100, 100]`` and applies ``optimizer``.
    """
    if len(memory) < BATCH_SIZE:
        return
    GAMMA = 0.99
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Mask of transitions that have a successor state (a final state is
    # stored as None).
    non_final_mask = torch.tensor([s is not None for s in batch.next_state],
                                  dtype=torch.bool)
    non_final_next_states = [s for s in batch.next_state if s is not None]

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Q(s_t, a): the network scores every action, gather() then picks the
    # column of the action that was actually taken.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # V(s_{t+1}) from the target network; stays 0 for final states.
    next_state_values = torch.zeros(BATCH_SIZE)
    # torch.cat() rejects an empty list, so skip the target pass when every
    # sampled transition is final.
    if non_final_next_states:
        # The target is a constant for this update: without no_grad the
        # backward pass would also compute (unused) gradients for
        # target_net.
        with torch.no_grad():
            target_q = target_net(torch.cat(non_final_next_states))
            next_state_values[non_final_mask] = target_q.max(1).values

    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()
    
    
def dqn_predict(df:pd.DataFrame, price_col_name:str, pre_days=5, num_epochs=100):
    """Train a DQN on one price column and print/return a derived value.

    The column is z-score normalised. All but the last ``pre_days`` values
    are used to train a policy network in a ``TradeEnv`` for
    ``num_epochs`` episodes; the last ``pre_days`` values are then fed to
    the trained network.

    NOTE(review): the returned number is the index of the best-scoring
    action (0 or 1) de-normalised with the column's std and mean and
    rounded to two decimals, i.e. either ``mean`` or ``mean + std``. It is
    not a price forecast. This behaviour is kept unchanged for
    compatibility; confirm what callers expect.

    Args:
        df: frame containing ``price_col_name``.
        price_col_name: name of the price column.
        pre_days: observation window length and size of the held-out tail.
        num_epochs: number of training episodes; also sets the learning
            rate to ``1 / num_epochs``.
    """
    TAU = 0.005  # soft-update rate of the target network
    n_observations = pre_days
    n_actions = Action.size()

    df_temp = df[[price_col_name]]
    data = (df_temp - df_temp.mean()) / df_temp.std()
    data = data[price_col_name].values

    # train
    train = data[:-pre_days]
    env = TradeEnv(train, n_observations)

    # NOTE(review): the hidden layer width is set to n_observations here,
    # not DQN's default - confirm this is intended.
    policy_net = DQN(n_observations, n_actions, n_observations)
    # deepcopy already duplicates the weights; no load_state_dict needed.
    target_net = copy.deepcopy(policy_net)

    optimizer = optim.AdamW(policy_net.parameters(), lr=1/num_epochs)
    memory = ReplayMemory(10000)

    for i in range(num_epochs):
        state, _ = env.reset()
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        while True:
            action = select_action(policy_net, i, state)
            observation, reward, terminated, _, _ = env.step(action)
            # A terminated step is not stored or trained on.
            if terminated:
                break

            reward = torch.tensor([reward])
            next_state = torch.tensor(observation, dtype=torch.float32).unsqueeze(0)
            memory.push(state, action.to_tensor(), next_state, reward)
            state = next_state

            optimize_model(policy_net, target_net, memory, optimizer)

            # Soft update: target <- TAU * policy + (1 - TAU) * target
            target_net_state_dict = target_net.state_dict()
            policy_net_state_dict = policy_net.state_dict()
            for key in policy_net_state_dict:
                target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
            target_net.load_state_dict(target_net_state_dict)

    # predict
    test = torch.tensor(data[-pre_days:], dtype=torch.float32).unsqueeze(0)
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        test_pred = policy_net(test)
    ret = test_pred.max(1)[1].item()
    ret = ret * df_temp.std() + df_temp.mean()
    ret = round(ret[price_col_name], 2)
    print(f'dqn: {price_col_name}:{ret}')
    return ret

评论