feat: init commit

This commit is contained in:
Rónán Carrigan
2022-01-02 23:15:23 +00:00
commit b162ba1b42
12 changed files with 679 additions and 0 deletions

View File

@@ -0,0 +1,40 @@
import argparse
import json
from enum import Enum
from typing import List
class TestRunner(str, Enum):
PYTEST = "pytest"
UNITTEST = "unittest"
def get_adapter(runner: TestRunner):
if runner == TestRunner.PYTEST:
from .pytest import PytestNeotestAdapter
return PytestNeotestAdapter()
elif runner == TestRunner.UNITTEST:
from .unittest import UnittestNeotestAdapter
return UnittestNeotestAdapter()
raise NotImplementedError(runner)
parser = argparse.ArgumentParser()
parser.add_argument("--runner", required=True)
parser.add_argument(
"--results-file",
dest="results_file",
required=True,
help="File to store result JSON in",
)
parser.add_argument("args", nargs="*")
def main(argv: List[str]):
args = parser.parse_args(argv)
adapter = get_adapter(TestRunner(args.runner))
results = adapter.run(args.args)
with open(args.results_file, "w") as results_file:
json.dump(results, results_file)

42
neotest_python/base.py Normal file
View File

@@ -0,0 +1,42 @@
from enum import Enum
from typing import TYPE_CHECKING, Dict, List, Optional
class NeotestResultStatus(str, Enum):
SKIPPED = "skipped"
PASSED = "passed"
FAILED = "failed"
def __gt__(self, other) -> bool:
members = list(self.__class__.__members__.values())
return members.index(self) > members.index(other)
if TYPE_CHECKING:
from typing import TypedDict
class NeotestError(TypedDict):
message: str
line: Optional[int]
class NeotestResult(TypedDict):
short: Optional[str]
status: NeotestResultStatus
errors: Optional[List[NeotestError]]
else:
NeotestError = Dict
NeotestResult = Dict
class NeotestAdapter:
def update_result(
self, base: Optional[NeotestResult], update: NeotestResult
) -> NeotestResult:
if not base:
return update
return {
"status": max(base["status"], update["status"]),
"errors": (base.get("errors") or []) + (update.get("errors") or []) or None,
"short": (base.get("short") or "") + (update.get("short") or ""),
}

94
neotest_python/pytest.py Normal file
View File

@@ -0,0 +1,94 @@
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, cast
from .base import NeotestAdapter, NeotestError, NeotestResult, NeotestResultStatus
if TYPE_CHECKING:
from _pytest._code.code import ExceptionChainRepr
from _pytest.config import Config
from _pytest.reports import TestReport
class PytestNeotestAdapter(NeotestAdapter):
def get_short_output(self, config: "Config", report: "TestReport") -> Optional[str]:
from _pytest.terminal import TerminalReporter
buffer = StringIO()
# Hack to get pytest to write ANSI codes
setattr(buffer, "isatty", lambda: True)
reporter = TerminalReporter(config, buffer)
# Taked from `_pytest.terminal.TerminalReporter
msg = reporter._getfailureheadline(report)
if report.outcome == NeotestResultStatus.FAILED:
reporter.write_sep("_", msg, red=True, bold=True)
else:
reporter.write_sep("_", msg, green=True, bold=True)
reporter._outrep_summary(report)
reporter.print_teardown_sections(report)
buffer.seek(0)
return buffer.read()
def run(self, args: List[str]) -> Dict[str, NeotestResult]:
results: Dict[str, NeotestResult] = {}
pytest_config: "Config"
class NeotestResultCollector:
@staticmethod
def pytest_cmdline_main(config: "Config"):
nonlocal pytest_config
pytest_config = config
@staticmethod
def pytest_runtest_logreport(report: "TestReport"):
if report.when != "call" and not (
report.outcome == "skipped" and report.when == "setup"
):
return
file_path, *name_path = report.nodeid.split("::")
abs_path = str(Path(pytest_config.rootpath, file_path))
test_name, *namespaces = reversed(name_path)
valid_test_name, *_ = test_name.split("[") # ]
errors: List[NeotestError] = []
short = self.get_short_output(pytest_config, report)
if report.outcome == "failed":
exc_repr = cast("ExceptionChainRepr", report.longrepr)
exc_repr.toterminal
reprtraceback = exc_repr.reprtraceback
error_message = exc_repr.reprcrash.message # type: ignore
error_line = None
for repr in reversed(reprtraceback.reprentries):
if (
hasattr(repr, "reprfileloc")
and repr.reprfileloc.path == file_path
):
error_line = repr.reprfileloc.lineno - 1
errors.append({"message": error_message, "line": error_line})
pos_id = "::".join([abs_path, *namespaces, valid_test_name])
results[pos_id] = self.update_result(
results.get(pos_id),
{
"short": short,
"status": NeotestResultStatus(report.outcome),
"errors": errors,
},
)
results[abs_path] = self.update_result(
results.get(abs_path),
{
"short": None,
"status": NeotestResultStatus(report.outcome),
"errors": errors,
},
)
import pytest
pytest.main(args=args, plugins=[NeotestResultCollector])
return results
def update_report(self, report: Optional[Dict], update: Dict):
...

