* include type of unhandled pytest exc_repr in error message This makes triage of such issues a little simpler. * pytest: expect the more general ExceptionRepr class This is the (abstract) supertype of the currently-used ExceptionChainRepr, and defines all of the attributes currently used by the code. (This change results in sensible output for `ReprExceptionInfo` instances, which is what my pytest invocations were generating.) * pytest: use pytest_runtest_makereport for consistent exception handling The `report` passed to `pytest_runtest_logreport` has a different internal exception representation depending on the `--tb` option with which `pytest` is configured: some of these representations do not include the traceback frames to allow us to calculate line numbers. `pytest_runtest_makereport`, however, has access to the original `ExceptionInfo` object when an exception is raised: this commit switches to using a `pytest_runtest_makereport` hookwrapper, so we can access the pytest-generated report as before, but get exception handling independent of `--tb` setting. Fixes: #28
184 lines
6.6 KiB
Python
184 lines
6.6 KiB
Python
from io import StringIO
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Union
|
|
|
|
from .base import NeotestAdapter, NeotestError, NeotestResult, NeotestResultStatus
|
|
|
|
import pytest
|
|
|
|
if TYPE_CHECKING:
|
|
from pytest import CallInfo, Item
|
|
from _pytest.config import Config
|
|
from _pytest.reports import TestReport
|
|
|
|
|
|
class PytestNeotestAdapter(NeotestAdapter):
    """Neotest adapter that runs tests by invoking pytest in-process."""

    def run(
        self,
        args: List[str],
        stream: Callable[[str, NeotestResult], None],
    ) -> Dict[str, NeotestResult]:
        """Run pytest with ``args`` and return the results it produced.

        Args:
            args: Command-line arguments forwarded unchanged to ``pytest.main``.
            stream: Callback handed to the result collector plugin.

        Returns:
            The ``results`` mapping of the ``NeotestResultCollector`` that was
            registered as a pytest plugin for this run.
        """
        import pytest

        collector = NeotestResultCollector(self, stream=stream)
        # Both objects are registered as pytest plugins for this invocation;
        # pytest.main's exit code is intentionally not inspected here.
        plugins = [collector, NeotestDebugpyPlugin()]
        pytest.main(args=args, plugins=plugins)
        return collector.results
