Concepts and Definitions
Reinforcement learning is a goal-directed learning method based on interaction; it aims to find an optimal policy for a sequential decision problem.
Reinforcement learning (RL) studies how an agent can maximize the reward it collects in a complex, uncertain environment. By observing the state of the environment and the reward it receives for each action, the agent learns to choose better actions and thereby accumulate as much reward as possible.
Learning Process
Basic reinforcement learning is modeled as a Markov decision process (MDP), defined by the following pieces (a minimal sketch of all five follows the list):
- a set of environment states S
- a set of actions A
- the rules for transitions between states (the transition probability matrix) P
- the rules for the immediate reward after a transition (the reward function) R
- a termination criterion C.
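A minimal sketch of the five pieces for a toy, made-up 3-state problem (the state names, transition probabilities and rewards below are illustrative assumptions, not taken from the text):

S = ['s0', 's1', 's2']        # set of environment states
A = ['left', 'right']         # set of actions
# P[s][a] maps each possible next state to its transition probability.
P = {
    's0': {'left': {'s0': 1.0}, 'right': {'s1': 1.0}},
    's1': {'left': {'s0': 1.0}, 'right': {'s2': 1.0}},
    's2': {'left': {'s1': 1.0}, 'right': {'s2': 1.0}},
}
# R[s][a] is the immediate reward for taking action a in state s.
R = {
    's0': {'left': 0.0, 'right': 0.0},
    's1': {'left': 0.0, 'right': 1.0},
    's2': {'left': 0.0, 'right': 0.0},
}
# C marks terminal states; reaching 's2' ends an episode.
C = {'s0': False, 's1': False, 's2': True}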
The agent and the environment interact over discrete time steps.
At each time step t, the agent receives the environment state S_t, the reward R_t, and the termination flag C. The agent then selects an action A_t and sends it to the environment. The environment transitions to a new state S_{t+1} and determines the reward r_{t+1} associated with the transition (s_t, a_t, s_{t+1}). The agent's goal is to collect as much reward as possible; the interaction loop is sketched below.
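A minimal sketch of this loop, assuming an environment with the gymnasium-style reset()/step() interface used by the TradeEnv class in the example below, and a policy function supplied by the caller:

def run_episode(env, policy):
    """Run one episode: the agent observes S_t, acts, and accumulates reward."""
    state, info = env.reset()              # S_0
    total_reward = 0.0
    terminated = False                     # C
    while not terminated:
        action = policy(state)             # agent chooses A_t from S_t
        state, reward, terminated, _, info = env.step(action)  # S_{t+1}, r_{t+1}
        total_reward += reward
    return total_reward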
Environment state S
The environment state is a finite, discrete set of values; it can be treated as a function of time t, i.e. f(t).
Action A
The action set is finite; for example, a piece on a game board can only move up, down, left, or right.
The agent may choose actions at random or as a function of its past actions.
State transition P
The current state of the environment is a function of the previous state and the time, f(s_{t-1}, t).
Reward R
The reward is a predefined function of the state, f(s_t).
Termination C
Because the cumulative reward has no upper bound, a termination flag is needed. This flag is usually a function of the environment state, f(s_t). A combined sketch of S, A, P, R and C is given below.
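Putting the five ingredients together, a minimal sketch with made-up concrete values (the specific functions below are illustrative assumptions only):

def state_fn(t):            # S: the environment state as a function of time t
    return t % 5

ACTIONS = ['up', 'down', 'left', 'right']   # A: a finite action set

def transition(s_prev, t):  # P: the new state from the previous state and time
    return (s_prev + 1) % 5

def reward_fn(s_t):         # R: a predefined function of the state
    return 1.0 if s_t == 0 else 0.0

def done_fn(s_t):           # C: the termination flag as a function of the state
    return s_t == 4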
Example
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import namedtuple, deque
import pandas as pd
import random
import math
import copy
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))
class Action:
    """Discrete trading action: BUY (index 0) or SELL (index 1)."""
    __data = ['BUY', 'SELL']
    __size = len(__data)

    def __init__(self, action) -> None:
        self.__val = None
        if isinstance(action, int):
            if 0 <= action < self.__size:
                self.__val = self.__data[action]
        elif isinstance(action, str):
            temp = action.upper()
            if temp in self.__data:
                self.__val = temp

    def name(self) -> str:
        return self.__val

    def value(self) -> int:
        if self.__val is None:
            return -1
        return self.__data.index(self.__val)

    def to_tensor(self) -> torch.Tensor:
        index = self.value()
        return torch.tensor([[index]])

    def is_buy(self) -> bool:
        if self.__val is None:
            return False
        return self.value() == 0

    def is_sell(self) -> bool:
        if self.__val is None:
            return False
        return self.value() == 1

    def __str__(self) -> str:
        return self.__val

    def __repr__(self) -> str:
        return str(self)

    @classmethod
    def size(cls) -> int:
        return cls.__size

    @classmethod
    def rand(cls):
        """Return a uniformly random action."""
        temp = random.randint(0, cls.__size - 1)
        return Action(temp)
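# Usage sketch for Action (illustrative only, not part of the original listing):
#   Action('buy').value()  -> 0
#   Action(1).name()       -> 'SELL'
#   Action.rand()          -> a random BUY or SELL action
#   Action(0).to_tensor()  -> tensor([[0]]), the shape stored in ReplayMemory below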
class Info:
    """Tracks open long/short positions, realized profit and maximum cost."""
    def __init__(self, can_short: bool = False) -> None:
        self.long = []
        self.short = []
        self.profit = 0
        self.max_cost = 0
        self.can_short = can_short

    def buy(self, price: float) -> float:
        """Return the cost incurred by this buy."""
        # close any open short positions first
        size = len(self.short)
        if size > 0:
            temp = 0
            for item in self.short:
                temp += item - price
            self.profit += temp
            self.short.clear()
            return 0
        # otherwise open a long position
        self.long.append(price)
        temp = 0
        for item in self.long:
            temp += item
        if temp > self.max_cost:
            self.max_cost = temp
        return price

    def sell(self, price: float) -> float:
        """Return the profit realized by this sell."""
        # close any open long positions first
        size = len(self.long)
        if size > 0:
            temp = 0
            for item in self.long:
                temp += price - item
            self.profit += temp
            self.long.clear()
            return 0
        # otherwise open a short position (if allowed)
        if not self.can_short:
            return 0
        self.short.append(price)
        temp = 0
        for item in self.short:
            temp += item
        if temp > self.max_cost:
            self.max_cost = temp
        return price

    def __str__(self) -> str:
        ret = ''
        # short positions
        temp = ''
        for item in self.short:
            temp += f', {item}'
        if temp != '':
            ret += f'"short": [{temp[2:]}]\r\n'
        # long positions
        temp = ''
        for item in self.long:
            temp += f', {item}'
        if temp != '':
            ret += f'"long": [{temp[2:]}]\r\n'
        # realized profit
        ret += f'"profit": {self.profit}\r\n'
        # maximum cost
        ret += f'"max_cost": {self.max_cost}\r\n'
        return '{' + ret[:-2] + '}'

    def __repr__(self) -> str:
        return str(self)
class TradeEnv:
    """Same interface as gymnasium.Env: reset() and step()."""
    def __init__(self, data: list, n_observations: int):
        self.__data = data
        self.__n_observations = n_observations
        self.reset()

    def reset(self):
        self.__cur_index = self.__n_observations
        self.__state = self.__data[0:self.__cur_index]
        self.__info = Info()
        return self.__state, self.__info

    def step(self, act: Action):
        reward = 0
        terminated = False
        if self.__cur_index >= len(self.__data):
            terminated = True
        else:
            self.__state = self.__data[self.__cur_index - self.__n_observations:self.__cur_index]
            price = self.__data[self.__cur_index]
            if act.is_buy():
                reward = 1 * self.__info.buy(price)
            elif act.is_sell():
                reward = -1 * self.__info.sell(price)
            # advance to the next window
            self.__cur_index += self.__n_observations
        # clip the reward to {-1, 0, 1}
        if reward > 0:
            reward = 1
        elif reward < 0:
            reward = -1
        return self.__state, reward, terminated, False, self.__info
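# Usage sketch for TradeEnv (illustrative only): a random agent on made-up prices.
#   env = TradeEnv([1.0, 1.2, 0.9, 1.1, 1.3, 0.8, 1.0, 1.4], n_observations=2)
#   state, info = env.reset()
#   terminated = False
#   while not terminated:
#       state, reward, terminated, _, info = env.step(Action.rand())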
class DQN(nn.Module):
    def __init__(self, n_observations, n_actions, hidden_size=128):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, hidden_size)
        self.layer2 = nn.Linear(hidden_size, hidden_size)
        self.layer3 = nn.Linear(hidden_size, n_actions)

    # Called with either one element to determine the next action, or a batch
    # during optimization. Returns tensor([[Q_buy, Q_sell], ...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)
class ReplayMemory(object):
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args) -> None:
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size) -> list:
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
def select_action(policy_net: DQN, index_epoch: int, state: torch.Tensor) -> Action:
    """Epsilon-greedy action selection with an exponentially decaying epsilon."""
    EPS_START = 0.9
    EPS_END = 0.05
    EPS_DECAY = 1000  # controls how quickly epsilon decays with index_epoch
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * index_epoch / EPS_DECAY)
    sample = random.random()
    if sample > eps_threshold:
        with torch.no_grad():
            temp = policy_net(state)
        # max(1) returns the largest column value of each row; its second
        # element is the index of that maximum, i.e. the greedy action.
        temp = temp.max(1)[1].view(1, 1)
        return Action(temp.item())
    return Action.rand()
def optimize_model(policy_net: DQN, target_net: DQN, memory: ReplayMemory, optimizer: optim.AdamW, BATCH_SIZE=128):
    if len(memory) < BATCH_SIZE:
        return
    GAMMA = 0.99
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for a
    # detailed explanation). This converts a batch-array of Transitions
    # to a Transition of batch-arrays.
    batch = Transition(*zip(*transitions))
    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                            batch.next_state)), dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                       if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    temp = policy_net(state_batch)
    state_action_values = temp.gather(1, action_batch)
    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE)
    with torch.no_grad():
        temp = target_net(non_final_next_states)
        temp = temp.max(1)[0]
    next_state_values[non_final_mask] = temp
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch
    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()
def dqn_predict(df: pd.DataFrame, price_col_name: str, pre_days=5, num_epochs=100):
    TAU = 0.005
    n_observations = pre_days
    n_actions = Action.size()
    df_temp = df[[price_col_name]]
    # normalize the price column
    data = (df_temp - df_temp.mean()) / df_temp.std()
    data = data[price_col_name].values
    # train
    train = data[:-pre_days]
    env = TradeEnv(train, n_observations)
    policy_net = DQN(n_observations, n_actions, n_observations)
    target_net = copy.deepcopy(policy_net)
    target_net.load_state_dict(policy_net.state_dict())
    optimizer = optim.AdamW(policy_net.parameters(), lr=1/num_epochs)
    memory = ReplayMemory(10000)
    for i in range(num_epochs):
        state, info = env.reset()
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        while True:
            action = select_action(policy_net, i, state)
            observation, reward, terminated, _, info = env.step(action)
            reward = torch.tensor([reward])
            if terminated:
                next_state = None
            else:
                next_state = torch.tensor(observation, dtype=torch.float32).unsqueeze(0)
            # store the transition in replay memory
            memory.push(state, action.to_tensor(), next_state, reward)
            state = next_state
            # one optimization step on the policy network
            optimize_model(policy_net, target_net, memory, optimizer)
            # soft update of the target network's weights
            target_net_state_dict = target_net.state_dict()
            policy_net_state_dict = policy_net.state_dict()
            for key in policy_net_state_dict:
                target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
            target_net.load_state_dict(target_net_state_dict)
            if terminated:
                break
    # predict: feed the last pre_days prices and map the chosen action index
    # back through the normalization of the price column
    test = torch.tensor(data[-pre_days:], dtype=torch.float32).unsqueeze(0)
    test_pred = policy_net(test)
    ret = test_pred.max(1)[1].item()
    ret = ret * df_temp.std() + df_temp.mean()
    ret = round(ret[price_col_name], 2)
    print(f'dqn: {price_col_name}:{ret}')
    return ret
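A minimal usage sketch for dqn_predict, assuming a pandas DataFrame with a hypothetical 'close' price column (the column name, the synthetic random-walk prices and the epoch count are placeholders, not from the original text):

import numpy as np
import pandas as pd

# Synthetic price series standing in for real market data.
prices = 100 + np.cumsum(np.random.randn(200))
df = pd.DataFrame({'close': prices})
signal = dqn_predict(df, 'close', pre_days=5, num_epochs=50)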