cdxy.me
Cyber Security / Data Science / Trading

继上一篇文章,本篇尝试结合基础攻防数据测试效果。

基础攻防领域的异常检测问题

在基础攻防领域的异常检测应用场景很多,如暴力破解、端口扫描、DDoS、CC、爬虫等。这些行为的共性是——流量侧的统计特征会观测到明显异常。

如:

  • 某服务器被反射DDoS攻击时,会观测到QPS/源IP数量急剧增高。
  • 某IP对某端口进行暴力破解时,会观测到count(distinct request) group by src_ip,dst_ip,dst_port急剧升高。
  • 某扫描器对HTTP POST请求参数key=value的值进行fuzz时,会观测到count(distinct value) group by src_ip,dst_ip,key急剧升高。

固定阈值

如何运用上文这些异常特征,来为入侵检测服务?

一种古老的方法是:

  • 固定阈值——即规定1分钟内访问的端口数超过50,判定端口扫描。1分钟内尝试登录的密码超过100个,判定暴力破解。

举例说明这种方法的局限:

  • 目录fuzz是一种场景的入侵前奏,黑客通过枚举敏感URL地址来碰撞网站的后台地址、或者碰撞出敏感文件。这种方式在统计特征上表达为QPS升高、URL文件后缀种类变多、HTTP 404比例升高等。如果单从这些维度判定(如单时间窗口内QPS高于10,URL后缀高于20,404比例在0.9以上)则会导致误报——一种典型的场景是CDN/资源存储型站点。这种站点存放了不同使用者的大量文件,QPS、文件后缀数量、404比例都普遍很高。

在这种场景下,异常检测可以作为降低误报的工具,通过历史数据的学习来评估某个统计特征是否适用于目标环境。

3-sigma与滑动窗口

3sigma方法中,异常值被定义为一组结果值中与平均值的偏差超过三倍标准差的值。在标注正态分布的假设下,超过3sigma的数据概率为0.003,因此可认为是异常值。

在之前的DataCon安全大数据分析比赛中,我们使用3-sigma找出各种DNS流量中攻击案例:

在持续的异常判定模型中,这种方法一般被用于baseline。通过计算历史一段时间窗口内的3-sigma阈值,并据此判定下一时刻出现的数据是否为异常,过程表现为滑动窗口,因此从时序来看3-sigma阈值线是动态变化的。

取某网站的QPS五分钟统计量共1343条,窗口长度300,将QPS值(蓝色)、均值(橙色)、3-sigma阈值(绿色)绘制如下。

png

下方黑色竖线表示异常检测 (观测值>历史3-sigma阈值) 的结果。

动态阈值算法

前文提到paper中通过时序异常检测识别卫星信号异常,流程如下:

  1. 滑动窗口+LSTM学习时序行为,并给出下一步预测。
  2. 求预测值与真实观测值的误差。
  3. 对误差使用EWMA平滑。
  4. 采用动态阈值方法判定该误差是否为异常。

其中安全场景时序特征弱,LSTM价值在于对波形的向量化,这里不需要学习波形和时序平滑。直接跳入第四步看动态阈值算法带来的变化。

原文算法实现

  • https://github.com/khundman/telemanom
