554 lines
21 KiB
Python
554 lines
21 KiB
Python
import random
|
|
|
|
from lldbsuite.test.decorators import *
|
|
from lldbsuite.test.lldbtest import *
|
|
|
|
from fork_testbase import GdbRemoteForkTestBase
|
|
|
|
|
|
class TestGdbRemoteFork(GdbRemoteForkTestBase):
    """Packet-level gdb-remote tests for debugging across fork()/vfork().

    Each test scripts an expected packet exchange on ``self.test_sequence``
    and checks it with ``expect_gdbremote_sequence()``.

    The PIDs/TIDs returned by ``start_fork_test()`` are handled as hex
    strings (they are pasted into packets with ``{}`` and converted with
    ``int(x, 16)``), whereas the pairs produced by
    ``parse_threadinfo_packets()`` are integers (formatted with ``{:x}``).
    """

    def setUp(self):
        """Run the base-class setup, then skip on Arm/AArch64 Linux."""
        GdbRemoteForkTestBase.setUp(self)
        if (self.getPlatform() == "linux"
                and self.getArchitecture() in ("arm", "aarch64")):
            self.skipTest("Unsupported for Arm/AArch64 Linux")

    def _resume_both_and_collect_pidtids(self):
        """Shared preamble for the register, qC and T tests.

        Starts the inferior with ``fork``, ``thread:new`` and ``stop``,
        selects and continues first the parent and then the child until each
        reports the stop matched by ``self.stop_regex``, and finally collects
        the thread list.

        Returns:
            The set of ``(pid, tid)`` integer pairs parsed from the
            thread-info replies; the helper asserts there are exactly four.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "thread:new", "stop"]))
        initial_pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        for pidtid in initial_pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"direction": "send",
                  "regex": self.stop_regex.format(*pidtid),
                  },
                 ], True)

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(len(pidtids), 4)
        return pidtids

    @add_test_categories(["fork"])
    def test_fork_multithreaded(self):
        """Fork after spawning two threads, detach the child, then kill."""
        _, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"])

        # detach the forked child
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(child_pid),
            "send packet: $OK#00",
            "read packet: $k#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork(self):
        """After fork_and_detach_test("fork"), the parent runs to exit 0."""
        parent_pid, _ = self.fork_and_detach_test("fork")

        # resume the parent
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            "send packet: $W00;process:{}#00".format(parent_pid),
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vfork(self):
        """After fork_and_detach_test("vfork"), expect vforkdone then exit 0."""
        parent_pid, parent_tid = self.fork_and_detach_test("vfork")

        # resume the parent
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            # the first continue must stop with a vforkdone report for the
            # parent thread
            {"direction": "send",
             "regex": r"[$]T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*"
                      .format(parent_pid, parent_tid),
             },
            "read packet: $c#00",
            "send packet: $W00;process:{}#00".format(parent_pid),
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_follow(self):
        """Run the shared fork_and_follow_test scenario for fork."""
        self.fork_and_follow_test("fork")

    @add_test_categories(["fork"])
    def test_vfork_follow(self):
        """Run the shared fork_and_follow_test scenario for vfork."""
        self.fork_and_follow_test("vfork")

    @add_test_categories(["fork"])
    def test_select_wrong_pid(self):
        """Hg/Hc accept the real pid.tid and reject a wrong tid or pid.

        A wrong TID is expected to yield E15 and a wrong PID Eff.
        """
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(["multiprocess+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("multiprocess+", ret["qSupported_response"])
        self.reset_test_sequence()

        # get process pid
        self.test_sequence.add_log_lines([
            "read packet: $qC#00",
            # "[.]" (not a bare ".") so only a literal dot separates pid/tid
            {"direction": "send",
             "regex": r"[$]QCp([0-9a-f]+)[.]([0-9a-f]+)#.*",
             "capture": {1: "pid", 2: "tid"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        pid, tid = (int(ret[x], 16) for x in ("pid", "tid"))
        self.reset_test_sequence()

        self.test_sequence.add_log_lines([
            # try switching to correct pid
            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid),
            "send packet: $OK#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid),
            "send packet: $OK#00",
            # try switching to invalid tid
            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1),
            "send packet: $E15#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1),
            "send packet: $E15#00",
            # try switching to invalid pid
            "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid),
            "send packet: $Eff#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid),
            "send packet: $Eff#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_detach_current(self):
        """Detach the only process; a following qC must fail with E44."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(["multiprocess+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("multiprocess+", ret["qSupported_response"])
        self.reset_test_sequence()

        # get process pid
        self.test_sequence.add_log_lines([
            "read packet: $qC#00",
            # "[.]" (not a bare ".") so only a literal dot separates pid/tid
            {"direction": "send",
             "regex": r"[$]QCp([0-9a-f]+)[.][0-9a-f]+#.*",
             "capture": {1: "pid"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        # kept as the captured hex string: it is pasted verbatim into $D
        pid = ret["pid"]
        self.reset_test_sequence()

        # detach the process
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(pid),
            "send packet: $OK#00",
            "read packet: $qC#00",
            "send packet: $E44#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_detach_all(self):
        """Run the shared detach_all_test scenario."""
        self.detach_all_test()

    @add_test_categories(["fork"])
    def test_kill_all(self):
        """A plain k after fork yields an X09 report for parent and child.

        The two reports may arrive in either order, hence the set comparison.
        """
        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"])

        exit_regex = r"[$]X09;process:([0-9a-f]+)#.*"
        self.test_sequence.add_log_lines([
            # kill all processes
            "read packet: $k#00",
            {"direction": "send", "regex": exit_regex,
             "capture": {1: "pid1"}},
            {"direction": "send", "regex": exit_regex,
             "capture": {1: "pid2"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        self.assertEqual({ret["pid1"], ret["pid2"]},
                         {parent_pid, child_pid})

    @add_test_categories(["fork"])
    def test_vkill_child(self):
        """Run vkill_test with kill_child=True."""
        self.vkill_test(kill_child=True)

    @add_test_categories(["fork"])
    def test_vkill_parent(self):
        """Run vkill_test with kill_parent=True."""
        self.vkill_test(kill_parent=True)

    @add_test_categories(["fork"])
    def test_vkill_both(self):
        """Run vkill_test with both kill_parent and kill_child set."""
        self.vkill_test(kill_parent=True, kill_child=True)

    @add_test_categories(["fork"])
    def test_c_parent(self):
        """resume_one_test: parent twice."""
        self.resume_one_test(run_order=["parent", "parent"])

    @add_test_categories(["fork"])
    def test_c_child(self):
        """resume_one_test: child twice."""
        self.resume_one_test(run_order=["child", "child"])

    @add_test_categories(["fork"])
    def test_c_parent_then_child(self):
        """resume_one_test: parent twice, then child twice."""
        self.resume_one_test(run_order=["parent", "parent", "child", "child"])

    @add_test_categories(["fork"])
    def test_c_child_then_parent(self):
        """resume_one_test: child twice, then parent twice."""
        self.resume_one_test(run_order=["child", "child", "parent", "parent"])

    @add_test_categories(["fork"])
    def test_c_interspersed(self):
        """resume_one_test: parent and child alternating."""
        self.resume_one_test(run_order=["parent", "child", "parent", "child"])

    @add_test_categories(["fork"])
    def test_vCont_parent(self):
        """resume_one_test with use_vCont=True: parent twice."""
        self.resume_one_test(run_order=["parent", "parent"], use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_child(self):
        """resume_one_test with use_vCont=True: child twice."""
        self.resume_one_test(run_order=["child", "child"], use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_parent_then_child(self):
        """resume_one_test with use_vCont=True: parent twice, child twice."""
        self.resume_one_test(run_order=["parent", "parent", "child", "child"],
                             use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_child_then_parent(self):
        """resume_one_test with use_vCont=True: child twice, parent twice."""
        self.resume_one_test(run_order=["child", "child", "parent", "parent"],
                             use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_interspersed(self):
        """resume_one_test with use_vCont=True: parent/child alternating."""
        self.resume_one_test(run_order=["parent", "child", "parent", "child"],
                             use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_two_processes(self):
        """A vCont naming threads of both processes is rejected with E03."""
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "stop"]))

        self.test_sequence.add_log_lines([
            # try to resume both processes
            "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
                parent_pid, parent_tid, child_pid, child_tid),
            "send packet: $E03#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vCont_all_processes_explicit(self):
        """A vCont with the explicit all-processes ID p-1.-1 gives E03."""
        self.start_fork_test(["fork", "stop"])

        self.test_sequence.add_log_lines([
            # try to resume all processes explicitly
            "read packet: $vCont;c:p-1.-1#00",
            "send packet: $E03#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vCont_all_processes_implicit(self):
        """A vCont without any thread-id (all processes) gives E03."""
        self.start_fork_test(["fork", "stop"])

        self.test_sequence.add_log_lines([
            # try to resume all processes implicitly
            "read packet: $vCont;c#00",
            "send packet: $E03#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_threadinfo(self):
        """Thread listing covers both processes and tracks new threads.

        Right after the fork the list must be exactly the parent and child
        main threads.  Continuing each process in turn must add exactly one
        thread, belonging to the process that was resumed.  Finally every
        listed thread must be selectable with Hg.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "thread:new", "stop"]))
        pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        prev_pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(prev_pidtids,
                         frozenset((int(pid, 16), int(tid, 16))
                                   for pid, tid in pidtids))
        self.reset_test_sequence()

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"direction": "send",
                  "regex": self.stop_regex.format(*pidtid),
                  },
                 ], True)
            self.add_threadinfo_collection_packets()
            ret = self.expect_gdbremote_sequence()
            self.reset_test_sequence()
            new_pidtids = set(self.parse_threadinfo_packets(ret))
            added_pidtid = new_pidtids - prev_pidtids
            prev_pidtids = new_pidtids

            # verify that we've got exactly one new thread, and that
            # the PID matches
            self.assertEqual(len(added_pidtid), 1)
            self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16))

        # new_pidtids now holds the listing taken after the last resume
        for pidtid in new_pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_memory_read_write(self):
        """Memory writes to parent and child stay private to each process.

        Round one: in each process read ``g_message`` (must still hold the
        initial text), overwrite it with the process' own name ("parent" /
        "child"), resume, and check the inferior prints that name.
        Round two: re-read both and check each still holds its own name.
        """
        self.build()
        INITIAL_DATA = "Initial message"
        self.prep_debug_monitor_and_inferior(
            inferior_args=["set-message:{}".format(INITIAL_DATA),
                           "get-data-address-hex:g_message",
                           "fork",
                           "print-message:",
                           "stop",
                           ])
        self.add_qSupported_packets(["multiprocess+",
                                     "fork-events+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("fork-events+", ret["qSupported_response"])
        self.reset_test_sequence()

        # continue and expect fork
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            {"type": "output_match",
             "regex": self.maybe_strict_output_regex(
                 r"data address: 0x([0-9a-fA-F]+)\r\n"),
             "capture": {1: "addr"}},
            {"direction": "send", "regex": self.fork_regex.format("fork"),
             "capture": self.fork_capture},
        ], True)
        ret = self.expect_gdbremote_sequence()
        pidtids = {
            "parent": (ret["parent_pid"], ret["parent_tid"]),
            "child": (ret["child_pid"], ret["child_tid"]),
        }
        # captured hex digits, pasted verbatim into the m/M packets
        addr = ret["addr"]
        self.reset_test_sequence()

        for name, pidtid in pidtids.items():
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 # read the current memory contents
                 "read packet: $m{},{:x}#00".format(addr,
                                                    len(INITIAL_DATA) + 1),
                 {"direction": "send",
                  "regex": r"^[$](.+)#.*$",
                  "capture": {1: "data"}},
                 # write a new value
                 "read packet: $M{},{:x}:{}#00".format(addr,
                                                       len(name) + 1,
                                                       seven.hexlify(
                                                           name + "\0")),
                 "send packet: $OK#00",
                 # resume the process and wait for the trap
                 "read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"type": "output_match",
                  "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"),
                  "capture": {1: "printed_message"}},
                 {"direction": "send",
                  "regex": self.stop_regex.format(*pidtid),
                  },
                 ], True)
            ret = self.expect_gdbremote_sequence()
            data = seven.unhexlify(ret["data"])
            self.assertEqual(data, INITIAL_DATA + "\0")
            self.assertEqual(ret["printed_message"], name)
            self.reset_test_sequence()

        # we do the second round separately to make sure that initial data
        # is correctly preserved while writing into the first process

        for name, pidtid in pidtids.items():
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 # read the current memory contents
                 "read packet: $m{},{:x}#00".format(addr,
                                                    len(name) + 1),
                 {"direction": "send",
                  "regex": r"^[$](.+)#.*$",
                  "capture": {1: "data"}},
                 ], True)
            ret = self.expect_gdbremote_sequence()
            data_hex = ret.get("data")
            self.assertIsNotNone(data_hex)
            data = seven.unhexlify(data_hex)
            self.assertEqual(data, name + "\0")
            self.reset_test_sequence()

    @add_test_categories(["fork"])
    def test_register_read_write(self):
        """Register writes in one thread do not leak into other threads.

        For each of the four threads, pick the first register whose reply is
        at least 8 hex digits, remember its value, overwrite it with a value
        not seen in any thread so far, and finally read every thread back to
        confirm it holds the value written to it.
        """
        pidtids = self._resume_both_and_collect_pidtids()

        # first, save register values from all the threads
        thread_regs = {}
        for pidtid in pidtids:
            for regno in range(256):
                self.test_sequence.add_log_lines(
                    ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                     "send packet: $OK#00",
                     "read packet: $p{:x}#00".format(regno),
                     {"direction": "send",
                      "regex": r"^[$](.+)#.*$",
                      "capture": {1: "data"}},
                     ], True)
                ret = self.expect_gdbremote_sequence()
                data = ret.get("data")
                self.assertIsNotNone(data)
                # ignore registers shorter than 32 bits (this also catches
                # "Exx" errors)
                if len(data) >= 8:
                    break
            else:
                # the loop ran out of register numbers without a break
                self.skipTest("no usable register found")
            thread_regs[pidtid] = (regno, data)

        vals = set(x[1] for x in thread_regs.values())
        # NB: cheap hack to make the loop below easier -- seeding new_val
        # with a value that is already in vals forces the while loop to
        # generate a fresh one on its first pass
        new_val = next(iter(vals))

        # then, start altering them and verify that we don't unexpectedly
        # change the value from another thread
        for pidtid in pidtids:
            old_val = thread_regs[pidtid]
            regno = old_val[0]
            old_val_length = len(old_val[1])
            # generate a unique new_val (same number of hex digits as the
            # old value, i.e. 4 random bits per digit)
            while new_val in vals:
                new_val = ('{{:0{}x}}'.format(old_val_length)
                           .format(random.getrandbits(old_val_length*4)))
            vals.add(new_val)

            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $p{:x}#00".format(regno),
                 {"direction": "send",
                  "regex": r"^[$](.+)#.*$",
                  "capture": {1: "data"}},
                 "read packet: $P{:x}={}#00".format(regno, new_val),
                 "send packet: $OK#00",
                 ], True)
            ret = self.expect_gdbremote_sequence()
            data = ret.get("data")
            self.assertIsNotNone(data)
            # the value read just before the write must be untouched by the
            # writes made to previously visited threads
            self.assertEqual(data, old_val[1])
            thread_regs[pidtid] = (regno, new_val)

        # finally, verify that new values took effect
        for pidtid in pidtids:
            old_val = thread_regs[pidtid]
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $p{:x}#00".format(old_val[0]),
                 {"direction": "send",
                  "regex": r"^[$](.+)#.*$",
                  "capture": {1: "data"}},
                 ], True)
            ret = self.expect_gdbremote_sequence()
            data = ret.get("data")
            self.assertIsNotNone(data)
            self.assertEqual(data, old_val[1])

    @add_test_categories(["fork"])
    def test_qC(self):
        """After Hg selects a thread, qC must report that same pid.tid."""
        pidtids = self._resume_both_and_collect_pidtids()

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $qC#00",
                 "send packet: $QCp{:x}.{:x}#00".format(*pidtid),
                 ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_T(self):
        """The T packet works with explicit and Hg-implied PIDs.

        Known threads answer OK; an unknown TID is expected to give E02 and
        an unknown PID E01 (regardless of the TID).
        """
        pidtids = self._resume_both_and_collect_pidtids()

        # IDs one past the largest known ones are assumed not to exist
        max_pid = max(pid for pid, tid in pidtids)
        max_tid = max(tid for pid, tid in pidtids)
        bad_pidtids = (
            (max_pid, max_tid + 1, "E02"),
            (max_pid + 1, max_tid, "E01"),
            (max_pid + 1, max_tid + 1, "E01"),
        )

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                [
                    # test explicit PID+TID
                    "read packet: $Tp{:x}.{:x}#00".format(*pidtid),
                    "send packet: $OK#00",
                    # test implicit PID via Hg
                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $T{:x}#00".format(max_tid + 1),
                    "send packet: $E02#00",
                    "read packet: $T{:x}#00".format(pidtid[1]),
                    "send packet: $OK#00",
                ], True)
        for pid, tid, expected in bad_pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Tp{:x}.{:x}#00".format(pid, tid),
                 "send packet: ${}#00".format(expected),
                 ], True)
        self.expect_gdbremote_sequence()
|