STS (Structural Time Series) model
An STS model expresses an observed time series as the sum of simpler components:
f(t) = f_1(t) + f_2(t) + f_3(t) + \dots + f_n(t) + \epsilon, \quad \epsilon \sim N(0, \sigma^2)
The components f_i(t)
The components f_i(t) come from a family of probability models for time series that includes and generalizes many standard time-series modeling ideas, including:
- autoregressive processes,
- moving averages,
- local linear trends,
- seasonality, and
- regression and variable selection on external covariates (other time series potentially related to the series of interest).
The prediction procedure
Choose your components to build the model
def build_model(observed_time_series, num_seasons=12):
    """Build an STS model: a local linear trend plus a seasonal effect.

    Args:
        observed_time_series: The series the model is built for. It is handed
            to each component and to the sum (presumably so TFP can choose
            default priors from the data -- confirm against the tfp.sts docs).
        num_seasons: Number of seasons in one seasonal cycle. Defaults to 12,
            the value that used to be hard-coded here.

    Returns:
        The `tfp.sts.Sum` model combining the trend and seasonal components.
    """
    # Use the `tfp.sts` namespace throughout; the original mixed `sts.` and
    # `tfp.sts.` inside the same function.
    trend = tfp.sts.LocalLinearTrend(observed_time_series=observed_time_series)
    seasonal = tfp.sts.Seasonal(
        num_seasons=num_seasons, observed_time_series=observed_time_series)
    return tfp.sts.Sum(
        [trend, seasonal], observed_time_series=observed_time_series)
Build the model on the training data
# Load the training series; get_training_data() is defined outside this
# snippet and is expected to return an np.array.
co2_by_month_training_data = get_training_data() # np.array
# Assemble the STS model (trend + seasonal) for that series.
co2_model = build_model(co2_by_month_training_data)
Build the variational posteriors
# Build a factored surrogate posterior over the parameters of co2_model.
variational_posteriors = tfp.sts.build_factored_surrogate_posterior(
model=co2_model)
# Draw 50 parameter samples from the surrogate.
# NOTE(review): this draw happens before the surrogate is optimized further
# down, so the samples reflect the surrogate as initially built -- confirm
# whether they should be drawn after fitting instead.
q_samples_co2_= variational_posteriors.sample(50)
Fit the variational posterior and show the ELBO loss curve (the plot is optional)
#@title Minimize the variational loss.
# Allow external control of optimization to reduce test runtimes.
num_variational_steps = 200 # @param { isTemplate: true}
num_variational_steps = int(num_variational_steps)

# Build and optimize the variational loss function. The target is the joint
# log-probability of the model parameters and the training series; the call
# trains `variational_posteriors` in place and returns the loss trace.
elbo_loss_curve = tfp.vi.fit_surrogate_posterior(
    target_log_prob_fn=co2_model.joint_distribution(
        observed_time_series=co2_by_month_training_data).log_prob,
    surrogate_posterior=variational_posteriors,
    optimizer=tf.optimizers.Adam(learning_rate=0.1),
    num_steps=num_variational_steps,
    jit_compile=True)

# Plotting the loss curve is optional; it only helps to check convergence.
plt.plot(elbo_loss_curve)
plt.show()

# Draw the parameter samples from the *fitted* posterior. Samples taken before
# this point come from the un-optimized surrogate, so everything downstream
# (parameter summary, forecast, decomposition) must use this draw.
q_samples_co2_ = variational_posteriors.sample(50)
inferred parameters
# Summarize each model parameter by the mean and standard deviation of its
# posterior samples, taken across the sample axis (axis 0).
for param in co2_model.parameters:
    param_draws = q_samples_co2_[param.name]
    param_mean = np.mean(param_draws, axis=0)
    param_std = np.std(param_draws, axis=0)
    print("{}: {} +- {}".format(param.name, param_mean, param_std))
