沈浙奇 (主页)
zqshen@hhu.edu.cn
在这个实例中,我们使用python实现Lorenz 96模式的集合卡尔曼滤波器同化,采用的同化方法为集合调整卡尔曼滤波器(EAKF)。
EAKF的算法描述如下: $x$代表模式状态,$y^o$代表观测,其方差为 $\sigma_o^2$。$h$是观测算子将模式状态投影到观测点上。
1.将$N$份的模式结果都投影到某个观测点上 \begin{equation} y_{p,n}=h(x_{p,n}),\quad n=1,\ldots,N \end{equation} 其中下标$p$代表预报
2.使用预报投影的集合方差$\sigma_p^2$和观测的方差求分析场在观测点位置的方差和平均值 \begin{equation}\label{eakf_var} \sigma_u^2=[(\sigma_p^2)^{-1}+(\sigma_o^2)^{-1}]^{-1} \end{equation} \begin{equation}\label{eakf_mean} \overline{y}_u=\sigma_u^2(\overline{y}_p/\sigma_p^2+y^o/\sigma_o^2). \end{equation} 其中下标$u$代表分析
3.那么,为了让分析集合得到这样的均值和方差,需要的观测增量为 \begin{equation}\label{eakf_obsinc} y_{u,n}=(\sigma_u/\sigma_p)(y_{p,n}-\overline{y}_p)+\overline{y}_u, n=1,\ldots,N \end{equation}
4.从而,模式网格点上的增量可以通过相关性计算如下 \begin{equation}\label{eakf_stateinc} \Delta x_{m,n} = (\sigma_{x_m,y}/\sigma_p^2)\Delta y_n, n=1,\ldots,N \end{equation}
5.* 一般情况下,需要引入一个局地化因子再把增量加到模式网格上
我们针对Lorenz96模式的同化问题来使用EAKF。以下使用python实现:
首先载入需要的库,包括numpy(用于科学计算)和matplotlib.pyplot(用于画图)
import numpy as np;
import matplotlib.pyplot as plt;
Lorenz 96模式的模式方程如下 \begin{equation} \frac{dx_j}{dt}=(x_{j+1}-x_{j-2})x_{j-1}-x_j+F, \end{equation}
为了写出这个模式:
def L96_rhs(x, forcing=8):
    """Right-hand side (tendency) of the Lorenz-96 model.

    dx_j/dt = (x_{j+1} - x_{j-2}) * x_{j-1} - x_j + F, with cyclic indices.

    Parameters
    ----------
    x : 1-D array, the model state.
    forcing : scalar F, external forcing (default 8, the classic chaotic
        regime; previously hard-coded).

    Returns
    -------
    1-D array of tendencies, same shape as x.
    """
    # np.roll implements the cyclic boundary conditions in one shot,
    # replacing the original per-index special cases for j = 0, 1, L-1.
    return (np.roll(x, -1) - np.roll(x, 2)) * np.roll(x, 1) - x + forcing
def RK45(x, func, h):
    """Advance state `x` one step of size `h` with the classic 4th-order
    Runge-Kutta scheme.

    NOTE: despite the name, this is plain RK4, not the adaptive
    Runge-Kutta-Fehlberg (RK45); the name is kept for existing callers.

    x : current state (array).
    func : tendency function, func(x) -> dx/dt.
    h : step size.
    Returns the state at time t + h.
    """
    k1 = func(x)
    k2 = func(x + h / 2 * k1)
    k3 = func(x + h / 2 * k2)
    k4 = func(x + h * k3)
    return x + h / 6 * (k1 + 2 * k2 + 2 * k3 + k4)
def L96_adv_1step(x0, delta_t):
    """Advance the Lorenz-96 state `x0` by a single RK4 step of size
    `delta_t` and return the new state."""
    return RK45(x0, L96_rhs, delta_t)
设置基本参数:积分步长和模式变量数
delta_t = 0.05   # integration time step (0.05 model time units ~ 6 hours)
model_dim = 36   # number of Lorenz-96 state variables
一般来说,整个实验的初值最好是用随机的初值进行长时间积分的spinup得到。spinup即长时间的积分,其目的是为了保证变量之间的关系符合物理关系。
import os

