Lorenz 96模式同化的孪生实验示例

沈浙奇 (主页)

zqshen@hhu.edu.cn


1.简介

在这个实例中,我们使用python实现Lorenz 96模式的集合卡尔曼滤波器同化,采用的同化方法为集合调整卡尔曼滤波器(EAKF)。

EAKF的算法描述如下: $x$代表模式状态,$y^o$代表观测,其方差为 $\sigma_o^2$。$h$是观测算子将模式状态投影到观测点上。

1.将$n$份的模式结果都投影到某个观测点上 \begin{equation} y_{p,n}=h(x_{p,n}),n=1\ldots,N \end{equation} 其中下标$p$代表预报

2.使用预报投影的集合方差$\sigma_p^2$和观测的方差求分析场在观测点位置的方差和平均值 \begin{equation}\label{eakf_var} \sigma_u^2=[(\sigma_p^2)^{-1}+(\sigma_o^2)^{-1}]^{-1} \end{equation} \begin{equation}\label{eakf_mean} \overline{y}_u=\sigma_u^2(\overline{y}_p/\sigma_p^2+y^o/\sigma_o^2). \end{equation} 其中下标$u$代表分析

3.那么,为了让分析集合得到这样的均值和方差,需要的观测增量为 \begin{equation}\label{eakf_obsinc} y_{u,n}=(\sigma_u/\sigma_p)(y_{p,n}-\overline{y}_p)+\overline{y}_u, n=1,\ldots,N \end{equation}

4.从而,模式网格点上的增量可以通过相关性计算如下 \begin{equation}\label{eakf_stateinc} \Delta x_{m,n} = (\sigma_{x_m,y}/\sigma_p^2)\Delta y_n, n=1,\ldots,N \end{equation}

5.* 一般情况下,需要引入一个局地化因子再把增量加到模式网格上


我们针对Lorenz96模式的同化问题来使用EAKF。以下使用python实现:

首先载入需要的库,包括numpy(用于科学计算)和matlplotlib.pyplot(用于画图)

In [1]:
import numpy as np;
import matplotlib.pyplot as plt;

Lorenz 96模式的模式方程如下 \begin{equation} \frac{dx_j}{dt}=(x_{j+1}-x_{j-2})x_{j-1}-x_j+F, \end{equation}

为了写出这个模式:

  • 1) 定义模式方程的右端项
In [2]:
def L96_rhs(x):
    forcing = 8;
    dx=np.ones_like(x);
    L = len(x);
    for i in range(L):
        if i==0:
            dx[i] = (x[i+1]-x[L-2])*x[L-1]-x[i]+forcing;
        elif i==1:
            dx[i] = (x[i+1]-x[L-1])*x[i-1]-x[i]+forcing;
        elif i==L-1:
            dx[i] = (x[0]-x[i-2])*x[i-1]-x[i]+forcing;
        else:
            dx[i] = (x[i+1]-x[i-2])*x[i-1]-x[i]+forcing;

    return dx
  • 2) 使用Runge-Kutta 4次5阶公式进行常微分方程的求解,方法定义如下
In [3]:
def RK45(x,func,h):
    K1=func(x);
    K2=func(x+h/2*K1);
    K3=func(x+h/2*K2);
    K4=func(x+h*K3);
    x1=x+h/6*(K1+2*K2+2*K3+K4);
    return x1
  • 3) 使用RK45解方程来驱动模式前进一步
In [4]:
def L96_adv_1step(x0,delta_t):
    x1=RK45(x0,L96_rhs,delta_t)
    return x1

2.实验流程

设置基本参数:积分步长和模式变量数

In [5]:
delta_t=0.05;
model_dim = 36;

一般来说,整个实验的初值最好是用随机的初值进行长时间积分的spinup得到。spinup即长时间的积分,其目的是为了保证变量之间的关系符合物理关系。

In [6]:
import os
if not os.path.exists('x0true.npz'):
    x0true=np.random.randn(model_dim);
    t=0;
    spinup_steps = 14400;
    x1 = x0true;
    while t<spinup_steps:
        x2=L96_adv_1step(x1,delta_t);
        t+=1;
        x1=x2;
    x0true = x2;
    del x1;x2
    print('finish spinup');
    np.savez('x0true.npz',x0true);
else:
    TMPDAT = np.load('x0true.npz')
    print('load spinup')
    x0true = TMPDAT['arr_0']
load spinup

因为模式的0.05对应6小时,所以14400大约是150天。 spinup一般来说只需要运行一次,后续的同化不需要采用,所以保存数据,在下一次同化实验的时候直接读取。