def find_epsilon(e_s, error_buffer, sd_lim=12.0):
    '''Find the anomaly threshold that maximizes function representing tradeoff between a) number of anomalies
    and anomalous ranges and b) the reduction in mean and st dev if anomalous points are removed from errors
    (see https://arxiv.org/pdf/1802.04431.pdf)

    Args:
        e_s (array): residuals between y_test and y_hat values (smoothes using ewma)
        error_buffer (int): if an anomaly is detected at a point, this is the number of surrounding values 
            to add the anomalous range. this promotes grouping of nearby sequences and more intuitive results
        sd_lim (float): The max number of standard deviations above the mean to calculate as part of the 
            argmax function

    Returns:
        sd_threshold (float): the calculated anomaly threshold in number of standard deviations above the mean
    '''

    mean = np.mean(e_s)
    sd = np.std(e_s)

    max_s = 0
    sd_threshold = sd_lim # default if no winner or too many anomalous ranges

    for z in np.arange(2.5, sd_lim, 0.5):
        epsilon = mean + (sd*z)
        pruned_e_s, pruned_i, i_anom  = [], [], []

        for i,e in enumerate(e_s):
            if e < epsilon:
                pruned_e_s.append(e)
                pruned_i.append(i)
            if e > epsilon:
                for j in range(0, error_buffer):
                    if not i + j in i_anom and not i + j >= len(e_s):
                        i_anom.append(i + j)
                    if not i - j in i_anom and not i - j < 0:
                        i_anom.append(i - j)

        if len(i_anom) > 0:
            # preliminarily group anomalous indices into continuous sequences (# sequences needed for scoring)
            i_anom = sorted(list(set(i_anom)))
            groups = [list(group) for group in mit.consecutive_groups(i_anom)]
            E_seq = [(g[0], g[-1]) for g in groups if not g[0] == g[-1]]

            perc_removed = 1.0 - (float(len(pruned_e_s)) / float(len(e_s)))
            mean_perc_decrease = (mean - np.mean(pruned_e_s)) / mean
            sd_perc_decrease = (sd - np.std(pruned_e_s)) / sd
            s = (mean_perc_decrease + sd_perc_decrease) / (len(E_seq)**2 + len(i_anom))
#             print('z=',z,'s=',s)

            # sanity checks
            if s >= max_s and len(E_seq) <= 5 and len(i_anom) < (len(e_s)*0.5):
                sd_threshold = z
                max_s = s

    return sd_threshold #multiply by sd to get epsilon

通过刚才的QPS数据测试效果。

def calc(x,y,rolling_window,err_buffer,max_z=6):
    y = y.astype(float)
    mean = y.rolling(rolling_window).mean()
    ewma = y.ewm(span=rolling_window*0.03,ignore_na=True).mean()
    std = y.rolling(rolling_window).std()
    epsilon = y.rolling(rolling_window).apply(lambda x:find_epsilon(x,err_buffer,max_z),raw=True)
#     epsilon = y.rolling(rolling_window).apply(lambda x:find_epsilon_new(x,err_buffer,max_z),raw=True)
    threshold_test = mean+epsilon*std
    threshold_3sigma = mean+3*std

    print('calc on: \nlen(y)',len(y),'\nrolling_window',rolling_window,'\nerr_buffer',err_buffer,'\nmax_z',max_z)

    plt.figure(figsize=(20,5))
    plt.plot(x,y,label='y') # 观测值
    plt.plot(x,mean,label='mean') # 均值
    plt.plot(x,ewma,label='ewma') # 加权平均
    plt.plot(x,threshold_3sigma,label='z_3') # 3-sigma
    plt.plot(x,threshold_test,label='z_dynamic') # 动态阈值

    for i in range(len(y)):
        if y[i]>threshold_3sigma[i]: # 3-sigma检测结果
            plt.vlines(x[i],ymin=-1,ymax=-0.1)
        if y[i]>threshold_test[i]: # 动态阈值检测结果
            plt.vlines(x[i],ymin=-2,ymax=-1.1)

    plt.legend()
    plt.show()

动态阈值结果比3-sigma少了一些异常点。

png

样本在10.25日出现唯一一次目录fuzz行为,同时触发了QPS和URL后缀数量的异常。动态阈值检测过程规避了一些可能误报的点。

png

改动1:
算法失效时将使用default 3-sigma曲线暴露更多异常点。

sd_threshold = sd_lim # default if no winner or too many anomalous ranges

png

改动2:
对threshold做平滑避免突发型尖峰带来的漏报。

threshold_ewma = threshold_test.ewm(span=rolling_window*0.03,ignore_na=True).mean()

png

模型在不同场景表现

大型波动

png

小型波动

png

持续增长

png

异常检测方法在入侵检测落地的假设是——异常是威胁的必要非充分条件。这意味着上述方法直接产出告警会产生大量误报,但可作已有模型降误报使用。