output:
observation_noise_scale: 0.17199112474918365 +- 0.009443143382668495
LocalLinearTrend/_level_scale: 0.17671072483062744 +- 0.01510554924607277
LocalLinearTrend/_slope_scale: 0.004302256740629673 +- 0.0018349259626120329
Seasonal/_drift_scale: 0.041069451719522476 +- 0.007772190496325493
Get the forecast distribution
# Forecast horizon: as many steps ahead as there are training observations.
num_forecast_steps = len(co2_by_month_training_data)
# Forecast distribution for the next num_forecast_steps steps, given the
# training series and the posterior parameter samples.
co2_forecast_dist = tfp.sts.forecast(
co2_model,
observed_time_series=co2_by_month_training_data,
parameter_samples=q_samples_co2_,
num_steps_forecast=num_forecast_steps)
Extract the forecast mean, standard deviation, and 10 sampled forecast trajectories
# Number of sample trajectories to draw from the forecast distribution.
num_samples = 10

# Forecast summaries as NumPy arrays; `[..., 0]` keeps index 0 of the last
# axis of each result.
co2_forecast_mean = co2_forecast_dist.mean().numpy()[..., 0]
co2_forecast_scale = co2_forecast_dist.stddev().numpy()[..., 0]
co2_forecast_samples = co2_forecast_dist.sample(num_samples).numpy()[..., 0]
Plot the forecast (optional)
def plot_forecast(x, y,
                  forecast_mean, forecast_scale, forecast_samples,
                  title, x_locator=None, x_formatter=None):
    """Plot a forecast distribution against the 'true' time series.

    Args:
        x: Array of x-axis positions for `y`. It is indexed and its `dtype`
            is reused to build the x positions of the forecast.
        y: Observed series; `len(y)` is taken as the total number of steps,
            of which the last `forecast_mean.shape[-1]` are treated as the
            forecast period.
        forecast_mean: Forecast mean; the size of its last axis is the number
            of forecast steps.
        forecast_scale: Forecast spread; a band of +/- 2 * `forecast_scale`
            is shaded around the mean.
        forecast_samples: Sampled forecast trajectories, transposed before
            plotting (presumably shaped (num_samples, num_forecast_steps) --
            confirm against the caller).
        title: Axes title.
        x_locator: Optional major tick locator for the x axis.
        x_formatter: Optional major tick formatter for the x axis. Only
            applied together with `x_locator`, as before.

    Returns:
        The `(fig, ax)` pair of the created figure and its single axes.
    """
    colors = sns.color_palette()
    c1, c2 = colors[0], colors[1]
    fig = plt.figure(figsize=(12, 6))
    ax = fig.add_subplot(1, 1, 1)

    num_steps = len(y)
    num_steps_forecast = forecast_mean.shape[-1]
    num_steps_train = num_steps - num_steps_forecast

    ax.plot(x, y, lw=2, color=c1, label='ground truth')

    # The forecast starts at the first x position after the training part.
    forecast_start = x[num_steps_train]
    forecast_steps = np.arange(
        forecast_start,
        forecast_start + num_steps_forecast,
        dtype=x.dtype)

    # Faint individual trajectories, dashed mean, and a +/- 2 scale band.
    ax.plot(forecast_steps, forecast_samples.T, lw=1, color=c2, alpha=0.1)
    ax.plot(forecast_steps, forecast_mean, lw=2, ls='--', color=c2,
            label='forecast')
    ax.fill_between(forecast_steps,
                    forecast_mean - 2 * forecast_scale,
                    forecast_mean + 2 * forecast_scale,
                    color=c2, alpha=0.2)

    # Pad the y limits by 10% of the combined range of samples and truth.
    ymin = min(np.min(forecast_samples), np.min(y))
    ymax = max(np.max(forecast_samples), np.max(y))
    yrange = ymax - ymin
    ax.set_ylim([ymin - yrange * 0.1, ymax + yrange * 0.1])

    ax.set_title("{}".format(title))
    ax.legend()

    if x_locator is not None:
        ax.xaxis.set_major_locator(x_locator)
        # A locator may be given without a formatter; passing None to
        # set_major_formatter is rejected by matplotlib, so skip it.
        if x_formatter is not None:
            ax.xaxis.set_major_formatter(x_formatter)
        fig.autofmt_xdate()

    return fig, ax