# Spin-up: integrate a random initial state for a long time so the state
# settles onto the model attractor. Run once and cached on disk; later
# experiments just reload the file.
if not os.path.exists('x0true.npz'):
    x0true = np.random.randn(model_dim)
    spinup_steps = 14400  # at 0.05 (~6 h) per step this is roughly 150 days
    for _ in range(spinup_steps):
        x0true = L96_adv_1step(x0true, delta_t)
    # NOTE: the original's `del x1;x2` only deleted x1 (the `x2` after the
    # semicolon was a no-op expression); the temporaries are gone entirely now.
    print('finish spinup')
    np.savez('x0true.npz', x0true)
else:
    TMPDAT = np.load('x0true.npz')
    print('load spinup')
    x0true = TMPDAT['arr_0']
因为模式的0.05对应6小时,所以14400大约是150天。 spinup一般来说只需要运行一次,后续的同化不需要采用,所以保存数据,在下一次同化实验的时候直接读取。
为了进行孪生实验,需要自己构造数据进行同化,得到真实场和观测值,观测点可以是稀疏的,这里设置每4步同化观测一次,每次观测到一半的变量,观测具有随机误差,误差的高斯分布方差可控制,观测算子可以是非线性的算子,在这个例子里,我只考虑线性的观测算子,但是只观测一半的位置。
assim_period_steps = 1000    # total number of model steps in the experiment
# Model time axis (endpoint excluded so spacing is exactly delta_t).
tgrid = np.linspace(0, delta_t * assim_period_steps,
                    assim_period_steps, endpoint=False)