为了进行孪生实验,需要自己构造数据进行同化,得到真实场和观测值,观测点可以是稀疏的,这里设置每4步同化观测一次,每次观测到一半的变量,观测具有随机误差,误差的高斯分布方差可控制,观测算子可以是非线性的算子,在这个例子里,我只考虑线性的观测算子,但是只观测一半的位置。

  • 1)首先定义观测相关的参数和观测算子如下
In [7]:
assim_period_steps = 1000;
tgrid=np.linspace(0,delta_t*assim_period_steps,assim_period_steps,endpoint=False);
assim_every_n_steps = 4;
obs_error_var = 1;
H_mat = np.eye(model_dim);
obs_every_n_vars = 2;
H_mat = H_mat[0:model_dim:obs_every_n_vars,:];  # observation factor in matrix form

tobs = tgrid[0:assim_period_steps:assim_every_n_steps];
  • 2)进行观测模拟如下:
In [8]:
if not os.path.exists('truth_obs.npz'):
    t=0;x1=x0true;
    Xtrue=np.zeros([model_dim,assim_period_steps]);
    OBS = np.zeros([model_dim//obs_every_n_vars,assim_period_steps//assim_every_n_steps]);
    while t<assim_period_steps:
        x2=L96_adv_1step(x1,delta_t)
        Xtrue[:,t]=x2;
        if t%assim_every_n_steps==0:
            OBS[:,t//assim_every_n_steps]=np.dot(H_mat,Xtrue[:,t])+np.random.randn(model_dim//obs_every_n_vars);
            tobs[t//assim_every_n_steps]=tgrid[t];
        x1=x2;
        t+=1;   
    del x1;x2
    print('Experimental data generated')
    np.savez('truth_obs.npz',Xtrue,OBS);
else:
    print('Experimental data loaded')
    ExprData = np.load('truth_obs.npz')
    Xtrue = ExprData['arr_0'];
    OBS = ExprData['arr_1'];    
Experimental data loaded

可以画图大致看一下精确解和观测

In [9]:
[xgrd,ygrd] = np.meshgrid(range(36),range(assim_period_steps))
plt.figure(figsize=(12,6))
plt.subplot(121)
z_lim = np.linspace(-10,10)
plt.title('TRUTH',fontsize=20)
plt.contourf(xgrd,ygrd,Xtrue.T,z_lim,cmap=plt.cm.rainbow)
plt.xticks(np.arange(0,36,10),fontsize=18)
plt.xlabel('Variable Index',fontsize=18)
plt.xlim([0,35])
plt.yticks(np.arange(0,1000,200),fontsize=18)
plt.ylabel('Model Steps',fontsize=18)
plt.colorbar()
plt.subplot(222)
plt.plot(tgrid,Xtrue[0],'k:',lw=4,label = 'True')
plt.plot(tobs,OBS[0],'r+',mew=4,ms=16,label = 'OBS')
plt.legend(fontsize=28,ncol=3)
plt.xlim([0,10])
plt.xlabel(str('dimensionless time').title(),fontsize=16)
plt.ylabel(r'$x_1$',fontsize=16)
plt.tight_layout(2)
plt.subplot(224)
plt.plot(np.arange(36),Xtrue[:,0],'k:',lw=4,label = 'True')
plt.plot(np.linspace(0,35,num=18),OBS[:,0],'r+',mew=4,ms=16,label = 'OBS')
plt.legend(fontsize=28,ncol=3)
plt.xlabel('Variable index',fontsize=16)
plt.ylabel('Truth value',fontsize=16)
Out[9]:
Text(0, 0.5, 'Truth value')

定义EAKF的分析算法如下:

  • com_cov_factor 计算局地化因子
  • obs_increment_eakf 计算观测增量
  • get_state_increments 把观测增量回归到模式网格
  • eakf_analysis 使用EAKF将输入的预报集合进行分析,输出分析集合
In [10]:
def comp_cov_factor(z_in,c):
    z=abs(z_in);
    if z<=c:
        r = z/c;
        cov_factor=((( -0.25*r +0.5)*r +0.625)*r -5.0/3.0)*r**2 + 1.0;
    elif z>=c*2.0:
        cov_factor=0.0;
    else:
        r = z / c;
        cov_factor = ((((r/12.0 -0.5)*r +0.625)*r +5.0/3.0)*r -5.0)*r + 4.0 - 2.0 / (3.0 * r);
    return cov_factor;
# observation increment for the observation site
def obs_increment_eakf(ensemble, observation, obs_error_var):
    prior_mean = np.mean(ensemble);
    prior_var = np.var(ensemble);
    if prior_var >0.1:
        post_var = 1.0 / (1.0 / prior_var + 1.0 / obs_error_var);
        post_mean = post_var * (prior_mean / prior_var + observation / obs_error_var);
    else:
        post_var = prior_var;
        post_mean = prior_mean;
    
    updated_ensemble = ensemble - prior_mean + post_mean;

    var_ratio = post_var / prior_var;
    updated_ensemble = np.sqrt(var_ratio) * (updated_ensemble - post_mean) + post_mean;

    obs_increments = updated_ensemble - ensemble;
    return obs_increments
# regression the obs_inc to model grid corresponds state_ens
def get_state_increments(state_ens, obs_ens, obs_incs):
    covar = np.cov(state_ens, obs_ens);
    state_incs = obs_incs * covar[0,1]/covar[1,1];
    return state_incs

def eakf_analysis(ensemble_in,obs_in,obs_error_var,H_mat,obs_every_n_vars,local_para):
#    c = 8; # localization factor
    L = len(ensemble_in);     # model dimension (model grids)
    m = len(obs_in);    # number of obs sites
    for i in range(m):
        ensemble_proj = np.dot(H_mat,ensemble_in); 
        obs_proj = ensemble_proj[i];   # project model grid to obs site
        obs_inc = obs_increment_eakf(obs_proj,obs_in[i],obs_error_var);
        for j in range(L):
            state_inc=get_state_increments(ensemble_in[j],obs_proj,obs_inc);
            dist = np.abs(obs_every_n_vars*i-j);
            if dist>L/2.0:
                dist=L-dist;
            cov_factor = comp_cov_factor(dist,local_para);
            
            ensemble_in[j]=ensemble_in[j]+cov_factor*state_inc;
    ensemble_out = ensemble_in;
    return ensemble_out
  • 3)下面即开始进行同化实验,同化结果考察分析场的平均值以及离散度(spread)。首先设定一些基本同化参数:
    • 集合成员数N,
    • inflation系数,
    • localization系数
In [11]:
Xassim = np.zeros_like(Xtrue);
Xspread = np.zeros_like(Xtrue);
N = 40; # ensemble size
infl = 1.01;
local_para = 4;
  • 4)开始准备同化之前,需要一个初始集合,我们使用随机扰动的方法获取初始集合
In [12]:
if not os.path.exists('Ensemble'+str(N)+'.npz'):
    Ens0 = np.repeat(x0true,N)+np.random.randn(model_dim*N);
    Ens0 = np.reshape(Ens0,(model_dim,-1));   # initial ensemble
    print('Initial Ensemble generated')
    np.savez('Ensemble'+str(N)+'.npz',Ens0);
else:
    print('Initial Ensemble loaded')
    TMPDAT = np.load('Ensemble'+str(N)+'.npz')
    Ens0=TMPDAT['arr_0']
Initial Ensemble loaded
  • 5)使用EAKF进行同化
In [13]:
method = 'EAKF'

Ens = Ens0;
Ens2 = np.zeros_like(Ens);
t=0;
while t<assim_period_steps:
    for n in range(N):
        Ens2[:,n]=L96_adv_1step(Ens[:,n],delta_t);
    Ens_mean = np.mean(Ens2,axis=1);
    for n in range(N):
        Ens2[:,n]= (Ens2[:,n]-Ens_mean)*infl + Ens_mean;
    if t%assim_every_n_steps==0:    # if it is the assimilation step;
        tassim = t//assim_every_n_steps;
        Ens2 = eakf_analysis(Ens2,OBS[:,tassim],obs_error_var,H_mat,obs_every_n_vars,local_para);
    else:
        pass
    Xassim[:,t] = np.mean(Ens2, axis=1);
    Xspread[:,t] = np.std(Ens2, axis=1);
    t+=1;
    Ens = Ens2;
    
print('Assimilation finished!')
Assimilation finished!

获取同化结果之后,可以画图验证:

  • 图1:$x_1$变量和$x_2$变量的时间序列
In [14]:
plt.figure(figsize=(10,6))
plt.subplot(211)
plt.xticks([],fontsize = 28);
plt.yticks(fontsize = 28);
plt.title(str('True State and data assimilation analyses').title(),fontsize=28)
plt.plot(tgrid,Xtrue[0],'k:',lw=4,label = 'True')
plt.ylabel(r'$x_1$ (observed)',fontsize=28)
plt.xlim(0,model_dim)
plt.plot(tobs,OBS[0],'r+',mew=4,ms=16,label = 'OBS')
plt.plot(tgrid,Xassim[0],lw=4,label = method +'_mean')
plt.legend(fontsize=28,ncol=3)
plt.tight_layout(2)
plt.subplot(212)
plt.xticks(np.array([0,10,20,30,40]),fontsize = 28);
plt.yticks(fontsize = 28);
plt.plot(tgrid,Xtrue[1],'k:',lw=4)
plt.ylabel(r'$x_2$ (unobserved)',fontsize=28)  
plt.xlim(0,model_dim)
plt.plot(tgrid,Xassim[1],lw=4)
plt.xlabel(str('dimensionless time').title(),fontsize=28)
Out[14]:
Text(0.5, 0, 'Dimensionless Time')

计算各时间的所有变量的均方根误差和离散度,以及各变量的时间均方根误差和离散度如下:

In [15]:
RMSE = np.zeros_like(tgrid);
RMSS = np.zeros_like(tgrid);
for j in range(assim_period_steps):
    RMSE[j] = np.sqrt(np.mean((Xtrue[:,j]-Xassim[:,j])**2));
    RMSS[j] = np.sqrt(np.mean(Xspread[:,j]**2));
tRMSE = np.zeros(model_dim);
tRMSS = np.zeros(model_dim);
for i in range(model_dim):
    tRMSE[i] = np.sqrt(np.mean((Xtrue[i,:]-Xassim[i,:])**2));
    tRMSS[i] = np.sqrt(np.mean(Xspread[i,:]**2));
  • 图2:RMSE和RMSS
In [16]:
plt.figure(figsize=(10,6))
plt.subplot(211)
plt.xticks(np.array([0,10,20,30,40]),fontsize = 28);
plt.yticks(np.array([0.5,1.0,1.5]),fontsize = 28);
plt.plot(tgrid,RMSE,lw=4,label = 'RMSE')
plt.plot(tgrid,RMSS,'r--',lw=2,label = 'RMSS')
plt.legend(ncol=2,fontsize = 28,loc=4)
plt.xlim(0,model_dim)
plt.ylim(0.2,1.5)
plt.xlabel(str('dimensionless time').title(),fontsize = 28)
plt.tight_layout(3)
plt.text(1, 0.5, 'average RMSE = %.2f' %np.mean(RMSE), bbox=dict(facecolor='grey', alpha=0.2),fontsize = 17)
plt.subplot(212)
plt.xticks(np.array([0.4,0.6,0.8,1.0,1.2]),fontsize = 28);
plt.yticks(np.array([0.4,0.6,0.8,1.0,1.2]),fontsize = 28);
plt.plot(tRMSS,tRMSE,'rx',ms=14,mew=3)
plt.xlim(0.3,1.2);
plt.ylim(0.3,1.2);
plt.ylabel(str('temporal RMSE').capitalize(),fontsize = 28)
plt.xlabel(str('temporal RMSS').capitalize(),fontsize = 28)
plt.plot(np.arange(0,5,.1),np.arange(0,5,.1),'k:',lw=4,label='y=x');
plt.legend(fontsize = 28);
  • 图3:整体的误差和离散度
In [17]:
import numpy as np
[xgrd,ygrd] = np.meshgrid(range(36),range(assim_period_steps))
plt.figure(figsize=(10,10))
plt.subplot(121)
z_lim = np.linspace(0,3,7)
plt.title('ABS ERRORS',fontsize=20)
plt.contourf(xgrd,ygrd,(np.abs(Xassim-Xtrue)).T,z_lim,cmap=plt.cm.rainbow)
plt.xticks(np.arange(0,36,10),fontsize=18)
plt.xlabel('Variable Index',fontsize=18)
plt.xlim([0,35])
plt.yticks(np.arange(0,1000,200),fontsize=18)
plt.ylabel('Model Steps',fontsize=18)
plt.colorbar()
plt.subplot(122)
plt.title('SPREAD',fontsize=20)
z_lim2=np.linspace(0,3,7)
plt.contourf(xgrd,ygrd,Xspread.T,z_lim2,cmap=plt.cm.rainbow)
plt.xticks(np.arange(0,36,10),fontsize=18)
plt.xlabel('Variable Index',fontsize=18)
plt.xlim([0,35])
plt.yticks([],fontsize=18)
plt.colorbar()
Out[17]:
<matplotlib.colorbar.Colorbar at 0x7f8360a46e20>

思考与练习:

  1. 考虑使用不同的集合成员数N
  2. 针对不同的N,讨论使用不同的局地化参数local_para的效果
  3. 对应最优同化效果的local_para是否与N有关
  4. 同化效果计算得到的RMSE和RMSS的是否有一定的关系