105
neotest_python/unittest.py Normal file
View File

@@ -0,0 +1,105 @@
import inspect
import traceback
import unittest
from pathlib import Path
from types import TracebackType
from typing import Any, Dict, Iterator, List, Tuple
from unittest import TestCase, TestResult, TestSuite
from unittest.runner import TextTestResult, TextTestRunner
from .base import NeotestAdapter, NeotestResultStatus
class UnittestNeotestAdapter(NeotestAdapter):
def iter_suite(
self, suite: "TestSuite | TestCase"
) -> Iterator["TestCase | TestSuite"]:
if isinstance(suite, TestSuite):
for sub in suite:
for case in self.iter_suite(sub):
yield case
else:
yield suite
def case_file(self, case) -> str:
return str(Path(inspect.getmodule(case).__file__).absolute()) # type: ignore
def case_id_elems(self, case) -> List[str]:
file = self.case_file(case)
elems = [file, case.__class__.__name__]
if isinstance(case, TestCase):
elems.append(case._testMethodName)
return elems
def case_id(self, case: "TestCase | TestSuite") -> str:
return "::".join(self.case_id_elems(case))
def run(self, args: List[str]) -> Dict:
results = {}
errs: Dict[str, Tuple[Exception, Any, TracebackType]] = {}
class NeotestTextTestResult(TextTestResult):
def addFailure(_, test: TestCase, err) -> None:
errs[self.case_id(test)] = err
return super().addFailure(test, err)
class NeotestUnittestRunner(TextTestRunner):
def run(_, test: "TestSuite | TestCase") -> "TestResult": # type: ignore
for case in self.iter_suite(test):
results[self.case_id(case)] = {
"status": NeotestResultStatus.PASSED,
"short": None,
}
results[self.case_file(case)] = {
"status": NeotestResultStatus.PASSED,
"short": None,
}
result = super().run(test)
for case, message in result.failures:
case_id = self.case_id(case)
error_line = None
case_file = self.case_file(case)
if case_id in errs:
trace = errs[case_id][2]
summary = traceback.extract_tb(trace)
error_line = next(
frame.lineno - 1
for frame in reversed(summary)
if frame.filename == case_file
)
results[case_id] = self.update_result(
results.get(case_id),
{
"status": NeotestResultStatus.FAILED,
"errors": [{"message": message, "line": error_line}],
"short": None,
},
)
results[case_file] = self.update_result(
results.get(case_file),
{
"status": NeotestResultStatus.FAILED,
"errors": [{"message": message, "line": error_line}],
"short": None,
},
)
for case, message in result.skipped:
results[self.case_id(case)] = self.update_result(
results[self.case_id(case)],
{
"short": None,
"status": NeotestResultStatus.SKIPPED,
"errors": None,
},
)
return result
unittest.main(
module=None,
argv=args,
testRunner=NeotestUnittestRunner(resultclass=NeotestTextTestResult),
exit=False,
)
return results