首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >pastis样本同步代码阅读记录

pastis样本同步代码阅读记录

作者头像
用户1423082
发布2024-12-31 20:17:50
发布2024-12-31 20:17:50
14000
代码可运行
举报
文章被收录于专栏:giantbranch's bloggiantbranch's blog
运行总次数:0
代码可运行

简单看下https://github.com/quarkslab/pastis/的样本同步相关的代码

fuzz端

fuzz端就两个功能,发送新增的样本或者crash,以及接收新样本

发送新增样本或者crash

以afl++为例: https://github.com/quarkslab/pastis/blob/56f71b9c7cf25ddf2035d1abbe35f67d55378bb9/engines/pastis-aflpp/pastisaflpp/driver.py#L51

会对corpus_dir和crash_dir的文件创建进行hook,一旦有新的文件创建,就调用__send_seed__send_crash函数, afl的fuzzer_stats文件有修改也会调用__send_telemetry发送

代码语言:javascript
代码运行次数:0
运行
复制
# Configure hookds on workspace
self.workspace.add_creation_hook(self.workspace.corpus_dir, self.__send_seed)
self.workspace.add_creation_hook(self.workspace.crash_dir, self.__send_crash)
self.workspace.add_file_modification_hook(self.workspace.stats_dir, self.__send_telemetry)

下面就是发送函数

代码语言:javascript
代码运行次数:0
运行
复制
def __send_seed(self, filename: Path):
    self.__send(filename, SeedType.INPUT)

def __send_crash(self, filename: Path):
    # Skip README file that AFL adds to the crash folder.
    if filename.name != 'README.txt':
        self.__send(filename, SeedType.CRASH)

def __send(self, filename: Path, typ: SeedType):
    self._tot_seeds += 1
    file = Path(filename)
    raw = file.read_bytes()
    h = self.hash_seed(raw)
    logging.debug(f'[{typ.name}] Sending new: {h} [{self._tot_seeds}]')
    if h not in self._seed_recvs:
        self._agent.send_seed(typ, raw)
    else:
        logging.info("seed (previously sent) do not send it back")
    self._queue_to_send.append((filename, True if typ == SeedType.CRASH else False))

接收新样本

class AFLPPDriver类的__init__函数会调用self.__setup_agent()来初始化回调函数

代码语言:javascript
代码运行次数:0
运行
复制
def __setup_agent(self):
    # Register callbacks.
    self._agent.register_seed_callback(self.__seed_received)
    self._agent.register_stop_callback(self.__stop_received)

接收函数

代码语言:javascript
代码运行次数:0
运行
复制
def __seed_received(self, typ: SeedType, seed: bytes):
    h = self.hash_seed(seed)
    logging.info(f"[SEED] received  {h} ({typ.name})")
    self._seed_recvs.add(h)
    self.add_seed(seed)

通过add_seed可以看到,通过md5文件名存储,说明通过md5去重的

代码语言:javascript
代码运行次数:0
运行
复制
def add_seed(self, seed: bytes):
    seed_path = self.workspace.dynamic_input_dir / f"seed-{hashlib.md5(seed).hexdigest()}"
    seed_path.write_bytes(seed)

服务端

下面是收到样本(seed)的回调函数

代码语言:javascript
代码运行次数:0
运行
复制
def register_seed_callback(self, cb: Callable) -> None:
    """
    Register a callback called when an input seed is received from the
    broker. The callback function take 2 parameters seed type and content.

    :param cb: callback function
    """
    self.register_callback(MessageType.INPUT_SEED, cb)

在set_proxy函数中会设置这个回调函数

代码语言:javascript
代码运行次数:0
运行
复制
def _register_all(self):
    self.register_seed_callback(self.seed_received)
    self.register_hello_callback(self.hello_received)
    self.register_log_callback(self.log_received)
    self.register_telemetry_callback(self.telemetry_received)
    self.register_stop_coverage_callback(self.stop_coverage_received)
    self.register_data_callback(self.data_received)

而在PastisBroker类的__init__函数会调用self._register_all(),收到种子后就调用seed_received函数

可以看到这里哦天哪故宫md5计算,但是没用来判断,只是用_seed_pool这个字段判断,

代码语言:javascript
代码运行次数:0
运行
复制
def seed_received(self, cli_id: bytes, typ: SeedType, seed: bytes):
    cli = self.get_client(cli_id)
    if not cli:
        return
    is_new = seed not in self._seed_pool
    h = md5(seed).hexdigest()

    # Show log message and save seed to file
    self.statmanager.update_seed_stat(cli, typ)  # Add info only if new
    cli.log(LogLevel.INFO, f"seed {h} [{self._colored_seed_type(typ)}][{self._colored_seed_newness(is_new)}]")
    cli.add_own_seed(seed)  # Add seed in client's seed
    self.write_seed(typ, cli.strid, seed) # Write seed to file

    if is_new:
        self._seed_pool[seed] = typ  # Save it in the local pool
    else:
        pass
        # logging.warning(f"receive duplicate seed {h} by {cli.strid}")

    # Iterate on all clients and send it to whomever never received it
    if self.broker_mode == BrokingMode.FULL:
        self.send_seed_to_all_others(cli.netid, typ, seed)

    if self.is_proxied:  # Forward it to the proxy
        self._proxy.send_seed(typ, seed)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • fuzz端
    • 发送新增样本或者crash
    • 接收新样本
  • 服务端
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档