123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- This takes a crashing qtest trace and tries to remove superfluous operations
- """
- import sys
- import os
- import subprocess
- import time
- import struct
- QEMU_ARGS = None
- QEMU_PATH = None
- TIMEOUT = 5
- CRASH_TOKEN = None
- # Minimization levels
- M1 = False # try removing IO commands iteratively
- M2 = False # try setting bits in operand of write/out to zero
- write_suffix_lookup = {"b": (1, "B"),
- "w": (2, "H"),
- "l": (4, "L"),
- "q": (8, "Q")}
- def usage():
- sys.exit("""\
- Usage:
- QEMU_PATH="/path/to/qemu" QEMU_ARGS="args" {} [Options] input_trace output_trace
- By default, will try to use the second-to-last line in the output to identify
- whether the crash occred. Optionally, manually set a string that idenitifes the
- crash by setting CRASH_TOKEN=
- Options:
- -M1: enable a loop around the remove minimizer, which may help decrease some
- timing dependent instructions. Off by default.
- -M2: try setting bits in operand of write/out to zero. Off by default.
- """.format((sys.argv[0])))
- deduplication_note = """\n\
- Note: While trimming the input, sometimes the mutated trace triggers a different
- type crash but indicates the same bug. Under this situation, our minimizer is
- incapable of recognizing and stopped from removing it. In the future, we may
- use a more sophisticated crash case deduplication method.
- \n"""
- def check_if_trace_crashes(trace, path):
- with open(path, "w") as tracefile:
- tracefile.write("".join(trace))
- rc = subprocess.Popen("timeout -s 9 {timeout}s {qemu_path} {qemu_args} 2>&1\
- < {trace_path}".format(timeout=TIMEOUT,
- qemu_path=QEMU_PATH,
- qemu_args=QEMU_ARGS,
- trace_path=path),
- shell=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- encoding="utf-8")
- global CRASH_TOKEN
- if CRASH_TOKEN is None:
- try:
- outs, _ = rc.communicate(timeout=5)
- CRASH_TOKEN = " ".join(outs.splitlines()[-2].split()[0:3])
- except subprocess.TimeoutExpired:
- print("subprocess.TimeoutExpired")
- return False
- print("Identifying Crashes by this string: {}".format(CRASH_TOKEN))
- global deduplication_note
- print(deduplication_note)
- return True
- for line in iter(rc.stdout.readline, ""):
- if "CLOSED" in line:
- return False
- if CRASH_TOKEN in line:
- return True
- print("\nWarning:")
- print(" There is no 'CLOSED'or CRASH_TOKEN in the stdout of subprocess.")
- print(" Usually this indicates a different type of crash.\n")
- return False
- # If previous write commands write the same length of data at the same
- # interval, we view it as a hint.
- def split_write_hint(newtrace, i):
- HINT_LEN = 3 # > 2
- if i <=(HINT_LEN-1):
- return None
- #find previous continuous write traces
- k = 0
- l = i-1
- writes = []
- while (k != HINT_LEN and l >= 0):
- if newtrace[l].startswith("write "):
- writes.append(newtrace[l])
- k += 1
- l -= 1
- elif newtrace[l] == "":
- l -= 1
- else:
- return None
- if k != HINT_LEN:
- return None
- length = int(writes[0].split()[2], 16)
- for j in range(1, HINT_LEN):
- if length != int(writes[j].split()[2], 16):
- return None
- step = int(writes[0].split()[1], 16) - int(writes[1].split()[1], 16)
- for j in range(1, HINT_LEN-1):
- if step != int(writes[j].split()[1], 16) - \
- int(writes[j+1].split()[1], 16):
- return None
- return (int(writes[0].split()[1], 16)+step, length)
- def remove_lines(newtrace, outpath):
- remove_step = 1
- i = 0
- while i < len(newtrace):
- # 1.) Try to remove lines completely and reproduce the crash.
- # If it works, we're done.
- if (i+remove_step) >= len(newtrace):
- remove_step = 1
- prior = newtrace[i:i+remove_step]
- for j in range(i, i+remove_step):
- newtrace[j] = ""
- print("Removing {lines} ...\n".format(lines=prior))
- if check_if_trace_crashes(newtrace, outpath):
- i += remove_step
- # Double the number of lines to remove for next round
- remove_step *= 2
- continue
- # Failed to remove multiple IOs, fast recovery
- if remove_step > 1:
- for j in range(i, i+remove_step):
- newtrace[j] = prior[j-i]
- remove_step = 1
- continue
- newtrace[i] = prior[0] # remove_step = 1
- # 2.) Try to replace write{bwlq} commands with a write addr, len
- # command. Since this can require swapping endianness, try both LE and
- # BE options. We do this, so we can "trim" the writes in (3)
- if (newtrace[i].startswith("write") and not
- newtrace[i].startswith("write ")):
- suffix = newtrace[i].split()[0][-1]
- assert(suffix in write_suffix_lookup)
- addr = int(newtrace[i].split()[1], 16)
- value = int(newtrace[i].split()[2], 16)
- for endianness in ['<', '>']:
- data = struct.pack("{end}{size}".format(end=endianness,
- size=write_suffix_lookup[suffix][1]),
- value)
- newtrace[i] = "write {addr} {size} 0x{data}\n".format(
- addr=hex(addr),
- size=hex(write_suffix_lookup[suffix][0]),
- data=data.hex())
- if(check_if_trace_crashes(newtrace, outpath)):
- break
- else:
- newtrace[i] = prior[0]
- # 3.) If it is a qtest write command: write addr len data, try to split
- # it into two separate write commands. If splitting the data operand
- # from length/2^n bytes to the left does not work, try to move the pivot
- # to the right side, then add one to n, until length/2^n == 0. The idea
- # is to prune unnecessary bytes from long writes, while accommodating
- # arbitrary MemoryRegion access sizes and alignments.
- # This algorithm will fail under some rare situations.
- # e.g., xxxxxxxxxuxxxxxx (u is the unnecessary byte)
- if newtrace[i].startswith("write "):
- addr = int(newtrace[i].split()[1], 16)
- length = int(newtrace[i].split()[2], 16)
- data = newtrace[i].split()[3][2:]
- if length > 1:
- # Can we get a hint from previous writes?
- hint = split_write_hint(newtrace, i)
- if hint is not None:
- hint_addr = hint[0]
- hint_len = hint[1]
- if hint_addr >= addr and hint_addr+hint_len <= addr+length:
- newtrace[i] = "write {addr} {size} 0x{data}\n".format(
- addr=hex(hint_addr),
- size=hex(hint_len),
- data=data[(hint_addr-addr)*2:\
- (hint_addr-addr)*2+hint_len*2])
- if check_if_trace_crashes(newtrace, outpath):
- # next round
- i += 1
- continue
- newtrace[i] = prior[0]
- # Try splitting it using a binary approach
- leftlength = int(length/2)
- rightlength = length - leftlength
- newtrace.insert(i+1, "")
- power = 1
- while leftlength > 0:
- newtrace[i] = "write {addr} {size} 0x{data}\n".format(
- addr=hex(addr),
- size=hex(leftlength),
- data=data[:leftlength*2])
- newtrace[i+1] = "write {addr} {size} 0x{data}\n".format(
- addr=hex(addr+leftlength),
- size=hex(rightlength),
- data=data[leftlength*2:])
- if check_if_trace_crashes(newtrace, outpath):
- break
- # move the pivot to right side
- if leftlength < rightlength:
- rightlength, leftlength = leftlength, rightlength
- continue
- power += 1
- leftlength = int(length/pow(2, power))
- rightlength = length - leftlength
- if check_if_trace_crashes(newtrace, outpath):
- i -= 1
- else:
- newtrace[i] = prior[0]
- del newtrace[i+1]
- i += 1
- def clear_bits(newtrace, outpath):
- # try setting bits in operands of out/write to zero
- i = 0
- while i < len(newtrace):
- if (not newtrace[i].startswith("write ") and not
- newtrace[i].startswith("out")):
- i += 1
- continue
- # write ADDR SIZE DATA
- # outx ADDR VALUE
- print("\nzero setting bits: {}".format(newtrace[i]))
- prefix = " ".join(newtrace[i].split()[:-1])
- data = newtrace[i].split()[-1]
- data_bin = bin(int(data, 16))
- data_bin_list = list(data_bin)
- for j in range(2, len(data_bin_list)):
- prior = newtrace[i]
- if (data_bin_list[j] == '1'):
- data_bin_list[j] = '0'
- data_try = hex(int("".join(data_bin_list), 2))
- # It seems qtest only accepts padded hex-values.
- if len(data_try) % 2 == 1:
- data_try = data_try[:2] + "0" + data_try[2:]
- newtrace[i] = "{prefix} {data_try}\n".format(
- prefix=prefix,
- data_try=data_try)
- if not check_if_trace_crashes(newtrace, outpath):
- data_bin_list[j] = '1'
- newtrace[i] = prior
- i += 1
- def minimize_trace(inpath, outpath):
- global TIMEOUT
- with open(inpath) as f:
- trace = f.readlines()
- start = time.time()
- if not check_if_trace_crashes(trace, outpath):
- sys.exit("The input qtest trace didn't cause a crash...")
- end = time.time()
- print("Crashed in {} seconds".format(end-start))
- TIMEOUT = (end-start)*5
- print("Setting the timeout for {} seconds".format(TIMEOUT))
- newtrace = trace[:]
- global M1, M2
- # remove lines
- old_len = len(newtrace) + 1
- while(old_len > len(newtrace)):
- old_len = len(newtrace)
- print("trace length = ", old_len)
- remove_lines(newtrace, outpath)
- if not M1 and not M2:
- break
- newtrace = list(filter(lambda s: s != "", newtrace))
- assert(check_if_trace_crashes(newtrace, outpath))
- # set bits to zero
- if M2:
- clear_bits(newtrace, outpath)
- assert(check_if_trace_crashes(newtrace, outpath))
- if __name__ == '__main__':
- if len(sys.argv) < 3:
- usage()
- if "-M1" in sys.argv:
- M1 = True
- if "-M2" in sys.argv:
- M2 = True
- QEMU_PATH = os.getenv("QEMU_PATH")
- QEMU_ARGS = os.getenv("QEMU_ARGS")
- if QEMU_PATH is None or QEMU_ARGS is None:
- usage()
- # if "accel" not in QEMU_ARGS:
- # QEMU_ARGS += " -accel qtest"
- CRASH_TOKEN = os.getenv("CRASH_TOKEN")
- QEMU_ARGS += " -qtest stdio -monitor none -serial none "
- minimize_trace(sys.argv[-2], sys.argv[-1])
|