
Reading the ClusterFuzz bot source code (fuzz engine selection and scheduling: libFuzzer)

Author: 用户1423082
Published: 2024-12-31 20:15:30
Included in the column: giantbranch's blog

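Recap and overview

Last time we read through the fuzz task code. This time we go a step further and look at how the fuzz engine is selected.

First, back to the engine classes discussed last time: https://www.giantbranch.cn/2020/05/22/ClusterFuzz%E7%9A%84bot%E6%BA%90%E7%A0%81(fuzz%20task)%E9%98%85%E8%AF%BB/#%E5%BC%95%E6%93%8E%E7%B1%BB

When we looked at the registration code back then, we wondered why AFL was missing. Reading it again now, AFL is registered, and there is also a blackbox engine.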

src/python/bot/fuzzers/init.py

def run():
  """Initialise builtin fuzzing engines."""
  engine.register('afl', afl_engine.AFLEngine)
  engine.register('blackbox', blackbox_engine.BlackboxEngine)
  engine.register('honggfuzz', honggfuzz_engine.HonggfuzzEngine)
  engine.register('libFuzzer', libFuzzer_engine.LibFuzzerEngine)
  engine.register('syzkaller', syzkaller_engine.SyzkallerEngine)
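
The registration pattern above can be pictured as a simple name-to-class registry. The following is only a minimal sketch and not ClusterFuzz's actual `engine` module: `_ENGINES`, `DummyEngine`, and the exact behaviour of `register` and `get` are assumptions made for illustration.

```python
# Hypothetical, simplified registry. The real ClusterFuzz `engine` module may
# differ in names, signatures, and error handling.
_ENGINES = {}


def register(name, engine_class):
  """Register an engine class under |name|; reject duplicate names."""
  if name in _ENGINES:
    raise ValueError('Engine %s is already registered.' % name)
  _ENGINES[name] = engine_class


def get(name):
  """Return a new instance of the engine registered as |name|, or None."""
  engine_class = _ENGINES.get(name)
  return engine_class() if engine_class else None


class DummyEngine:
  """Stand-in for a real engine such as LibFuzzerEngine (illustrative only)."""

  @property
  def name(self):
    return 'dummy'


register('dummy', DummyEngine)
```

With a registry like this, the fuzz task only needs the engine name from its job configuration to obtain an object that exposes `prepare` and `fuzz`.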

The overall call path of fuzz_task is now:

Get a task -> execute_task in fuzz_task.py -> FuzzingSession -> run() -> engine.get fetches the concrete engine class, then calls do_engine_fuzzing(engine_impl) -> run_engine_fuzzer(engine_impl, self.fuzz_target.binary, sync_corpus_directory, self.testcase_directory)

Inside run_engine_fuzzer, prepare is called to build the FuzzOptions (which also sets up some fuzzing strategies), and finally the fuzz function is called to start fuzzing:

options = engine_impl.prepare(sync_corpus_directory, target_path, build_dir)
fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT')
additional_processing_time = engine_impl.fuzz_additional_processing_timeout(
    options)
# ...... (code omitted in the original excerpt)
result = engine_impl.fuzz(target_path, options, testcase_directory,
                          fuzz_test_timeout)

That is, the fuzz method of one of the following classes:

AFLEngine
BlackboxEngine
HonggfuzzEngine
LibFuzzerEngine
SyzkallerEngine

Let's start with libFuzzer.

libfuzzer

prepare

Start with prepare. It first fetches the arguments:

arguments = fuzzer.get_arguments(target_path)

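To build the arguments, it first checks whether a file named XXX.options exists (XXX being the name of the fuzz target). If it does, a fuzzer_options object (of class FuzzerOptions) is returned, and fuzzer_options.get_engine_arguments('libfuzzer') yields a FuzzerArguments object (arguments). Its list method then converts it into a list whose elements have the form "-%s=%s". After that, rss_limit_mb is added to cap memory usage, along with the timeout setting.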
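
As a rough illustration of that conversion, here is a minimal sketch that reads the `[libfuzzer]` section of an INI-style .options file with Python's standard `configparser` and emits flags in the `-%s=%s` form. The function name `libfuzzer_arguments_from_options` and the sample file content are assumptions; ClusterFuzz's own FuzzerOptions and FuzzerArguments classes are not reproduced here, and the later rss_limit_mb and timeout handling is left out.

```python
import configparser


def libfuzzer_arguments_from_options(options_text):
  """Turn the [libfuzzer] section of a .options file into -key=value flags."""
  parser = configparser.ConfigParser()
  parser.read_string(options_text)
  if not parser.has_section('libfuzzer'):
    return []
  return ['-%s=%s' % (key, value) for key, value in parser.items('libfuzzer')]


# Hypothetical .options content for a fuzz target.
EXAMPLE_OPTIONS = """
[libfuzzer]
max_len = 1024
dict = example.dict
"""

arguments = libfuzzer_arguments_from_options(EXAMPLE_OPTIONS)
print(arguments)
```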

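Next it fetches the grammar, which also comes from the grammar section of XXX.options. (I did not find this section in the options files of the oss-fuzz projects; it is related to Peach and names a Peach template.)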

grammar = fuzzer.get_grammar(target_path)

Next, a strategy pool is generated and strategies are picked from it:

strategy_pool = strategy_selection.generate_weighted_strategy_pool(
    strategy_list=strategy.LIBFUZZER_STRATEGY_LIST,
    use_generator=True,
    engine_name=self.name)
strategy_info = libfuzzer.pick_strategies(strategy_pool, target_path,
                                          corpus_dir, arguments, grammar)

generate_weighted_strategy_pool

generate_weighted_strategy_pool builds the strategy pool from probabilities that were set empirically.

The strategy list is shown below; there are quite a few:

LIBFUZZER_STRATEGY_LIST = [
    CORPUS_MUTATION_RADAMSA_STRATEGY,
    RANDOM_MAX_LENGTH_STRATEGY,
    CORPUS_MUTATION_ML_RNN_STRATEGY,
    VALUE_PROFILE_STRATEGY,
    FORK_STRATEGY,
    CORPUS_SUBSET_STRATEGY,
    RECOMMENDED_DICTIONARY_STRATEGY,
    DATAFLOW_TRACING_STRATEGY,
    MUTATOR_PLUGIN_STRATEGY,
    MUTATOR_PLUGIN_RADAMSA_STRATEGY,
    PEACH_GRAMMAR_MUTATION_STRATEGY,
]

The generate_weighted_strategy_pool function first reads an environment variable:

distribution = environment.get_value('STRATEGY_SELECTION_DISTRIBUTION')

If a distribution is available, one entry is picked from STRATEGY_SELECTION_DISTRIBUTION by weighted random choice and used as the strategy:

strategy_selection = utils.random_weighted_choice(distribution_tuples,
                                                  'probability')

Otherwise the default is used, by calling generate_default_strategy_pool:

return generate_default_strategy_pool(strategy_list, use_generator)
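
The weighted choice used when a distribution is available can be sketched as follows. This is not the body of `utils.random_weighted_choice`; it is a generic cumulative-weight implementation, and `StrategyCombination`, the sample strategy names, and the `rng` parameter are assumptions added for illustration and testability.

```python
import collections
import random

# Hypothetical record type; only the `probability` field name comes from the
# call shown above.
StrategyCombination = collections.namedtuple(
    'StrategyCombination', ['strategy_name', 'probability'])


def random_weighted_choice(elements, weight_attribute='probability', rng=None):
  """Return one element, chosen with likelihood proportional to its weight."""
  rng = rng or random.SystemRandom()
  total = sum(getattr(element, weight_attribute) for element in elements)
  threshold = rng.random() * total  # Uniform in [0, total).
  running = 0.0
  for element in elements:
    running += getattr(element, weight_attribute)
    if threshold < running:
      return element
  return elements[-1]  # Guards against floating-point rounding.


# Made-up distribution entries.
distribution = [
    StrategyCombination('fork,value_profile', 0.25),
    StrategyCombination('corpus_subset', 0.75),
]
chosen = random_weighted_choice(distribution)
```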

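It starts by instantiating a StrategyPool, then chooses a generator, and finally goes through the strategies in LIBFUZZER_STRATEGY_LIST that are not in GENERATORS, adding each one for which do_strategy returns true: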

def generate_default_strategy_pool(strategy_list, use_generator):
  """Return a strategy pool representing a selection of strategies for launcher
  to consider.

  Select strategies according to default strategy selection method."""
  pool = StrategyPool()

  # If use_generator is enabled, decide whether to include radamsa, ml rnn,
  # or no generator (mutually exclusive).
  if use_generator:
    choose_generator(pool)

  # Decide whether or not to add non-generator strategies according to
  # probability parameters.
  for value in [
      strategy_entry for strategy_entry in strategy_list
      if strategy_entry not in GENERATORS
  ]:
    if do_strategy(value):
      pool.add_strategy(value)

  logs.log('Strategy pool was generated according to default parameters. '
           'Chosen strategies: ' + ', '.join(pool.strategy_names))
  return pool
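
To make the loop over non-generator strategies concrete, here is a small sketch in which each strategy is included independently with its own probability. The strategy names and probabilities are made up, and `build_default_pool` is a hypothetical helper that returns a plain list rather than a StrategyPool.

```python
import random


def build_default_pool(strategy_probabilities, generators, rng=None):
  """Include each non-generator strategy independently with its probability."""
  rng = rng or random.SystemRandom()
  pool = []
  for name, probability in strategy_probabilities.items():
    if name in generators:
      continue  # Generators are chosen separately and are mutually exclusive.
    if rng.random() < probability:
      pool.append(name)
  return pool


# Made-up probabilities: 1.0 always includes a strategy, 0.0 never does.
probabilities = {
    'value_profile': 1.0,
    'fork': 0.0,
    'corpus_mutations_radamsa': 1.0,
}
pool = build_default_pool(probabilities, {'corpus_mutations_radamsa'})
```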

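choose_generator draws a random number and compares it with radamsa_prob + ml_rnn_prob. If the random number is larger than radamsa_prob + ml_rnn_prob, neither radamsa nor ml_rnn (the machine-learning one) is selected. If it is smaller, decide_with_probability is called once more to choose between radamsa and ml_rnn: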

# /src/python/bot/fuzzers/engine_common.py
def decide_with_probability(probability):
  """Decide if we want to do something with the given probability."""
  return random.SystemRandom().random() < probability


# /src/python/bot/fuzzers/strategy_selection.py
def choose_generator(strategy_pool):
  """Chooses whether to use radamsa, ml rnn, or no generator and updates the
  strategy pool."""

  radamsa_prob = engine_common.get_strategy_probability(
      strategy.CORPUS_MUTATION_RADAMSA_STRATEGY.name,
      default=strategy.CORPUS_MUTATION_RADAMSA_STRATEGY.probability)

  ml_rnn_prob = engine_common.get_strategy_probability(
      strategy.CORPUS_MUTATION_ML_RNN_STRATEGY.name,
      default=strategy.CORPUS_MUTATION_ML_RNN_STRATEGY.probability)

  if engine_common.decide_with_probability(radamsa_prob + ml_rnn_prob):
    if engine_common.decide_with_probability(
        radamsa_prob / (radamsa_prob + ml_rnn_prob)):
      strategy_pool.add_strategy(strategy.CORPUS_MUTATION_RADAMSA_STRATEGY)
    else:
      strategy_pool.add_strategy(strategy.CORPUS_MUTATION_ML_RNN_STRATEGY)
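
The two-stage decision can be restated as a standalone function, which makes the resulting probabilities easier to see. This is a sketch under assumptions: `pick_generator`, its string return values, and the injectable `rng` are illustrative and not part of ClusterFuzz.

```python
import random


def pick_generator(radamsa_prob, ml_rnn_prob, rng=None):
  """Return 'radamsa', 'ml_rnn', or None using the two-stage decision."""
  rng = rng or random.SystemRandom()
  combined = radamsa_prob + ml_rnn_prob
  # Stage 1: decide whether any generator is used at all.
  if not rng.random() < combined:
    return None
  # Stage 2: split between the two generators in proportion to their weights.
  if rng.random() < radamsa_prob / combined:
    return 'radamsa'
  return 'ml_rnn'
```

Multiplying the two stages gives the marginal probabilities: radamsa is chosen with probability (radamsa_prob + ml_rnn_prob) * radamsa_prob / (radamsa_prob + ml_rnn_prob) = radamsa_prob, ml_rnn with probability ml_rnn_prob, and no generator with probability 1 - radamsa_prob - ml_rnn_prob. So with hypothetical values of 0.2 and 0.3, no generator is used half of the time.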

libfuzzer.pick_strategies

Next is libfuzzer.pick_strategies. It handles each strategy in turn, actually carries out whatever work a strategy requires, and returns a StrategyInfo:

StrategyInfo(fuzzing_strategies, arguments, additional_corpus_dirs,
             extra_env, use_dataflow_tracing, is_mutations_run)

DATAFLOW_TRACING_STRATEGY

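If a DFSan build of the program exists and the strategy pool contains DATAFLOW_TRACING_STRATEGY, it first computes dataflow_binary_path (the path of the DFSan fuzzer binary) and then checks whether dataflow_trace_dir exists. If it does not, the strategy is skipped. If it does, the argument -data_flow_trace=dataflow_trace_dir is added, followed by -focus_function=auto, and finally the strategy name is appended to fuzzing_strategies.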

  # Depends on the presence of DFSan instrumented build.
  dataflow_build_dir = environment.get_value('DATAFLOW_BUILD_DIR')
  use_dataflow_tracing = (
      dataflow_build_dir and
      strategy_pool.do_strategy(strategy.DATAFLOW_TRACING_STRATEGY))
  if use_dataflow_tracing:
    dataflow_binary_path = os.path.join(
        dataflow_build_dir, os.path.relpath(fuzzer_path, build_directory))
    dataflow_trace_dir = dataflow_binary_path + DATAFLOW_TRACE_DIR_SUFFIX
    if os.path.exists(dataflow_trace_dir):
      arguments.append(
          '%s%s' % (constants.DATA_FLOW_TRACE_FLAG, dataflow_trace_dir))
      arguments.append('%s%s' % (constants.FOCUS_FUNCTION_FLAG, 'auto'))
      fuzzing_strategies.append(strategy.DATAFLOW_TRACING_STRATEGY.name)
    else:
      logs.log_warn(
          'Dataflow trace is not found in dataflow build, skipping strategy.')
      use_dataflow_tracing = False

CORPUS_MUTATION

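Next comes the part that generates new testcase mutations.

It first checks whether strategy_pool contains CORPUS_MUTATION_ML_RNN_STRATEGY or CORPUS_MUTATION_RADAMSA_STRATEGY (ML_RNN takes priority over RADAMSA). If so, and the environment is not ephemeral, is_mutations_run is True.

When is_mutations_run is True, create_corpus_directory('mutations') first creates a directory for the samples, then the samples are generated, the name of the strategy used is appended to fuzzing_strategies, and finally new_testcase_mutations_directory is appended to additional_corpus_dirs.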

  # Select a generator to attempt to use for existing testcase mutations.
  candidate_generator = engine_common.select_generator(strategy_pool,
                                                       fuzzer_path)
  is_mutations_run = (not environment.is_ephemeral() and
                      candidate_generator != engine_common.Generator.NONE)

  # Generate new testcase mutations using radamsa, etc.
  if is_mutations_run:
    new_testcase_mutations_directory = create_corpus_directory('mutations')
    generator_used = engine_common.generate_new_testcase_mutations(
        corpus_directory, new_testcase_mutations_directory,
        project_qualified_fuzzer_name, candidate_generator)

    # Add the used generator strategy to our fuzzing strategies list.
    if generator_used:
      if candidate_generator == engine_common.Generator.RADAMSA:
        fuzzing_strategies.append(
            strategy.CORPUS_MUTATION_RADAMSA_STRATEGY.name)
      elif candidate_generator == engine_common.Generator.ML_RNN:
        fuzzing_strategies.append(strategy.CORPUS_MUTATION_ML_RNN_STRATEGY.name)

    additional_corpus_dirs.append(new_testcase_mutations_directory)

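The core of this strategy is generate_new_testcase_mutations. Depending on candidate_generator, it uses generate_new_testcase_mutations_using_radamsa (radamsa randomly picks samples of a suitable size from corpus_directory and mutates them in a loop of 2000 iterations) or generate_new_testcase_mutations_using_ml_rnn to produce new samples. It returns true only if the directory contains more samples afterwards than it did before.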

def generate_new_testcase_mutations(corpus_directory,
                                    new_testcase_mutations_directory,
                                    fuzzer_name, candidate_generator):
  """Generate new testcase mutations, using existing corpus directory or other
  methods.

  Returns true if mutations are successfully generated using radamsa or ml rnn.
  A false return signifies either no generator use or unsuccessful generation of
  testcase mutations."""
  generation_timeout = get_new_testcase_mutations_timeout()
  pre_mutations_filecount = shell.get_directory_file_count(
      new_testcase_mutations_directory)

  # Generate new testcase mutations using Radamsa.
  if candidate_generator == Generator.RADAMSA:
    generate_new_testcase_mutations_using_radamsa(
        corpus_directory, new_testcase_mutations_directory, generation_timeout)
  # Generate new testcase mutations using ML RNN model.
  elif candidate_generator == Generator.ML_RNN:
    generate_new_testcase_mutations_using_ml_rnn(
        corpus_directory, new_testcase_mutations_directory, fuzzer_name,
        generation_timeout)

  # If new mutations are successfully generated, return true.
  if shell.get_directory_file_count(
      new_testcase_mutations_directory) > pre_mutations_filecount:
    return True

  return False

RANDOM_MAX_LENGTH_STRATEGY

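This is the maximum-length strategy. It first checks whether a -max_len= argument already exists; if so, it does nothing.

Otherwise it generates a random number between 1 and MAX_VALUE_FOR_MAX_LENGTH (10000) and uses it as the value of -max_len.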

if strategy_pool.do_strategy(strategy.RANDOM_MAX_LENGTH_STRATEGY):
    max_len_argument = fuzzer_utils.extract_argument(
        existing_arguments, constants.MAX_LEN_FLAG, remove=False)
    if not max_len_argument:
      max_length = random.SystemRandom().randint(1, MAX_VALUE_FOR_MAX_LENGTH)
      arguments.append('%s%d' % (constants.MAX_LEN_FLAG, max_length))
      fuzzing_strategies.append(strategy.RANDOM_MAX_LENGTH_STRATEGY.name)

RECOMMENDED_DICTIONARY_STRATEGY

This is the recommended-dictionary strategy, implemented by the add_recommended_dictionary function:

if (strategy_pool.do_strategy(strategy.RECOMMENDED_DICTIONARY_STRATEGY) and
      add_recommended_dictionary(arguments, project_qualified_fuzzer_name,
                                 fuzzer_path)):
    fuzzing_strategies.append(strategy.RECOMMENDED_DICTIONARY_STRATEGY.name)

add_recommended_dictionary downloads recommended_dictionary.dict from Google Cloud Storage. If a dictionary already exists, the two are merged, and the merged dictionary is used:

def add_recommended_dictionary(arguments, fuzzer_name, fuzzer_path):
  """Add recommended dictionary from GCS to existing .dict file or create
  a new one and update the arguments as needed.
  This function modifies |arguments| list in some cases."""
  recommended_dictionary_path = os.path.join(
      fuzzer_utils.get_temp_dir(),
      dictionary_manager.RECOMMENDED_DICTIONARY_FILENAME)

  dict_manager = dictionary_manager.DictionaryManager(fuzzer_name)

  try:
    # Bail out if cannot download recommended dictionary from GCS.
    if not dict_manager.download_recommended_dictionary_from_gcs(
        recommended_dictionary_path):
      return False
  except Exception as ex:
    logs.log_error(
        'Exception downloading recommended dictionary:\n%s.' % str(ex))
    return False

  # Bail out if the downloaded dictionary is empty.
  if not os.path.getsize(recommended_dictionary_path):
    return False

  # Check if there is an existing dictionary file in arguments.
  original_dictionary_path = fuzzer_utils.extract_argument(
      arguments, constants.DICT_FLAG)
  merged_dictionary_path = (
      original_dictionary_path or
      dictionary_manager.get_default_dictionary_path(fuzzer_path))
  merged_dictionary_path += MERGED_DICT_SUFFIX

  dictionary_manager.merge_dictionary_files(original_dictionary_path,
                                            recommended_dictionary_path,
                                            merged_dictionary_path)
  arguments.append(constants.DICT_FLAG + merged_dictionary_path)
  return True

VALUE_PROFILE_STRATEGY

This one is simple: it just adds the argument -use_value_profile=1. The help text is shown below; it presumably uses special values to guide fuzzing.

Experimental. Use value profile to guide fuzzing.

if strategy_pool.do_strategy(strategy.VALUE_PROFILE_STRATEGY):
    arguments.append(constants.VALUE_PROFILE_ARGUMENT)
    fuzzing_strategies.append(strategy.VALUE_PROFILE_STRATEGY.name)

FORK_STRATEGY

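This is the fork strategy. max_fuzz_threads is read from MAX_FUZZ_THREADS and defaults to 1.

The value passed to -fork= is the number of CPU cores divided by max_fuzz_threads (integer division), with a minimum of 1.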

# Do not use fork mode for DFT-based fuzzing. This is needed in order to
# collect readable and actionable logs from fuzz targets running with DFT.
if (not is_fuchsia and not is_android and not is_ephemeral and
    not use_dataflow_tracing and
    strategy_pool.do_strategy(strategy.FORK_STRATEGY)):
  max_fuzz_threads = environment.get_value('MAX_FUZZ_THREADS', 1)
  num_fuzz_processes = max(1, utils.cpu_count() // max_fuzz_threads)
  arguments.append('%s%d' % (constants.FORK_FLAG, num_fuzz_processes))
  fuzzing_strategies.append(
      '%s_%d' % (strategy.FORK_STRATEGY.name, num_fuzz_processes))

MUTATOR_PLUGIN_STRATEGY

This one takes effect through the use_mutator_plugin function, which sets extra_env['LD_PRELOAD'] = mutator_plugin_path:

extra_env = {}
if (strategy_pool.do_strategy(strategy.MUTATOR_PLUGIN_STRATEGY) and
    use_mutator_plugin(target_name, extra_env)):
  fuzzing_strategies.append(strategy.MUTATOR_PLUGIN_STRATEGY.name)

PEACH_GRAMMAR_MUTATION_STRATEGY

if (not has_existing_mutator_strategy(fuzzing_strategies) and
      strategy_pool.do_strategy(strategy.PEACH_GRAMMAR_MUTATION_STRATEGY) and
      use_peach_mutator(extra_env, grammar)):
    fuzzing_strategies.append(
        '%s_%s' % (strategy.PEACH_GRAMMAR_MUTATION_STRATEGY.name, grammar))

First, if fuzzing_strategies already contains one of the following strategies, PEACH_GRAMMAR_MUTATION_STRATEGY is not applied:

MUTATOR_STRATEGIES = [
    strategy.PEACH_GRAMMAR_MUTATION_STRATEGY.name,
    strategy.MUTATOR_PLUGIN_STRATEGY.name,
    strategy.MUTATOR_PLUGIN_RADAMSA_STRATEGY.name
]
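
The mutual-exclusion check can be sketched as below. The string values are hypothetical stand-ins for the `strategy.*.name` constants, and the function body is an assumption about how such a check could be written, not a copy of the ClusterFuzz helper. A plausible reason for the rule is that all three strategies are activated through LD_PRELOAD, so enabling a second one would overwrite the first.

```python
# Hypothetical names standing in for the strategy.*.name values listed above.
MUTATOR_STRATEGIES = [
    'peach_grammar_mutation',
    'mutator_plugin',
    'mutator_plugin_radamsa',
]


def has_existing_mutator_strategy(fuzzing_strategies):
  """Return True if a mutator-based strategy has already been selected."""
  return any(name in fuzzing_strategies for name in MUTATOR_STRATEGIES)


already_selected = ['value_profile', 'mutator_plugin']
print(has_existing_mutator_strategy(already_selected))
```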

PEACH_GRAMMAR_MUTATION_STRATEGY also takes effect through environment variables. In use_peach_mutator, these are mainly the following:

# Set title and pit environment variables.
extra_env['PIT_FILENAME'] = pit_path
extra_env['PIT_TITLE'] = grammar

# Set LD_PRELOAD.
peach_path = os.path.join(unzipped, 'peach_mutator', 'src', 'peach.so')
extra_env['LD_PRELOAD'] = peach_path

# Set Python path.
new_path = [
    os.path.join(unzipped, 'peach_mutator', 'src'),
    os.path.join(unzipped, 'peach_mutator', 'third_party', 'peach'),
] + sys.path

extra_env['PYTHONPATH'] = os.pathsep.join(new_path)

MUTATOR_PLUGIN_RADAMSA_STRATEGY

The logic here is the same as above: if one of MUTATOR_STRATEGIES is already present, the strategy is not applied.

if (not has_existing_mutator_strategy(fuzzing_strategies) and
      strategy_pool.do_strategy(strategy.MUTATOR_PLUGIN_RADAMSA_STRATEGY) and
      use_radamsa_mutator_plugin(extra_env)):
    fuzzing_strategies.append(strategy.MUTATOR_PLUGIN_RADAMSA_STRATEGY.name)

use_radamsa_mutator_plugin likewise takes effect through the LD_PRELOAD environment variable: extra_env['LD_PRELOAD'] = radamsa_path

def use_radamsa_mutator_plugin(extra_env):
  """Decide whether to use Radamsa in process. If yes, add the path to the
  radamsa shared object to LD_PRELOAD in |extra_env| and return True."""
  # Radamsa will only work on LINUX ASAN jobs.
  # TODO(mpherman): Include architecture info in job definition and exclude
  # i386.
  if environment.is_lib() or not is_linux_asan():
    return False

  radamsa_path = os.path.join(environment.get_platform_resources_directory(),
                              'radamsa', 'libradamsa.so')

  logs.log('Using Radamsa mutator plugin : %s' % radamsa_path)
  extra_env['LD_PRELOAD'] = radamsa_path
  return True
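
How an `extra_env` dictionary like this ends up affecting the fuzzer process can be sketched as follows. `build_process_env` is a hypothetical helper and the plugin path is a placeholder; the point is only that the extra variables are overlaid on a copy of the base environment before the target is started.

```python
import os


def build_process_env(extra_env, base_env=None):
  """Return a copy of the base environment overlaid with |extra_env|."""
  env = dict(os.environ if base_env is None else base_env)
  env.update(extra_env or {})
  return env


# Placeholder plugin path, for illustration only.
extra_env = {'LD_PRELOAD': '/path/to/libradamsa.so'}
env = build_process_env(extra_env, base_env={'PATH': '/usr/bin'})
# Passing env=env to subprocess.run([...]) would start the fuzz target with the
# shared object preloaded by the dynamic loader.
```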

After libfuzzer.pick_strategies

Extend the arguments:

arguments.extend(strategy_info.arguments)

Unpack the corpus:

# Check for seed corpus and add it into corpus directory.
engine_common.unpack_seed_corpus_if_needed(target_path, corpus_dir)

If the strategies include CORPUS_SUBSET_STRATEGY, a number of corpus files are chosen as the initial corpus:

# Pick a few testcases from our corpus to use as the initial corpus.
subset_size = engine_common.random_choice(
    engine_common.CORPUS_SUBSET_NUM_TESTCASES)

if (not strategy_info.use_dataflow_tracing and
    strategy_pool.do_strategy(strategy.CORPUS_SUBSET_STRATEGY) and
    shell.get_directory_file_count(corpus_dir) > subset_size):
  # Copy |subset_size| testcases into 'subset' directory.
  corpus_subset_dir = self._create_temp_corpus_dir('subset')
  libfuzzer.copy_from_corpus(corpus_subset_dir, corpus_dir, subset_size)
  strategy_info.fuzzing_strategies.append(
      strategy.CORPUS_SUBSET_STRATEGY.name + '_' + str(subset_size))
  strategy_info.additional_corpus_dirs.append(corpus_subset_dir)
else:
  strategy_info.additional_corpus_dirs.append(corpus_dir)
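
The subset step amounts to copying a random sample of corpus files into a fresh directory. The sketch below shows one way to do that with the standard library; `copy_random_subset` is a hypothetical function and is not the implementation of `libfuzzer.copy_from_corpus`.

```python
import os
import random
import shutil
import tempfile


def copy_random_subset(corpus_dir, subset_dir, subset_size, rng=None):
  """Copy |subset_size| randomly chosen files from corpus_dir to subset_dir."""
  rng = rng or random.SystemRandom()
  files = [
      name for name in sorted(os.listdir(corpus_dir))
      if os.path.isfile(os.path.join(corpus_dir, name))
  ]
  for name in rng.sample(files, min(subset_size, len(files))):
    shutil.copy(os.path.join(corpus_dir, name), os.path.join(subset_dir, name))


# Demo with a throwaway corpus of 10 small files.
with tempfile.TemporaryDirectory() as corpus, \
    tempfile.TemporaryDirectory() as subset:
  for i in range(10):
    with open(os.path.join(corpus, 'unit%d' % i), 'w') as handle:
      handle.write(str(i))
  copy_random_subset(corpus, subset, 3)
  subset_files = sorted(os.listdir(subset))
```

Starting from a small random subset lets each run explore from a different starting point, while the full corpus directory is used unchanged when the strategy is not selected.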

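If a dictionary argument is present, it is checked, including whether the dictionary file exists.

If there is no dictionary argument, it checks whether %target_binary_name%.dict exists.

Finally, the format of the dictionary is checked and repairs are attempted, for example for entries that are missing double quotes.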

# Check dict argument to make sure that it's valid.
dict_path = fuzzer_utils.extract_argument(
    arguments, constants.DICT_FLAG, remove=False)
if dict_path and not os.path.exists(dict_path):
  logs.log_error('Invalid dict %s for %s.' % (dict_path, target_path))
  fuzzer_utils.extract_argument(arguments, constants.DICT_FLAG)

# If there's no dict argument, check for %target_binary_name%.dict file.
dict_path = fuzzer_utils.extract_argument(
    arguments, constants.DICT_FLAG, remove=False)
if not dict_path:
  dict_path = dictionary_manager.get_default_dictionary_path(target_path)
  if os.path.exists(dict_path):
    arguments.append(constants.DICT_FLAG + dict_path)

# If we have a dictionary, correct any items that are not formatted properly
# (e.g. quote items that are missing them).
dictionary_manager.correct_if_needed(dict_path)
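
The first two steps can be condensed into a standalone sketch that works on a plain argument list. `resolve_dictionary_argument` is a hypothetical function, and the assumption that the default dictionary sits at `target_path + '.dict'` is a simplification of whatever `get_default_dictionary_path` really does.

```python
import os
import tempfile

DICT_FLAG = '-dict='


def resolve_dictionary_argument(arguments, target_path):
  """Drop an invalid -dict= argument and fall back to <target>.dict."""
  current = next((arg for arg in arguments if arg.startswith(DICT_FLAG)), None)
  if current and not os.path.exists(current[len(DICT_FLAG):]):
    arguments.remove(current)  # The referenced dictionary does not exist.
    current = None
  if not current:
    default_path = target_path + '.dict'  # Assumed default location.
    if os.path.exists(default_path):
      arguments.append(DICT_FLAG + default_path)
  return arguments


# Demo: the -dict= argument points at a missing file, but a default dictionary
# exists next to the (hypothetical) target binary.
with tempfile.TemporaryDirectory() as temp_dir:
  target = os.path.join(temp_dir, 'fuzz_target')
  open(target + '.dict', 'w').close()
  args = resolve_dictionary_argument(
      ['-max_len=10', DICT_FLAG + os.path.join(temp_dir, 'missing.dict')],
      target)
```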

At the end, prepare calls process_strategies, which returns stats describing which strategies were enabled and what values they selected; this is the strategies variable:

strategies = stats.process_strategies(
    strategy_info.fuzzing_strategies, name_modifier=lambda x: x)
return LibFuzzerOptions(
    corpus_dir, arguments, strategies, strategy_info.additional_corpus_dirs,
    strategy_info.extra_env, strategy_info.use_dataflow_tracing,
    strategy_info.is_mutations_run)

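Maximum fuzzing time

The fuzzing duration is read from an environment variable, and the time needed for other operations during fuzzing, such as corpus merging and dictionary analysis, is subtracted from it: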

fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT')
additional_processing_time = engine_impl.fuzz_additional_processing_timeout(
    options)
fuzz_test_timeout -= additional_processing_time
if fuzz_test_timeout <= 0:
  raise FuzzTaskException(
      f'Invalid engine timeout: '
      f'{fuzz_test_timeout} - {additional_processing_time}')
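
The same budget calculation as a tiny standalone function, with made-up numbers. `engine_fuzz_timeout` is a hypothetical name, and it raises ValueError here rather than FuzzTaskException so the example stays self-contained.

```python
def engine_fuzz_timeout(fuzz_test_timeout, additional_processing_time):
  """Return the seconds left for the engine after reserving post-processing."""
  remaining = fuzz_test_timeout - additional_processing_time
  if remaining <= 0:
    raise ValueError(
        'Invalid engine timeout: %d - %d' %
        (fuzz_test_timeout, additional_processing_time))
  return remaining


# Made-up numbers: a one-hour task with 10 minutes reserved for merging and
# dictionary analysis leaves 3000 seconds for the engine.
print(engine_fuzz_timeout(3600, 600))
```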

The actual fuzzing

The actual fuzzing is this call:

result = engine_impl.fuzz(target_path, options, testcase_directory,
                          fuzz_test_timeout)

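Let's step into this fuzz function.

The first line below, the profiler call, is for performance analysis: if USE_PYTHON_PROFILER is set and is not False, Google Cloud Profiler is started.

The second line calls libfuzzer.get_runner, which in the normal case returns LibFuzzerRunner(fuzzer_path).

The third line sets sanitizer_options, for example an exitcode of 77.

The fourth line creates a temporary directory to serve as a corpus directory, and the fifth line combines it with options.fuzz_corpus_dirs into the corpus_directories list.

Then runner.fuzz is called, which actually starts fuzzing.

What happens after fuzzing, in brief:
1. Split the fuzzer output with splitlines.
2. Check the log for a crash and extract the path of the crash file.
3. If libFuzzer's return code is non-zero but no crash file was found, it most likely crashed at startup; in that case an empty file is used as the crash file.
4. Use the information in log_lines to set stats values such as crash_count, slow_unit_count, timeout_count, and edges_total.
5. Remove arguments that would interfere with merging and dictionary analysis, such as -fork, -max_len, and -runs.
6. Set a larger timeout for reproducing the crash.
7. Copy the crash file to the main crash directory.
8. Generate a recommended dictionary from the log.
9. Return the fuzzing result.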

   profiler.start_if_needed('libfuzzer_fuzz')
   runner = libfuzzer.get_runner(target_path)
   libfuzzer.set_sanitizer_options(target_path, fuzz_options=options)

   # Directory to place new units.
   new_corpus_dir = self._create_temp_corpus_dir('new')

   corpus_directories = [new_corpus_dir] + options.fuzz_corpus_dirs
   fuzz_result = runner.fuzz(
       corpus_directories,
       fuzz_timeout=max_time,
       additional_args=options.arguments,
       artifact_prefix=reproducers_dir,
       extra_env=options.extra_env)

   log_lines = fuzz_result.output.splitlines()
   # Output can be large, so save some memory by removing reference to the
   # original output which is no longer needed.
   fuzz_result.output = None

   # Check if we crashed, and get the crash testcase path.
   crash_testcase_file_path = runner.get_testcase_path(log_lines)

   # If we exited with a non-zero return code with no crash file in output from
   # libFuzzer, this is most likely a startup crash. Use an empty testcase to
   # to store it as a crash.
   if not crash_testcase_file_path and fuzz_result.return_code:
     crash_testcase_file_path = self._create_empty_testcase_file(
         reproducers_dir)

   # Parse stats information based on libFuzzer output.
   parsed_stats = libfuzzer.parse_log_stats(log_lines)

   # Extend parsed stats by additional performance features.
   parsed_stats.update(
       stats.parse_performance_features(log_lines, options.strategies,
                                        options.arguments))

   # Set some initial stat overrides.
   timeout_limit = fuzzer_utils.extract_argument(
       options.arguments, constants.TIMEOUT_FLAG, remove=False)

   expected_duration = runner.get_max_total_time(max_time)
   actual_duration = int(fuzz_result.time_executed)
   fuzzing_time_percent = 100 * actual_duration / float(expected_duration)
   parsed_stats.update({
       'timeout_limit': int(timeout_limit),
       'expected_duration': expected_duration,
       'actual_duration': actual_duration,
       'fuzzing_time_percent': fuzzing_time_percent,
   })

   # Remove fuzzing arguments before merge and dictionary analysis step.
   merge_arguments = options.arguments[:]
   libfuzzer.remove_fuzzing_arguments(merge_arguments, is_merge=True)
   self._merge_new_units(target_path, options.corpus_dir, new_corpus_dir,
                         options.fuzz_corpus_dirs, merge_arguments,
                         parsed_stats)

   fuzz_logs = '\n'.join(log_lines)
   crashes = []
   if crash_testcase_file_path:
     reproduce_arguments = options.arguments[:]
     libfuzzer.remove_fuzzing_arguments(reproduce_arguments)

     # Use higher timeout for reproduction.
     libfuzzer.fix_timeout_argument_for_reproduction(reproduce_arguments)

     # Write the new testcase.
     # Copy crash testcase contents into the main testcase path.
     crashes.append(
         engine.Crash(crash_testcase_file_path, fuzz_logs, reproduce_arguments,
                      actual_duration))

   libfuzzer.analyze_and_update_recommended_dictionary(
       runner, project_qualified_fuzzer_name, log_lines, options.corpus_dir,
       merge_arguments)

   return engine.FuzzResult(fuzz_logs, fuzz_result.command, crashes,
                            parsed_stats, fuzz_result.time_executed)
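
Steps 2 and 3 of the summary (finding the crash file in the log, and falling back to an empty testcase for a startup crash) can be sketched as below. libFuzzer reports saved artifacts with a line containing "Test unit written to"; the regular expression, the function names, and the sample log lines here are assumptions and not the ones used by ClusterFuzz.

```python
import re

# Assumed pattern based on libFuzzer's "Test unit written to <path>" message.
CRASH_PATH_REGEX = re.compile(r'Test unit written to\s*(\S+)')


def get_testcase_path(log_lines):
  """Return the artifact path reported in the log, or None if there is none."""
  for line in log_lines:
    match = CRASH_PATH_REGEX.search(line)
    if match:
      return match.group(1)
  return None


def find_crash_testcase(log_lines, return_code, empty_testcase_path):
  """Pick the crash testcase, treating a bare failure as a startup crash."""
  path = get_testcase_path(log_lines)
  if not path and return_code:
    return empty_testcase_path
  return path


# Made-up log excerpt.
sample_log = [
    '==1234==ERROR: AddressSanitizer: heap-buffer-overflow',
    "artifact_prefix='/tmp/crashes/'; Test unit written to "
    '/tmp/crashes/crash-da39a3ee',
]
print(find_crash_testcase(sample_log, 77, '/tmp/crashes/empty'))
```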

Finally, let's look inside runner.fuzz. Looking at LibFuzzerRunner, we find that its fuzz method actually calls LibFuzzerCommon.fuzz:

class LibFuzzerRunner(new_process.UnicodeProcessRunner, LibFuzzerCommon):
  """libFuzzer runner (when minijail is not used)."""

  def __init__(self, executable_path, default_args=None):
    """Inits the LibFuzzerRunner.

    Args:
      executable_path: Path to the fuzzer executable.
      default_args: Default arguments to always pass to the fuzzer.
    """
    super().__init__(executable_path=executable_path, default_args=default_args)

  def fuzz(self,
           corpus_directories,
           fuzz_timeout,
           artifact_prefix=None,
           additional_args=None,
           extra_env=None):
    """LibFuzzerCommon.fuzz override."""
    additional_args = copy.copy(additional_args)
    if additional_args is None:
      additional_args = []

    return LibFuzzerCommon.fuzz(self, corpus_directories, fuzz_timeout,
                                artifact_prefix, additional_args, extra_env)

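In LibFuzzerCommon.fuzz, -artifact_prefix is handled, -max_total_time= and -print_final_stats=1 are added, then the corpus_directories list is appended, and finally run_and_wait is called (it waits at most fuzz_timeout before exiting, unless libFuzzer exits on its own first):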

def fuzz(self,
           corpus_directories,
           fuzz_timeout,
           artifact_prefix=None,
           additional_args=None,
           extra_env=None):
    """Running fuzzing command.

    Args:
      corpus_directories: List of corpus directory paths to be passed to
          libFuzzer.
      fuzz_timeout: The maximum time in seconds that libFuzzer is allowed to run
          for.
      artifact_prefix: The directory to store new fuzzing artifacts (crashes,
          timeouts, slow units)
      additional_args: A sequence of additional arguments to be passed to the
          executable.
      extra_env: A dictionary containing environment variables and their values.
          These will be added to the environment of the new process.

    Returns:
      A process.ProcessResult.
    """
    additional_args = copy.copy(additional_args)
    if additional_args is None:
      additional_args = []

    max_total_time = self.get_max_total_time(fuzz_timeout)
    if any(arg.startswith(constants.FORK_FLAG) for arg in additional_args):
      max_total_time -= self.LIBFUZZER_FORK_MODE_CLEAN_EXIT_TIME
    assert max_total_time > 0

    # Old libFuzzer jobs specify -artifact_prefix through additional_args
    if artifact_prefix:
      additional_args.append(
          '%s%s' % (constants.ARTIFACT_PREFIX_FLAG,
                    self._normalize_artifact_prefix(artifact_prefix)))

    additional_args.extend([
        '%s%d' % (constants.MAX_TOTAL_TIME_FLAG, max_total_time),
        constants.PRINT_FINAL_STATS_ARGUMENT,
        # FIXME: temporarily disabled due to a lack of crash information in
        # output.
        # '-close_fd_mask=3',
    ])

    additional_args.extend(corpus_directories)
    return self.run_and_wait(
        additional_args=additional_args,
        timeout=fuzz_timeout - self.SIGTERM_WAIT_TIME,
        terminate_before_kill=True,
        terminate_wait_time=self.SIGTERM_WAIT_TIME,
        max_stdout_len=MAX_OUTPUT_LEN,
        extra_env=extra_env)
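
The argument assembly in this method can be restated as a small pure function, which makes the final command line easy to inspect. This is a sketch under assumptions: `build_fuzz_arguments` and its `fork_clean_exit_time` parameter are hypothetical, and the trailing-slash handling only approximates what `_normalize_artifact_prefix` might do.

```python
def build_fuzz_arguments(corpus_directories, max_total_time,
                         artifact_prefix=None, additional_args=None,
                         fork_clean_exit_time=0):
  """Assemble the libFuzzer command-line arguments for one fuzzing run."""
  arguments = list(additional_args or [])
  # In fork mode, reserve some time so libFuzzer can shut down cleanly.
  if any(arg.startswith('-fork=') for arg in arguments):
    max_total_time -= fork_clean_exit_time
  if max_total_time <= 0:
    raise ValueError('max_total_time must be positive.')
  if artifact_prefix:
    # libFuzzer concatenates the prefix and the file name, so a directory
    # needs a trailing separator.
    arguments.append('-artifact_prefix=' + artifact_prefix.rstrip('/') + '/')
  arguments.extend([
      '-max_total_time=%d' % max_total_time,
      '-print_final_stats=1',
  ])
  # Corpus directories come last, as positional arguments.
  arguments.extend(corpus_directories)
  return arguments


# Made-up paths and timings.
fuzz_args = build_fuzz_arguments(
    ['/corpus/new', '/corpus/main'], 600, artifact_prefix='/crashes',
    additional_args=['-fork=4'], fork_clean_exit_time=10)
print(' '.join(fuzz_args))
```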

Originally published on the author's personal site/blog on 2021-01-25.
