refactor(pytest): PytestNeotestAdapter (#24)

Having `NeotestResultCollector` (a pytest plugin) as a inner local
class would make the code a bit difficult to read due to quite much
indentation. This commit does refactoring on NeotestResultCollector
to make it a module-level class with a reference to NeotestAdapter.

This refactoring would make easier adding more pytest plugins
(e.g., debugger integration) in the future.

There should be no changes in behaviors.
This commit is contained in:
Jongwook Choi
2022-10-29 08:23:40 -04:00
committed by GitHub
parent 9e2db9375c
commit 2dc9c95fe9
3 changed files with 107 additions and 85 deletions

View File

@@ -10,7 +10,33 @@ if TYPE_CHECKING:
class PytestNeotestAdapter(NeotestAdapter):
def get_short_output(self, config: "Config", report: "TestReport") -> Optional[str]:
def run(
self,
args: List[str],
stream: Callable[[str, NeotestResult], None],
) -> Dict[str, NeotestResult]:
import pytest
result_collector = NeotestResultCollector(self, stream=stream)
pytest.main(args=args, plugins=[result_collector])
return result_collector.results
class NeotestResultCollector:
def __init__(
self,
adapter: PytestNeotestAdapter,
stream: Callable[[str, NeotestResult], None],
):
self.stream = stream
self.adapter = adapter
self.pytest_config: "Config" = None # type: ignore
self.results: Dict[str, NeotestResult] = {}
def _get_short_output(self, config: "Config", report: "TestReport") -> Optional[str]:
from _pytest.terminal import TerminalReporter
buffer = StringIO()
@@ -32,88 +58,75 @@ class PytestNeotestAdapter(NeotestAdapter):
buffer.seek(0)
return buffer.read()
def run(
self, args: List[str], stream: Callable[[str, NeotestResult], None]
) -> Dict[str, NeotestResult]:
results: Dict[str, NeotestResult] = {}
pytest_config: "Config"
from _pytest._code.code import ExceptionChainRepr
def pytest_deselected(self, items: List["pytest.Item"]):
for report in items:
file_path, *name_path = report.nodeid.split("::")
abs_path = str(Path(self.pytest_config.rootdir, file_path))
test_name, *namespaces = reversed(name_path)
valid_test_name, *params = test_name.split("[") # ]
pos_id = "::".join([abs_path, *namespaces, valid_test_name])
result = self.adapter.update_result(
self.results.get(pos_id),
{
"short": None,
"status": NeotestResultStatus.SKIPPED,
"errors": [],
},
)
if not params:
self.stream(pos_id, result)
self.results[pos_id] = result
class NeotestResultCollector:
@staticmethod
def pytest_deselected(items: List):
for report in items:
file_path, *name_path = report.nodeid.split("::")
abs_path = str(Path(pytest_config.rootdir, file_path))
test_name, *namespaces = reversed(name_path)
valid_test_name, *params = test_name.split("[") # ]
pos_id = "::".join([abs_path, *namespaces, valid_test_name])
result = self.update_result(
results.get(pos_id),
{
"short": None,
"status": NeotestResultStatus.SKIPPED,
"errors": [],
},
)
if not params:
stream(pos_id, result)
results[pos_id] = result
def pytest_cmdline_main(self, config: "Config"):
self.pytest_config = config
@staticmethod
def pytest_cmdline_main(config: "Config"):
nonlocal pytest_config
pytest_config = config
def pytest_runtest_logreport(self, report: "TestReport"):
if report.when != "call" and not (
report.outcome == "skipped" and report.when == "setup"
):
return
@staticmethod
def pytest_runtest_logreport(report: "TestReport"):
if report.when != "call" and not (
report.outcome == "skipped" and report.when == "setup"
):
return
file_path, *name_path = report.nodeid.split("::")
abs_path = str(Path(pytest_config.rootdir, file_path))
test_name, *namespaces = reversed(name_path)
valid_test_name, *params = test_name.split("[") # ]
pos_id = "::".join([abs_path, *namespaces, valid_test_name])
file_path, *name_path = report.nodeid.split("::")
abs_path = str(Path(self.pytest_config.rootdir, file_path))
test_name, *namespaces = reversed(name_path)
valid_test_name, *params = test_name.split("[") # ]
pos_id = "::".join([abs_path, *namespaces, valid_test_name])
errors: List[NeotestError] = []
short = self.get_short_output(pytest_config, report)
if report.outcome == "failed":
exc_repr = report.longrepr
# Test fails due to condition outside of test e.g. xfail
if isinstance(exc_repr, str):
errors.append({"message": exc_repr, "line": None})
# Test failed internally
elif isinstance(exc_repr, ExceptionChainRepr):
reprtraceback = exc_repr.reprtraceback
error_message = exc_repr.reprcrash.message # type: ignore
error_line = None
for repr in reversed(reprtraceback.reprentries):
if (
hasattr(repr, "reprfileloc")
and repr.reprfileloc.path == file_path
):
error_line = repr.reprfileloc.lineno - 1
errors.append({"message": error_message, "line": error_line})
else:
# TODO: Figure out how these are returned and how to represent
raise Exception(
"Unhandled error type, please report to neotest-python repo"
)
result = self.update_result(
results.get(pos_id),
{
"short": short,
"status": NeotestResultStatus(report.outcome),
"errors": errors,
},
errors: List[NeotestError] = []
short = self._get_short_output(self.pytest_config, report)
if report.outcome == "failed":
from _pytest._code.code import ExceptionChainRepr
exc_repr = report.longrepr
# Test fails due to condition outside of test e.g. xfail
if isinstance(exc_repr, str):
errors.append({"message": exc_repr, "line": None})
# Test failed internally
elif isinstance(exc_repr, ExceptionChainRepr):
reprtraceback = exc_repr.reprtraceback
error_message = exc_repr.reprcrash.message # type: ignore
error_line = None
for repr in reversed(reprtraceback.reprentries):
if (
hasattr(repr, "reprfileloc")
and repr.reprfileloc.path == file_path
):
error_line = repr.reprfileloc.lineno - 1
errors.append({"message": error_message, "line": error_line})
else:
# TODO: Figure out how these are returned and how to represent
raise Exception(
"Unhandled error type, please report to neotest-python repo"
)
if not params:
stream(pos_id, result)
results[pos_id] = result
import pytest
pytest.main(args=args, plugins=[NeotestResultCollector])
return results
result: NeotestResult = self.adapter.update_result(
self.results.get(pos_id),
{
"short": short,
"status": NeotestResultStatus(report.outcome),
"errors": errors,
},
)
if not params:
self.stream(pos_id, result)
self.results[pos_id] = result