205 lines
8.3 KiB
Python
205 lines
8.3 KiB
Python
from lldbsuite.test.decorators import *
|
|
from lldbsuite.test.lldbtest import *
|
|
|
|
from fork_testbase import GdbRemoteForkTestBase
|
|
|
|
|
|
class TestGdbRemoteForkNonStop(GdbRemoteForkTestBase):
    """Non-stop-mode variants of the gdb-remote fork/vfork tests.

    Every test passes ``nonstop=True`` to the ``GdbRemoteForkTestBase``
    helpers.  The packet sequences below therefore expect stop events as
    asynchronous ``%Stop:`` notifications, each followed by a ``vStopped``
    exchange, instead of a direct stop reply to the resume packet.
    """

    def setUp(self):
        """Run the base-class setup, then skip the test on Arm/AArch64 Linux."""
        GdbRemoteForkTestBase.setUp(self)
        if self.getPlatform() == "linux" and self.getArchitecture() in [
            "arm",
            "aarch64",
        ]:
            self.skipTest("Unsupported for Arm/AArch64 Linux")

    @add_test_categories(["fork"])
    def test_vfork_nonstop(self):
        """Run ``fork_and_detach_test`` for vfork, then resume the parent.

        The parent is expected to report a ``vforkdone`` stop first and, after
        a second continue, a ``W00`` exit notification.
        """
        parent_pid, parent_tid = self.fork_and_detach_test("vfork", nonstop=True)

        # resume the parent
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                "send packet: $OK#00",
                {
                    "direction": "send",
                    # "{{2}}" is an escaped "{2}" quantifier; only the two
                    # "{}" placeholders are substituted by format().
                    "regex": r"%Stop:T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*".format(
                        parent_pid, parent_tid
                    ),
                },
                "read packet: $vStopped#00",
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
                "send packet: %Stop:W00;process:{}#00".format(parent_pid),
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_nonstop(self):
        """Run ``fork_and_detach_test`` for fork, then resume the parent.

        The parent is expected to report a ``W00`` exit notification.
        """
        parent_pid, _ = self.fork_and_detach_test("fork", nonstop=True)

        # resume the parent
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                "send packet: $OK#00",
                "send packet: %Stop:W00;process:{}#00".format(parent_pid),
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_follow_nonstop(self):
        """Run the base-class ``fork_and_follow_test`` for fork in non-stop mode."""
        self.fork_and_follow_test("fork", nonstop=True)

    @add_test_categories(["fork"])
    def test_vfork_follow_nonstop(self):
        """Run the base-class ``fork_and_follow_test`` for vfork in non-stop mode."""
        self.fork_and_follow_test("vfork", nonstop=True)

    @add_test_categories(["fork"])
    def test_detach_all_nonstop(self):
        """Run the base-class ``detach_all_test`` in non-stop mode."""
        self.detach_all_test(nonstop=True)

    @add_test_categories(["fork"])
    def test_kill_all_nonstop(self):
        """Send ``k`` and expect an ``X09`` exit report for both processes.

        The two reported pids must be exactly the parent and the child,
        in either order.
        """
        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"], nonstop=True)

        exit_regex = "X09;process:([0-9a-f]+)"
        # Depending on a potential race, the second kill may make it into
        # the async queue before we issue vStopped or after. In the former
        # case, we should expect the exit status in reply to vStopped.
        # In the latter, we should expect an OK response (queue empty),
        # followed by another async notification.
        vstop_regex = "[$](OK|{})#.*".format(exit_regex)
        self.test_sequence.add_log_lines(
            [
                # kill all processes
                "read packet: $k#00",
                "send packet: $OK#00",
                {
                    "direction": "send",
                    "regex": "%Stop:{}#.*".format(exit_regex),
                    "capture": {1: "pid1"},
                },
                "read packet: $vStopped#00",
                {
                    "direction": "send",
                    "regex": vstop_regex,
                    # Group 2 (the pid) is only set when the reply was an
                    # exit status rather than "OK".
                    "capture": {1: "vstop_reply", 2: "pid2"},
                },
            ],
            True,
        )
        ret = self.expect_gdbremote_sequence()
        pid1 = ret["pid1"]
        if ret["vstop_reply"] == "OK":
            # The queue was empty: the second exit arrives as a fresh
            # asynchronous notification.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    {
                        "direction": "send",
                        "regex": "%Stop:{}#.*".format(exit_regex),
                        "capture": {1: "pid2"},
                    },
                ],
                True,
            )
            ret = self.expect_gdbremote_sequence()
        pid2 = ret["pid2"]
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()
        self.assertEqual({pid1, pid2}, {parent_pid, child_pid})

    @add_test_categories(["fork"])
    def test_vkill_both_nonstop(self):
        """Run the base-class ``vkill_test`` for parent and child in non-stop mode."""
        self.vkill_test(kill_parent=True, kill_child=True, nonstop=True)

    @add_test_categories(["fork"])
    def test_c_interspersed_nonstop(self):
        """Run ``resume_one_test`` alternating parent and child."""
        self.resume_one_test(
            run_order=["parent", "child", "parent", "child"], nonstop=True
        )

    @add_test_categories(["fork"])
    def test_vCont_interspersed_nonstop(self):
        """Run ``resume_one_test`` alternating parent and child, using vCont."""
        self.resume_one_test(
            run_order=["parent", "child", "parent", "child"],
            use_vCont=True,
            nonstop=True,
        )

    def get_all_output_via_vStdio(self, output_test):
        """Collect inferior output until ``output_test`` accepts it.

        ``output_test`` is called with everything collected so far and must
        return a true value once the output is complete.  Returns the
        accumulated output.
        """
        # The output may be split into an arbitrary number of messages.
        # Loop until we have everything. The first message is waiting for us
        # in the packet queue.
        output = self._server.get_raw_output_packet()
        while not output_test(output):
            self._server.send_packet(b"vStdio")
            output += self._server.get_raw_output_packet()
        return output

    def _start_print_pid_fork_test(self):
        """Start the fork test shared by the "resume both processes" tests.

        Returns whatever ``start_fork_test`` returns; the callers unpack it as
        ``(parent_pid, parent_tid, child_pid, child_tid)``.
        """
        lock1 = self.getBuildArtifact("lock1")
        lock2 = self.getBuildArtifact("lock2")
        return self.start_fork_test(
            [
                "fork",
                "process:sync:" + lock1,
                "print-pid",
                "process:sync:" + lock2,
                "stop",
            ],
            nonstop=True,
        )

    def _resume_and_expect_both_pids(self, resume_lines, parent_pid, child_pid):
        """Replay ``resume_lines`` and check that both processes printed a pid.

        ``resume_lines`` are the log lines that resume the processes.  A
        ``%Stop:T`` notification is expected after them, and the collected
        output must then contain exactly two ``PID: `` entries, one for each
        of ``parent_pid`` and ``child_pid`` (hex strings).
        """
        self.test_sequence.add_log_lines(
            list(resume_lines) + [{"direction": "send", "regex": "%Stop:T.*"}],
            True,
        )
        self.expect_gdbremote_sequence()

        output = self.get_all_output_via_vStdio(
            lambda output: output.count(b"PID: ") >= 2
        )
        self.assertEqual(output.count(b"PID: "), 2)
        # The pids come from the protocol as hex, but are matched in decimal.
        self.assertIn("PID: {}".format(int(parent_pid, 16)).encode(), output)
        self.assertIn("PID: {}".format(int(child_pid, 16)).encode(), output)

    @add_test_categories(["fork"])
    def test_c_both_nonstop(self):
        """Resume parent and child with separate ``Hc`` + ``c`` packets."""
        parent_pid, parent_tid, child_pid, child_tid = (
            self._start_print_pid_fork_test()
        )
        self._resume_and_expect_both_pids(
            [
                "read packet: $Hcp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
                "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
            ],
            parent_pid,
            child_pid,
        )

    @add_test_categories(["fork"])
    def test_vCont_both_nonstop(self):
        """Resume parent and child with one ``vCont`` naming both threads."""
        parent_pid, parent_tid, child_pid, child_tid = (
            self._start_print_pid_fork_test()
        )
        self._resume_and_expect_both_pids(
            [
                "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
                    parent_pid, parent_tid, child_pid, child_tid
                ),
                "send packet: $OK#00",
            ],
            parent_pid,
            child_pid,
        )

    def vCont_both_nonstop_test(self, vCont_packet):
        """Resume both processes with ``vCont_packet`` and check the output.

        ``vCont_packet`` is the packet payload without the leading ``$`` and
        the trailing ``#00``.
        """
        parent_pid, _, child_pid, _ = self._start_print_pid_fork_test()
        self._resume_and_expect_both_pids(
            [
                "read packet: ${}#00".format(vCont_packet),
                "send packet: $OK#00",
            ],
            parent_pid,
            child_pid,
        )

    @add_test_categories(["fork"])
    def test_vCont_both_implicit_nonstop(self):
        """Resume both processes with a ``vCont;c`` that names no thread."""
        self.vCont_both_nonstop_test("vCont;c")

    @add_test_categories(["fork"])
    def test_vCont_both_minus_one_nonstop(self):
        """Resume both processes with ``vCont;c:p-1.-1``."""
        self.vCont_both_nonstop_test("vCont;c:p-1.-1")