使用OpenAPI导出阿里云ecs系统漏洞及修复命令

发布于 2020-11-05  1.77k 次阅读


阿里云安全会扫描系统存在的漏洞,可在云安全中心控制台查看,开通企业版可在控制台直接修复,不过企业版很贵..
而且控制台只显示漏洞信息并未给出修复命令,但调用 DescribeVulList api看到返回结果是有修复命令的,所以可以自行拉取漏洞信息和修复命令进行手动修复。

以下python 代码可逐个获取实例的漏洞信息到csv 并汇总, 分离出每个实例漏洞的修复命令到文本文件。


# -*- coding: UTF-8 -*-

# 收集阿里ecs漏洞详情及修复命令
# by mo

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdksas.request.v20181203.DescribeVulListRequest import DescribeVulListRequest
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
import json, csv, time, glob, os, sys, os.path

access_key_id = ""  # 填入 accessKeyId, 没有的话去控制台新建
access_secret = ""  # 填入 accessSecret
region_id = ""  # 填入 RegionId 如:cn-hangzhou
client = AcsClient(access_key_id, access_secret, region_id)


# 获取所有实例内网ip,系统类型,实例名
def get_ip_os_name():
    request = DescribeInstancesRequest()
    request.set_accept_format('json')
    request.set_PageNumber(1)  # 超过百台ecs 则需修改页面数多次获取
    request.set_PageSize(100)
    request.set_Status("Running")

    response = client.do_action_with_exception(request)
    ip_os_name_list = []

    # python2:  print(response)
    # print(str(response, encoding='utf-8'))
    res = json.loads(response)
    for ins in res["Instances"]["Instance"]:
        private_ip = ins["VpcAttributes"]["PrivateIpAddress"]["IpAddress"][0]
        os_type = ins["OSType"]
        ins_name = ins["InstanceName"]
        ip_os_name_list.append(private_ip)
        ip_os_name_list.append(os_type)
        ip_os_name_list.append(ins_name)
    return ip_os_name_list


def get_cve_to_csv(ip, os_type, name):
    if os_type == "linux":
        bug_type = "cve"
    else:
        bug_type = "sys"
    request = DescribeVulListRequest()
    request.set_accept_format('json')

    request.set_Lang("zh")
    request.set_Remark(ip)
    request.set_Type(bug_type)
    #request.set_Necessity("asap")  # asap: 高危  later: 中  nntf: 低  
    #request.set_Dealed("n")
    request.set_PageSize(100)

    response = client.do_action_with_exception(request)
    # python2:  print(response)
    # print(str(response, encoding='utf-8'))
    res = json.loads(response)
    # ip = res["VulRecords"][0]["IntranetIp"]
    folder_path = "C:/Users/Admin/Desktop/tmp/ali_cve/"   # 保存目录, 需要提前建好
    save_path = folder_path + time.strftime("%Y-%m-%d", time.localtime())  # 根据日期保存
    try:
        os.mkdir(save_path)
    except:
        prompt = "nothing to do "
    filename = save_path + "/" + ip + "_" + name + ".csv"
    with open(filename, 'w', encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        csv_head = ["实例id", "实例名称", "公网ip", "内网ip", "漏洞名称", "漏洞状态", "漏洞等级", "漏洞文件", "当前版本", "安全版本", "修复命令"]
        writer.writerow(csv_head)
        cmd = ""
        for i in range(0, len(res["VulRecords"])):
            instance_id = res["VulRecords"][i]["InstanceId"]  # 根据返回的字典取出对应属性值,返回的字典可到json解析网站解析方便查看结构,列表元素取值要加下标
            instance_name = res["VulRecords"][i]["InstanceName"]
            pub_ip = res["VulRecords"][i]["Ip"]
            inner_ip = res["VulRecords"][i]["IntranetIp"]
            alias_name = res["VulRecords"][i]["AliasName"]
            necessity_status = res["VulRecords"][i]["ExtendContentJson"]["Necessity"]["Status"]
            rep_status = res["VulRecords"][i]["Status"]
            if rep_status == 1:
                status = "未修复"     # status: 漏洞状态  3:回滚失败  4:修复中  5:回滚中 6:验证中  8:修复成功待重启 9:回滚成功 10:已忽略 12:漏洞不存在
            elif rep_status == 2:
                status = "修复失败"
            elif rep_status == 7:
                status = "修复成功"
            elif rep_status == 6:
                status = "验证中"
            elif rep_status == 8:
                status = "修复成功待重启"
            else:
                status = rep_status

            if os_type == "linux":     # linux 才有以下项目
                for j in res["VulRecords"][i]["ExtendContentJson"]["RpmEntityList"]:
                    path = j["Path"]
                    update_cmd = j["UpdateCmd"]
                    now_version = j["Version"]
                    match_detail = j["MatchDetail"]
                    row_csv = [instance_id, instance_name, pub_ip, inner_ip, alias_name, status, necessity_status, path,
                               now_version, match_detail, update_cmd]
                    writer.writerow(row_csv)
                    # print(instance_id, instance_name, pub_ip, inner_ip, alias_name, status, necessity_status, path, now_version, match_detail, update_cmd)
                    cmd_append = update_cmd if "kernel" not in update_cmd and status == "未修复" else " "   # 排除kernel 相关,只要未修复的命令
                    if cmd_append not in cmd:    # 去重
                        cmd = cmd + cmd_append + "\n"
            else:  # 写入windows 详情
                row_csv = [instance_id, instance_name, pub_ip, inner_ip, alias_name, status, necessity_status]
                writer.writerow(row_csv)

        if os_type == "linux":  # 生成linux 修复命令文本文件
            script_txt = save_path + "/" + ip + "_" + name + ".txt"
            with open(script_txt, 'w', encoding='utf-8') as f1:
                f1.write(cmd)

    return save_path

def main():
    # 逐个获取
    list1 = get_ip_os_name()  # [ip1, os1, name1, ip2, os2, name2, ... ]
    for i in range(int(len(list1) / 3)):
        ip = list1[i]
        os_type = list1[i + 1]
        name = list1[i + 2]
        save_path = get_cve_to_csv(ip, os_type, name)
        print("%s %s done" % (ip, name))
        list1.pop(0)
        list1.pop(0)  # 每三个元素为一组传入数据,每个循环弹出列表前两个元素

    # 合并到一个csv
    csv_list = glob.glob(save_path + "/*.csv")
    for i in csv_list:
        fr = open(i, 'rb').read()
        with open(save_path + '/漏洞汇总.csv', 'ab') as f:
            f.write(fr)
    print(u'合并完毕!')


if __name__ == "__main__":
    main()


使用OpenAPI导出阿里云ecs系统漏洞及修复命令
导出效果
使用OpenAPI导出阿里云ecs系统漏洞及修复命令
导出效果