|
|
|
|
|
|
class NeotestResultCollector:
    """pytest plugin that converts pytest reports into neotest results.

    Results are accumulated in ``self.results`` keyed by position id
    (``<absolute file path>::<namespace>::...::<test name>``, without any
    parametrization suffix) and, for non-parametrized tests, are also pushed
    to the ``stream`` callback as soon as they are available.
    """

    def __init__(
        self,
        adapter: PytestNeotestAdapter,
        stream: Callable[[str, NeotestResult], None],
    ):
        """Store the adapter and stream callback and start with no results.

        Args:
            adapter: Adapter whose ``update_result`` merges a new partial
                result into the one already stored for a position.
            stream: Callback invoked with ``(pos_id, result)`` for each
                non-parametrized test result.
        """
        self.stream = stream
        self.adapter = adapter

        # Populated by the pytest_cmdline_main hook before any test runs.
        self.pytest_config: "Config" = None  # type: ignore
        self.results: Dict[str, NeotestResult] = {}

    def _get_short_output(
        self, config: "Config", report: "TestReport"
    ) -> Optional[str]:
        """Render ``report`` the way pytest's terminal reporter would.

        Returns:
            The text written by a ``TerminalReporter`` bound to an in-memory
            buffer: a coloured headline followed by the report summary and
            teardown sections.
        """
        from _pytest.terminal import TerminalReporter

        buffer = StringIO()
        # Hack to get pytest to write ANSI codes
        setattr(buffer, "isatty", lambda: True)
        reporter = TerminalReporter(config, buffer)

        # Taken from `_pytest.terminal.TerminalReporter`
        msg = reporter._getfailureheadline(report)
        if report.outcome == NeotestResultStatus.FAILED:
            reporter.write_sep("_", msg, red=True, bold=True)
        elif report.outcome == NeotestResultStatus.SKIPPED:
            reporter.write_sep("_", msg, cyan=True, bold=True)
        else:
            reporter.write_sep("_", msg, green=True, bold=True)
        reporter._outrep_summary(report)
        reporter.print_teardown_sections(report)

        return buffer.getvalue()

    def _parse_nodeid(self, nodeid: str) -> tuple:
        """Split a pytest node id into the pieces neotest needs.

        The file path is separated first, then the parametrization suffix
        (everything from the first ``[`` of the remainder) is dropped, and only
        then are the namespaces split on ``::``.  Doing it in this order keeps
        parameter ids that themselves contain ``::`` from being mistaken for
        namespace separators, and keeps a ``[`` in the file path from being
        mistaken for a parametrization marker.

        Returns:
            ``(abs_path, pos_id, parametrized)`` where ``abs_path`` is the file
            path joined onto ``self.pytest_config.rootdir``, ``pos_id`` is the
            neotest position id and ``parametrized`` tells whether the node id
            carried a ``[...]`` suffix.  A node id without any ``::`` yields the
            file path itself as position id.
        """
        file_path, _, remainder = nodeid.partition("::")
        name_path, bracket, _ = remainder.partition("[")  # ]
        abs_path = str(Path(self.pytest_config.rootdir, file_path))
        if name_path:
            pos_id = "::".join([abs_path, *name_path.split("::")])
        else:
            pos_id = abs_path
        return abs_path, pos_id, bool(bracket)

    def _record(self, pos_id: str, update: dict, parametrized: bool) -> None:
        """Merge ``update`` into the stored result for ``pos_id``.

        The merged result is streamed only for non-parametrized tests; it is
        always stored in ``self.results``.
        """
        result: NeotestResult = self.adapter.update_result(
            self.results.get(pos_id), update
        )
        if not parametrized:
            self.stream(pos_id, result)
        self.results[pos_id] = result

    @staticmethod
    def _build_error(exc_repr, call: "CallInfo", abs_path: str) -> "NeotestError":
        """Translate a failed report's ``longrepr`` into a neotest error.

        Args:
            exc_repr: The ``longrepr`` of the failed report.
            call: The pytest call info holding the original exception info.
            abs_path: Absolute path of the test file, used to pick the
                traceback frame whose line number is reported.

        Raises:
            Exception: If ``exc_repr`` is neither a string nor an
                ``ExceptionRepr``.
        """
        from _pytest._code.code import ExceptionRepr

        # Test fails due to condition outside of test e.g. xfail
        if isinstance(exc_repr, str):
            return {"message": exc_repr, "line": None}

        # Test failed internally
        if isinstance(exc_repr, ExceptionRepr):
            reprcrash = exc_repr.reprcrash
            # reprcrash is optional on ExceptionRepr; fall back to the rendered
            # representation instead of failing with AttributeError.
            message = reprcrash.message if reprcrash is not None else str(exc_repr)
            traceback = call.excinfo.traceback if call.excinfo is not None else ()
            # Report the outermost frame located in the test file (the first
            # match in traceback order), or None when no frame is in that file.
            line = next(
                (entry.lineno for entry in traceback if str(entry.path) == abs_path),
                None,
            )
            return {"message": message, "line": line}

        # TODO: Figure out how these are returned and how to represent
        raise Exception(
            f"Unhandled error type ({type(exc_repr)}), please report to"
            " neotest-python repo"
        )

    def pytest_deselected(self, items: List["pytest.Item"]):
        """pytest hook: record every deselected item as skipped."""
        for item in items:
            _, pos_id, parametrized = self._parse_nodeid(item.nodeid)
            self._record(
                pos_id,
                {
                    "short": None,
                    "status": NeotestResultStatus.SKIPPED,
                    "errors": [],
                },
                parametrized,
            )

    def pytest_cmdline_main(self, config: "Config"):
        """pytest hook: remember the config object for later hooks."""
        self.pytest_config = config

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_makereport(self, item: "Item", call: "CallInfo") -> None:
        """pytest hook wrapper: turn the generated report into a neotest result.

        Only reports for the ``call`` phase, and ``setup``-phase reports whose
        outcome is ``skipped``, are recorded; every other report is ignored.
        """
        # pytest generates the report.outcome field in its internal
        # pytest_runtest_makereport implementation, so call it first.  (We don't
        # implement pytest_runtest_logreport because it doesn't have access to
        # call.excinfo.)
        outcome = yield
        report = outcome.get_result()

        is_call_phase = report.when == "call"
        is_setup_skip = report.when == "setup" and report.outcome == "skipped"
        if not (is_call_phase or is_setup_skip):
            return

        abs_path, pos_id, parametrized = self._parse_nodeid(item.nodeid)
        short = self._get_short_output(self.pytest_config, report)

        errors: List[NeotestError] = []
        if report.outcome == "failed":
            errors.append(self._build_error(report.longrepr, call, abs_path))

        self._record(
            pos_id,
            {
                "short": short,
                "status": NeotestResultStatus(report.outcome),
                "errors": errors,
            },
            parametrized,
        )
|
|
|
|
|
|
class NeotestDebugpyPlugin:
    """pytest plugin that lets debugpy break on exceptions raised by tests."""

    def pytest_exception_interact(
        self,
        node: Union['pytest.Item', 'pytest.Collector'],
        call: 'pytest.CallInfo',
        report: Union['pytest.CollectReport', 'pytest.TestReport'],
    ):
        """pytest hook: forward the raised exception to the debugger, if any.

        Only ``call`` is used; ``node`` and ``report`` are accepted because the
        hook signature provides them.
        """
        # call.excinfo is a _pytest._code.ExceptionInfo; the postmortem helper
        # wants the underlying value stored in its private ``_excinfo``.
        raw_excinfo = call.excinfo._excinfo
        self.maybe_debugpy_postmortem(raw_excinfo)

    @staticmethod
    def maybe_debugpy_postmortem(excinfo):
        """Ask an attached debugpy/pydevd debugger to stop at an exception.

        Does nothing when ``pydevd`` cannot be imported or when it reports no
        global debugger.

        excinfo: A (type(e), e, e.__traceback__) tuple. See sys.exc_info()
        """
        # Reference: https://github.com/microsoft/debugpy/issues/723
        import threading

        try:
            import pydevd
        except ImportError:
            # debugpy or pydevd not available, do nothing
            return

        debugger = pydevd.get_global_debugger()
        if debugger is None:
            # No global debugger is registered, so there is nothing to stop in.
            return

        current = threading.current_thread()
        thread_info = debugger.set_additional_thread_info(current)
        # Keep the tracing counter raised for the duration of the stop and
        # always restore it afterwards, even if the debugger call raises.
        thread_info.is_tracing += 1
        try:
            debugger.stop_on_unhandled_exception(
                debugger, current, thread_info, excinfo
            )
        finally:
            thread_info.is_tracing -= 1
|