前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >安全预算不足?用Masscan、Nmap与ELK让甲方的内网资产收集不再难!

安全预算不足?用Masscan、Nmap与ELK让甲方的内网资产收集不再难!

作者头像
释然
发布2024-06-26 09:11:45
980
发布2024-06-26 09:11:45
举报
文章被收录于专栏:释然IT杂谈释然IT杂谈

在当今企业数字化转型的浪潮中,信息安全部门承担着日益增长的资产保护责任。为应对这一挑战,本文介绍一种高效的内网资产收集方法,尤其适合那些缺乏足够安全预算购买商业安全工具的组织或企业。通过开源工具的利用,即使是拥有有限技术资源的团队也能实现对内网资产的全面测绘和管理。

工具部署:

首先,需要部署以下工具:

  • Masscan:一个快速的网络扫描工具,用于识别网络中活跃的主机。
  • Nmap:一个深入的网络分析工具,用于识别服务、版本信息以及潜在的漏洞。
  • ELK Stack:一个强大的日志分析和可视化平台,用于数据的存储和展示。

部署步骤如下:

安装Masscan及其依赖项,确保环境支持高效的网络扫描。

代码语言:javascript
复制
yum install git gcc make libpcap-devel
git clone https://github.com/robertdavidgraham/masscan
cd masscan
make
cp bin/masscan  /bin

安装最新版本的Nmap,获取最准确的扫描结果。

代码语言:javascript
复制
wget https://nmap.org/dist/nmap-7.95-2.x86_64.rpm
#释然IT杂谈
rpm -ivh nmap-7.95-2.x86_64.rpm

通过Docker快速启动Elasticsearch和Kibana服务,为数据管理和可视化提供支持。

代码语言:javascript
复制
docker run -d --name es -p 127.0.0.1:9201:9200 -p 9300:9300 -e ES_JAVA_OPTS="-Xms2G -Xmx2G" -e "discovery.type=single-node"  docker.elastic.co/elasticsearch/elasticsearch-oss:7.1.1   

docker run --name kibana -d -p 5601:5601 -e ELASTICSEARCH_HOSTS=http://127.0.0.1:9201   docker.elastic.co/kibana/kibana-oss:7.1.1   

资产收集与分析流程:

资产收集的流程分为三个关键步骤:

  • 主机发现:使用Masscan对指定网络范围进行扫描,快速识别存活的主机。
  • 详细扫描:对发现的存活主机使用Nmap进行详细扫描,并将结果以XML格式导出。
  • 数据整合与可视化:将XML数据转换为JSON格式,导入到Elasticsearch中,并通过Kibana实现数据的可视化展示。

自动化实现:

为提高效率,自动化脚本是关键:

  • 利用多线程技术并行执行Masscan和Nmap扫描。
  • 开发自定义函数将扫描结果从XML转换为JSON格式。
  • 将转换后的数据索引到Elasticsearch中,并使用Kibana进行数据的实时监控和分析。(释然IT杂谈)

代码语言:javascript
复制
from elasticsearch import Elasticsearch
from settings import es_ip, es_port, ip_list
from utils import Logger
import xmltodict
import os
import threading
import time
import json

logger = Logger.get_logger(__name__, path=os.getcwd())
es = Elasticsearch([{'host': es_ip, 'port': es_port}])


def xml_to_json(path):
    try:
        with open(path, 'r') as load_f:
            temp_ = xmltodict.parse(load_f).get("nmaprun")
            return {key: temp_[key]
                    for key in temp_
                    if key not in ["verbose", 'scaninfo', 'taskbegin', 'taskend', "debugging"]}

    except Exception as e:
        logger.error(str(e)+path)
        return {}


def json_to_es(index, json_):
    try:
        es.index(index=index, doc_type="vuln", body=json_)
    except Exception as e:
        try:
            es.index(index=index + '_', doc_type="vuln", body=json.dumps(json_))
        except Exception as e:
            try:
                es.index(index=index + '__', doc_type="vuln", body=json.dumps(json_))
            except Exception as e:
                logger.error(str(e))
                pass


def masscan_scan(ip):
    try:
        if ip:
            if not os.path.exists('tmp'):
                os.makedirs('tmp')
            os.system('masscan --ping {0} --rate 1000 -oL tmp/{1}.txt'.format(ip, ip.split('/')[0]))
    except Exception as e:
        logger.error(str(e))


def nmap_scan(ip):
    try:
        if ip:
            if not os.path.exists('report'):
                os.makedirs('report')
            os.system('nmap -sV -Pn -A -T5 --script=nmap-vulners/vulners.nse -oX report/{0}.xml {1}'.format(ip, ip))
    except Exception as e:
        logger.error(str(e))


def masscan_scan_worker():
#释然IT杂谈
    t_obj = []
    for i in range(len(ip_list)):
        t = threading.Thread(target=masscan_scan, args=(ip_list[i],))
        t_obj.append(t)
        t.start()
    for t in t_obj:
        t.join()
    if not os.path.exists('alive_host'):
        os.makedirs('alive_host')
    cmd = """ awk '{print $4}' tmp/*.txt | tr -s '\n' > alive_host/host.txt """
    os.system(cmd)
    os.system("""rm -f tmp/*.txt""")


def read_txt(path):
    lines = []
    try:
        with open(path, 'r') as file_to_read:
            while True:
                line = file_to_read.readline()
                if not line:
                    break
                line = line.strip('\n')
                lines.append(line)
    except Exception as e:
        logger.error(str(e))
    return lines


def nmap_scan_worker():
    lst = read_txt('alive_host/host.txt')
    tmp_ = [lst[i:i+5] for i in range(0, len(lst), 5)]
    print tmp_
    if tmp_:
        for list_ in tmp_:
            t_obj = []
            for i in range(len(list_)):
                t = threading.Thread(target=nmap_scan, args=(list_[i],))
                t_obj.append(t)
                t.start()
            for t in t_obj:
                t.join()


def nmap_to_es(index):
        if os.path.exists('report'):
            files = os.listdir('report')
        for file in files:
                json_to_es(index, xml_to_json('report'+'/'+file))
        os.system("""rm -f report/*.xml""")


if __name__ == '__main__':

    while True:
        now = time.strftime('%Y-%m-%d')
        masscan_scan_worker()
        nmap_scan_worker()
        es.indices.delete('nmap-*')
        nmap_to_es('nmap-' + now)

完整代码: https://github.com/njcx/nmap_to_es.git

结果展示:

最终,通过Kibana Dashboard,用户可以直观地查看内网资产的状态,包括但不限于开放端口、运行的服务及其版本信息,以及可能存在的安全漏洞。

本文提供的内网资产收集方法,不仅成本低廉,而且易于操作和实现。希望这些经验能够为信息安全从业者提供实用的参考,帮助他们更好地理解和保护企业的关键资产。

文章声明

本文素材整理自网络公开领域,版权归原作者所有,由释然IT杂谈排版成文,转载请注明出处,侵删。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 释然IT杂谈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 完整代码: https://github.com/njcx/nmap_to_es.git
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档