# NOTE(review): co2_dates is used as the x axis but aliases the observed
# values themselves; it presumably should hold the time axis (e.g. a
# datetime64 array or np.arange over the steps) -- confirm.
co2_dates = co2_by_month_training_data
# plot_forecast takes len(y) and plots y, so it needs the series itself.
# Binding the series length here (an int) made that call raise a TypeError.
co2_by_month = co2_by_month_training_data

fig, ax = plot_forecast(
    co2_dates, co2_by_month,
    co2_forecast_mean, co2_forecast_scale, co2_forecast_samples,
    x_locator=co2_loc,
    x_formatter=co2_fmt,
    title="Atmospheric CO2 forecast")
# Mark the position where the forecast period begins.
ax.axvline(co2_dates[-num_forecast_steps], linestyle="--")
ax.legend(loc="upper left")
ax.set_ylabel("Atmospheric CO2 concentration (ppm)")
ax.set_xlabel("Year")
fig.autofmt_xdate()
Component distributions (optional)
def plot_components(dates,
                    component_means_dict,
                    component_stddevs_dict,
                    x_locator=None,
                    x_formatter=None):
    """Plot the contributions of posterior components in a single figure.

    One subplot is drawn per component, stacked vertically, showing the
    component mean with a shaded band of +/- 2 standard deviations.

    Args:
        dates: x-axis positions shared by every component.
        component_means_dict: Mapping from component name to its mean series.
            Its iteration order fixes the subplot order.
        component_stddevs_dict: Mapping from component name to its standard
            deviation series; must contain every key of
            `component_means_dict`.
        x_locator: Optional major tick locator applied to each subplot.
        x_formatter: Optional major tick formatter. Only applied together
            with `x_locator`, as before.

    Returns:
        `(fig, axes_dict)`, where `axes_dict` is an OrderedDict mapping each
        component name to its axes.
    """
    band_color = sns.color_palette()[1]

    axes_dict = collections.OrderedDict()
    num_components = len(component_means_dict)
    fig = plt.figure(figsize=(12, 2.5 * num_components))
    for i, (component_name, component_mean) in enumerate(
            component_means_dict.items()):
        component_stddev = component_stddevs_dict[component_name]

        ax = fig.add_subplot(num_components, 1, 1 + i)
        ax.plot(dates, component_mean, lw=2)
        ax.fill_between(dates,
                        component_mean - 2 * component_stddev,
                        component_mean + 2 * component_stddev,
                        color=band_color, alpha=0.5)
        ax.set_title(component_name)
        if x_locator is not None:
            ax.xaxis.set_major_locator(x_locator)
            # A locator may be given without a formatter; passing None to
            # set_major_formatter is rejected by matplotlib, so skip it.
            if x_formatter is not None:
                ax.xaxis.set_major_formatter(x_formatter)
        axes_dict[component_name] = ax
    fig.autofmt_xdate()
    fig.tight_layout()
    return fig, axes_dict
# Build a dict mapping components to distributions over
# their contribution to the observed signal.
# The decomposition needs the observed series itself. `co2_by_month` is bound
# to the series length (an int) earlier in these notes, so pass the training
# series the model was built on.
component_dists = sts.decompose_by_component(
    co2_model,
    observed_time_series=co2_by_month_training_data,
    parameter_samples=q_samples_co2_)

# Per-component mean and standard deviation, keyed by component name.
co2_component_means_, co2_component_stddevs_ = (
    {k.name: c.mean() for k, c in component_dists.items()},
    {k.name: c.stddev() for k, c in component_dists.items()})

_ = plot_components(co2_dates, co2_component_means_, co2_component_stddevs_,
                    x_locator=co2_loc, x_formatter=co2_fmt)