assim_every_n_steps = 4      # assimilate observations every 4 model steps
obs_error_var = 1            # observation error variance (Gaussian)
obs_every_n_vars = 2         # observe every 2nd state variable
# Linear observation operator: the rows of the identity matrix that
# correspond to the observed grid points.
H_mat = np.eye(model_dim)
H_mat = H_mat[0:model_dim:obs_every_n_vars, :]  # observation factor in matrix form
tobs = tgrid[0:assim_period_steps:assim_every_n_steps]  # observation times
# Twin experiment data: integrate the "truth" trajectory and synthesize
# noisy observations of every 2nd variable every 4th step. Cached on disk.
if not os.path.exists('truth_obs.npz'):
    x1 = x0true
    Xtrue = np.zeros([model_dim, assim_period_steps])
    OBS = np.zeros([model_dim // obs_every_n_vars,
                    assim_period_steps // assim_every_n_steps])
    for t in range(assim_period_steps):
        x1 = L96_adv_1step(x1, delta_t)
        Xtrue[:, t] = x1
        if t % assim_every_n_steps == 0:
            # y^o = H x + eps, with eps ~ N(0, obs_error_var). FIX: the noise
            # is now scaled by sqrt(obs_error_var); the original used raw
            # randn, which only matched because obs_error_var happened to be 1.
            OBS[:, t // assim_every_n_steps] = (
                np.dot(H_mat, Xtrue[:, t])
                + np.sqrt(obs_error_var)
                * np.random.randn(model_dim // obs_every_n_vars))
            # (The original also rewrote tobs[t//4] = tgrid[t] here; that was
            # a no-op because tobs is a view of tgrid holding those values.)
    print('Experimental data generated')
    np.savez('truth_obs.npz', Xtrue, OBS)
else:
    print('Experimental data loaded')
    ExprData = np.load('truth_obs.npz')
    Xtrue = ExprData['arr_0']
    OBS = ExprData['arr_1']
可以画图大致看一下精确解和观测
# Quick-look plots: Hovmoeller diagram of the truth (left), plus a time
# series of the first variable and a spatial snapshot with observations.
[xgrd,ygrd] = np.meshgrid(range(36),range(assim_period_steps))
plt.figure(figsize=(12,6))
plt.subplot(121)
z_lim = np.linspace(-10,10)  # default 50 contour levels in [-10, 10]
plt.title('TRUTH',fontsize=20)
plt.contourf(xgrd,ygrd,Xtrue.T,z_lim,cmap=plt.cm.rainbow)
plt.xticks(np.arange(0,36,10),fontsize=18)
plt.xlabel('Variable Index',fontsize=18)
plt.xlim([0,35])
plt.yticks(np.arange(0,1000,200),fontsize=18)
plt.ylabel('Model Steps',fontsize=18)
plt.colorbar()
# The 2x2 grid indices below intentionally overlay the right half of the
# 1x2 grid used above.
plt.subplot(222)
plt.plot(tgrid,Xtrue[0],'k:',lw=4,label = 'True')
plt.plot(tobs,OBS[0],'r+',mew=4,ms=16,label = 'OBS')
plt.legend(fontsize=28,ncol=3)
plt.xlim([0,10])
plt.xlabel(str('dimensionless time').title(),fontsize=16)
plt.ylabel(r'$x_1$',fontsize=16)
plt.tight_layout(2)  # NOTE(review): positional pad is deprecated; prefer tight_layout(pad=2)
plt.subplot(224)
plt.plot(np.arange(36),Xtrue[:,0],'k:',lw=4,label = 'True')
# NOTE(review): the obs sites are at np.arange(0, 36, 2); linspace(0, 35, 18)
# has spacing ~2.06 and slightly misplaces the markers — verify intent.
plt.plot(np.linspace(0,35,num=18),OBS[:,0],'r+',mew=4,ms=16,label = 'OBS')
plt.legend(fontsize=28,ncol=3)
plt.xlabel('Variable index',fontsize=16)
plt.ylabel('Truth value',fontsize=16)
定义EAKF的分析算法如下:
- com_cov_factor 计算局地化因子
- obs_increment_eakf 计算观测增量
- get_state_increments 把观测增量回归到模式网格
- eakf_analysis 使用EAKF将输入的预报集合进行分析,输出分析集合
def comp_cov_factor(z_in, c):
    """Gaspari-Cohn fifth-order piecewise-polynomial localization factor.

    z_in : (signed) distance between an observation site and a grid point.
    c : half-width parameter; the factor decays to exactly 0 at distance 2c.
    Returns a scalar in [0, 1] (assumes c > 0).
    """
    z = abs(z_in)
    if z >= 2.0 * c:
        # beyond the support of the correlation function
        return 0.0
    r = z / c
    if z <= c:
        # inner branch, 0 <= r <= 1
        return (((-0.25 * r + 0.5) * r + 0.625) * r - 5.0 / 3.0) * r ** 2 + 1.0
    # outer branch, 1 < r < 2
    return ((((r / 12.0 - 0.5) * r + 0.625) * r + 5.0 / 3.0) * r
            - 5.0) * r + 4.0 - 2.0 / (3.0 * r)
# observation increment for the observation site
def obs_increment_eakf(ensemble, observation, obs_error_var):
prior_mean = np.mean(ensemble);
prior_var = np.var(ensemble);
if prior_var >0.1:
post_var = 1.0 / (1.0 / prior_var + 1.0 / obs_error_var);
post_mean = post_var * (prior_mean / prior_var + observation / obs_error_var);
else:
post_var = prior_var;
post_mean = prior_mean;
updated_ensemble = ensemble - prior_mean + post_mean;
var_ratio = post_var / prior_var;
updated_ensemble = np.sqrt(var_ratio) * (updated_ensemble - post_mean) + post_mean;
obs_increments = updated_ensemble - ensemble;
return obs_increments
# regression the obs_inc to model grid corresponds state_ens
def get_state_increments(state_ens, obs_ens, obs_incs):
    """Regress observation-space increments onto one model state variable.

    state_ens : ensemble of one state variable (1-D array, length N).
    obs_ens : ensemble projected onto the observation site (1-D, length N).
    obs_incs : observation-space increments (1-D, length N).
    Returns the state increments cov(x, y) / var(y) * obs_incs.
    """
    cov_matrix = np.cov(state_ens, obs_ens)
    regression_coef = cov_matrix[0, 1] / cov_matrix[1, 1]
    return regression_coef * obs_incs
def eakf_analysis(ensemble_in, obs_in, obs_error_var, H_mat, obs_every_n_vars, local_para):
    """Serial EAKF analysis: assimilate each observation in turn.

    ensemble_in : (model_dim, N) forecast ensemble. NOTE: updated IN PLACE;
        the returned array is the same object (matches original behavior).
    obs_in : 1-D array of observed values, one per observation site.
    obs_error_var : scalar observation error variance.
    H_mat : (m, model_dim) linear observation operator.
    obs_every_n_vars : grid spacing of observation sites (for distances).
    local_para : Gaspari-Cohn localization half-width (grid units).
    Returns the analysis ensemble (same array as ensemble_in).
    """
    L = len(ensemble_in)  # model dimension (model grids)
    m = len(obs_in)       # number of obs sites
    for i in range(m):
        # Project onto obs site i. Re-done each iteration because the serial
        # update changes the state; only row i of H is needed, so a single
        # row-vector product replaces the original full H @ ensemble.
        obs_proj = np.dot(H_mat[i], ensemble_in)
        obs_inc = obs_increment_eakf(obs_proj, obs_in[i], obs_error_var)
        for j in range(L):
            state_inc = get_state_increments(ensemble_in[j], obs_proj, obs_inc)
            # Cyclic distance between obs site (grid index 2*i) and grid j.
            dist = np.abs(obs_every_n_vars * i - j)
            if dist > L / 2.0:
                dist = L - dist
            cov_factor = comp_cov_factor(dist, local_para)
            ensemble_in[j] = ensemble_in[j] + cov_factor * state_inc
    return ensemble_in
Xassim = np.zeros_like(Xtrue)    # analysis (ensemble-mean) trajectory
Xspread = np.zeros_like(Xtrue)   # ensemble-spread trajectory
N = 40                           # ensemble size
infl = 1.01                      # multiplicative inflation factor
local_para = 4                   # Gaspari-Cohn localization half-width
ens_file = 'Ensemble' + str(N) + '.npz'
if os.path.exists(ens_file):
    print('Initial Ensemble loaded')
    TMPDAT = np.load(ens_file)
    Ens0 = TMPDAT['arr_0']
else:
    # Initial ensemble: truth replicated N times plus unit Gaussian noise,
    # reshaped to (model_dim, N) so each column is one member.
    Ens0 = np.repeat(x0true, N) + np.random.randn(model_dim * N)
    Ens0 = np.reshape(Ens0, (model_dim, -1))  # initial ensemble
    print('Initial Ensemble generated')
    np.savez(ens_file, Ens0)
method = 'EAKF'
# Main cycle: forecast every member one step, inflate about the mean,
# and run the EAKF analysis on assimilation steps.
Ens = Ens0
Ens2 = np.zeros_like(Ens)
for t in range(assim_period_steps):
    # forecast step for each ensemble member
    for n in range(N):
        Ens2[:, n] = L96_adv_1step(Ens[:, n], delta_t)
    # multiplicative inflation of the deviations about the ensemble mean
    Ens_mean = np.mean(Ens2, axis=1)
    for n in range(N):
        Ens2[:, n] = (Ens2[:, n] - Ens_mean) * infl + Ens_mean
    if t % assim_every_n_steps == 0:  # assimilation step
        tassim = t // assim_every_n_steps
        Ens2 = eakf_analysis(Ens2, OBS[:, tassim], obs_error_var,
                             H_mat, obs_every_n_vars, local_para)
    # record ensemble mean and spread for diagnostics
    Xassim[:, t] = np.mean(Ens2, axis=1)
    Xspread[:, t] = np.std(Ens2, axis=1)
    Ens = Ens2
print('Assimilation finished!')
获取同化结果之后,可以画图验证:
# Compare truth, observations and the assimilation analysis for one
# observed variable (x_1) and one unobserved variable (x_2).
plt.figure(figsize=(10,6))
plt.subplot(211)
plt.xticks([],fontsize = 28);
plt.yticks(fontsize = 28);
plt.title(str('True State and data assimilation analyses').title(),fontsize=28)
plt.plot(tgrid,Xtrue[0],'k:',lw=4,label = 'True')
plt.ylabel(r'$x_1$ (observed)',fontsize=28)
# NOTE(review): model_dim (36) is used as the TIME-axis limit here; tgrid
# runs to 50, so this truncates the plot at t=36 — confirm this is intended.
plt.xlim(0,model_dim)
plt.plot(tobs,OBS[0],'r+',mew=4,ms=16,label = 'OBS')
plt.plot(tgrid,Xassim[0],lw=4,label = method +'_mean')
plt.legend(fontsize=28,ncol=3)
plt.tight_layout(2)  # NOTE(review): positional pad is deprecated; prefer tight_layout(pad=2)
plt.subplot(212)
plt.xticks(np.array([0,10,20,30,40]),fontsize = 28);
plt.yticks(fontsize = 28);
plt.plot(tgrid,Xtrue[1],'k:',lw=4)
plt.ylabel(r'$x_2$ (unobserved)',fontsize=28)
plt.xlim(0,model_dim)
plt.plot(tgrid,Xassim[1],lw=4)
plt.xlabel(str('dimensionless time').title(),fontsize=28)
计算各时间的所有变量的均方根误差和离散度,以及各变量的时间均方根误差和离散度如下:
# Diagnostics, vectorized (the original looped over columns/rows in Python):
# RMSE/RMSS: RMS error and RMS spread over all variables at each time step.
RMSE = np.sqrt(np.mean((Xtrue - Xassim) ** 2, axis=0))
RMSS = np.sqrt(np.mean(Xspread ** 2, axis=0))
# tRMSE/tRMSS: temporal RMS error and spread for each model variable.
tRMSE = np.sqrt(np.mean((Xtrue - Xassim) ** 2, axis=1))
tRMSS = np.sqrt(np.mean(Xspread ** 2, axis=1))
# Top: RMSE vs ensemble spread over time; bottom: temporal RMSE against
# temporal spread per variable (a well-tuned filter sits near y = x).
plt.figure(figsize=(10,6))
plt.subplot(211)
plt.xticks(np.array([0,10,20,30,40]),fontsize = 28);
plt.yticks(np.array([0.5,1.0,1.5]),fontsize = 28);
plt.plot(tgrid,RMSE,lw=4,label = 'RMSE')
plt.plot(tgrid,RMSS,'r--',lw=2,label = 'RMSS')
plt.legend(ncol=2,fontsize = 28,loc=4)
# NOTE(review): model_dim (36) used as the time-axis limit — confirm intended.
plt.xlim(0,model_dim)
plt.ylim(0.2,1.5)
plt.xlabel(str('dimensionless time').title(),fontsize = 28)
plt.tight_layout(3)  # NOTE(review): positional pad is deprecated; prefer tight_layout(pad=3)
plt.text(1, 0.5, 'average RMSE = %.2f' %np.mean(RMSE), bbox=dict(facecolor='grey', alpha=0.2),fontsize = 17)
plt.subplot(212)
plt.xticks(np.array([0.4,0.6,0.8,1.0,1.2]),fontsize = 28);
plt.yticks(np.array([0.4,0.6,0.8,1.0,1.2]),fontsize = 28);
plt.plot(tRMSS,tRMSE,'rx',ms=14,mew=3)
plt.xlim(0.3,1.2);
plt.ylim(0.3,1.2);
plt.ylabel(str('temporal RMSE').capitalize(),fontsize = 28)
plt.xlabel(str('temporal RMSS').capitalize(),fontsize = 28)
plt.plot(np.arange(0,5,.1),np.arange(0,5,.1),'k:',lw=4,label='y=x');  # reference line y = x
plt.legend(fontsize = 28);
import numpy as np  # NOTE(review): redundant — numpy is already imported at the top
# Hovmoeller diagrams of the absolute analysis error (left) and the
# ensemble spread (right) on the same contour scale for visual comparison.
[xgrd,ygrd] = np.meshgrid(range(36),range(assim_period_steps))
plt.figure(figsize=(10,10))
plt.subplot(121)
z_lim = np.linspace(0,3,7)  # 7 contour levels between 0 and 3
plt.title('ABS ERRORS',fontsize=20)
plt.contourf(xgrd,ygrd,(np.abs(Xassim-Xtrue)).T,z_lim,cmap=plt.cm.rainbow)
plt.xticks(np.arange(0,36,10),fontsize=18)
plt.xlabel('Variable Index',fontsize=18)
plt.xlim([0,35])
plt.yticks(np.arange(0,1000,200),fontsize=18)
plt.ylabel('Model Steps',fontsize=18)
plt.colorbar()
plt.subplot(122)
plt.title('SPREAD',fontsize=20)
z_lim2=np.linspace(0,3,7)  # same scale as the error panel
plt.contourf(xgrd,ygrd,Xspread.T,z_lim2,cmap=plt.cm.rainbow)
plt.xticks(np.arange(0,36,10),fontsize=18)
plt.xlabel('Variable Index',fontsize=18)
plt.xlim([0,35])
plt.yticks([],fontsize=18)
plt.colorbar()