阿里云安全会扫描系统存在的漏洞,可在云安全中心控制台查看,开通企业版可在控制台直接修复,不过企业版很贵..
而且控制台只显示漏洞信息并未给出修复命令,但调用 DescribeVulList api看到返回结果是有修复命令的,所以可以自行拉取漏洞信息和修复命令进行手动修复。
以下python 代码可逐个获取实例的漏洞信息到csv 并汇总, 分离出每个实例漏洞的修复命令到文本文件。
# -*- coding: UTF-8 -*-
# 收集阿里ecs漏洞详情及修复命令
# by mo
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdksas.request.v20181203.DescribeVulListRequest import DescribeVulListRequest
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
import json, csv, time, glob, os, sys, os.path
access_key_id = "" # 填入 accessKeyId, 没有的话去控制台新建
access_secret = "" # 填入 accessSecret
region_id = "" # 填入 RegionId 如:cn-hangzhou
client = AcsClient(access_key_id, access_secret, region_id)
# 获取所有实例内网ip,系统类型,实例名
def get_ip_os_name():
request = DescribeInstancesRequest()
request.set_accept_format('json')
request.set_PageNumber(1) # 超过百台ecs 则需修改页面数多次获取
request.set_PageSize(100)
request.set_Status("Running")
response = client.do_action_with_exception(request)
ip_os_name_list = []
# python2: print(response)
# print(str(response, encoding='utf-8'))
res = json.loads(response)
for ins in res["Instances"]["Instance"]:
private_ip = ins["VpcAttributes"]["PrivateIpAddress"]["IpAddress"][0]
os_type = ins["OSType"]
ins_name = ins["InstanceName"]
ip_os_name_list.append(private_ip)
ip_os_name_list.append(os_type)
ip_os_name_list.append(ins_name)
return ip_os_name_list
def get_cve_to_csv(ip, os_type, name):
if os_type == "linux":
bug_type = "cve"
else:
bug_type = "sys"
request = DescribeVulListRequest()
request.set_accept_format('json')
request.set_Lang("zh")
request.set_Remark(ip)
request.set_Type(bug_type)
#request.set_Necessity("asap") # asap: 高危 later: 中 nntf: 低
#request.set_Dealed("n")
request.set_PageSize(100)
response = client.do_action_with_exception(request)
# python2: print(response)
# print(str(response, encoding='utf-8'))
res = json.loads(response)
# ip = res["VulRecords"][0]["IntranetIp"]
folder_path = "C:/Users/Admin/Desktop/tmp/ali_cve/" # 保存目录, 需要提前建好
save_path = folder_path + time.strftime("%Y-%m-%d", time.localtime()) # 根据日期保存
try:
os.mkdir(save_path)
except:
prompt = "nothing to do "
filename = save_path + "/" + ip + "_" + name + ".csv"
with open(filename, 'w', encoding="utf-8", newline="") as f:
writer = csv.writer(f)
csv_head = ["实例id", "实例名称", "公网ip", "内网ip", "漏洞名称", "漏洞状态", "漏洞等级", "漏洞文件", "当前版本", "安全版本", "修复命令"]
writer.writerow(csv_head)
cmd = ""
for i in range(0, len(res["VulRecords"])):
instance_id = res["VulRecords"][i]["InstanceId"] # 根据返回的字典取出对应属性值,返回的字典可到json解析网站解析方便查看结构,列表元素取值要加下标
instance_name = res["VulRecords"][i]["InstanceName"]
pub_ip = res["VulRecords"][i]["Ip"]
inner_ip = res["VulRecords"][i]["IntranetIp"]
alias_name = res["VulRecords"][i]["AliasName"]
necessity_status = res["VulRecords"][i]["ExtendContentJson"]["Necessity"]["Status"]
rep_status = res["VulRecords"][i]["Status"]
if rep_status == 1:
status = "未修复" # status: 漏洞状态 3:回滚失败 4:修复中 5:回滚中 6:验证中 8:修复成功待重启 9:回滚成功 10:已忽略 12:漏洞不存在
elif rep_status == 2:
status = "修复失败"
elif rep_status == 7:
status = "修复成功"
elif rep_status == 6:
status = "验证中"
elif rep_status == 8:
status = "修复成功待重启"
else:
status = rep_status
if os_type == "linux": # linux 才有以下项目
for j in res["VulRecords"][i]["ExtendContentJson"]["RpmEntityList"]:
path = j["Path"]
update_cmd = j["UpdateCmd"]
now_version = j["Version"]
match_detail = j["MatchDetail"]
row_csv = [instance_id, instance_name, pub_ip, inner_ip, alias_name, status, necessity_status, path,
now_version, match_detail, update_cmd]
writer.writerow(row_csv)
# print(instance_id, instance_name, pub_ip, inner_ip, alias_name, status, necessity_status, path, now_version, match_detail, update_cmd)
cmd_append = update_cmd if "kernel" not in update_cmd and status == "未修复" else " " # 排除kernel 相关,只要未修复的命令
if cmd_append not in cmd: # 去重
cmd = cmd + cmd_append + "\n"
else: # 写入windows 详情
row_csv = [instance_id, instance_name, pub_ip, inner_ip, alias_name, status, necessity_status]
writer.writerow(row_csv)
if os_type == "linux": # 生成linux 修复命令文本文件
script_txt = save_path + "/" + ip + "_" + name + ".txt"
with open(script_txt, 'w', encoding='utf-8') as f1:
f1.write(cmd)
return save_path
def main():
# 逐个获取
list1 = get_ip_os_name() # [ip1, os1, name1, ip2, os2, name2, ... ]
for i in range(int(len(list1) / 3)):
ip = list1[i]
os_type = list1[i + 1]
name = list1[i + 2]
save_path = get_cve_to_csv(ip, os_type, name)
print("%s %s done" % (ip, name))
list1.pop(0)
list1.pop(0) # 每三个元素为一组传入数据,每个循环弹出列表前两个元素
# 合并到一个csv
csv_list = glob.glob(save_path + "/*.csv")
for i in csv_list:
fr = open(i, 'rb').read()
with open(save_path + '/漏洞汇总.csv', 'ab') as f:
f.write(fr)
print(u'合并完毕!')
if __name__ == "__main__":
main()
Comments | NOTHING