Compare commits

...

10 Commits

Author SHA1 Message Date
Itai Bohadana
ff75e11bca feat(pytest): use socket instead of a shitton of processes 2025-08-26 15:48:59 +03:00
nicos68
ed9b4d794b feat(pytest): use python_functions config option (#100) 2025-06-23 22:28:18 +01:00
EdVanDance
34c9f6f3dc fix: python detection on windows (#92)
- virtualenvs on windows use `Scripts` instead of `bin`
  993ba1316a
- line endings of course
2025-05-28 16:18:54 +01:00
Maksym Bieńkowski
61e878f4b3 fix(django): make the test runner non-interactive by default (#95) 2025-05-28 16:17:40 +01:00
Illia Denysenko
ae21072712 feat: uv python detection (#98) (#98) 2025-05-28 16:16:29 +01:00
Neal Joslin
a2861ab3c9 fix(django): duplicate --failfast check (#84) 2024-10-02 20:26:48 +01:00
Liam Jarvis
72603dfdba fix: remove global print function call (#79)
This commit removes a call to an unknown print function call
2024-08-19 10:40:46 +01:00
Rónán Carrigan
e5bff6dcf3 fix(pytest): handle failed setup
Previously we would only handle skipped setups, we need to handle failed
setups in cases of fixture lookup errors.

See #37
2024-08-14 16:21:33 +01:00
Rónán Carrigan
0ab9ad3570 feat: multiple adapter instances
Allows calling the module multiple times with different configs for use
in different in projects.
2024-08-14 14:31:43 +01:00
johnybx
2e83d2bc00 fix: - use --verbosity=0 when collecting pytest parametrized tests (#60) 2024-01-15 08:03:46 +00:00
10 changed files with 627 additions and 287 deletions

View File

@@ -0,0 +1,139 @@
local nio = require("nio")
local lib = require("neotest.lib")
local pytest = require("neotest-python.pytest")
local base = require("neotest-python.base")
---@class neotest-python._AdapterConfig
---@field dap_args? table
---@field pytest_discovery? boolean
---@field is_test_file fun(file_path: string):boolean
---@field get_python_command fun(root: string):string[]
---@field get_args fun(runner: string, position: neotest.Position, strategy: string): string[]
---@field get_runner fun(python_command: string[]): string
---@param config neotest-python._AdapterConfig
---@return neotest.Adapter
return function(config)
---@param run_args neotest.RunArgs
---@param results_path string
---@param stream_path string
---@param runner string
---@return string[]
local function build_script_args(run_args, results_path, stream_path, runner)
local script_args = {
"--results-file",
results_path,
"--stream-file",
stream_path,
"--runner",
runner,
}
if config.pytest_discovery then
table.insert(script_args, "--emit-parameterized-ids")
end
local position = run_args.tree:data()
table.insert(script_args, "--")
vim.list_extend(script_args, config.get_args(runner, position, run_args.strategy))
if run_args.extra_args then
vim.list_extend(script_args, run_args.extra_args)
end
if position then
table.insert(script_args, position.id)
end
return script_args
end
---@type neotest.Adapter
return {
name = "neotest-python",
root = base.get_root,
filter_dir = function(name)
return name ~= "venv"
end,
is_test_file = config.is_test_file,
discover_positions = function(path)
local root = base.get_root(path) or vim.loop.cwd() or ""
local python_command = config.get_python_command(root)
local runner = config.get_runner(python_command)
local positions = lib.treesitter.parse_positions(path, base.treesitter_queries(runner, config, python_command), {
require_namespaces = runner == "unittest",
})
if runner == "pytest" and config.pytest_discovery then
pytest.augment_positions(python_command, base.get_script_path(), path, positions, root)
end
return positions
end,
---@param args neotest.RunArgs
---@return neotest.RunSpec
build_spec = function(args)
local position = args.tree:data()
local root = base.get_root(position.path) or vim.loop.cwd() or ""
local python_command = config.get_python_command(root)
local runner = config.get_runner(python_command)
local results_path = nio.fn.tempname()
local stream_path = nio.fn.tempname()
lib.files.write(stream_path, "")
local stream_data, stop_stream = lib.files.stream_lines(stream_path)
local script_args = build_script_args(args, results_path, stream_path, runner)
local script_path = base.get_script_path()
local strategy_config
if args.strategy == "dap" then
strategy_config =
base.create_dap_config(python_command, script_path, script_args, config.dap_args)
end
---@type neotest.RunSpec
return {
command = vim.iter({ python_command, script_path, script_args }):flatten():totable(),
context = {
results_path = results_path,
stop_stream = stop_stream,
},
stream = function()
return function()
local lines = stream_data()
local results = {}
for _, line in ipairs(lines) do
local result = vim.json.decode(line, { luanil = { object = true } })
results[result.id] = result.result
end
return results
end
end,
strategy = strategy_config,
}
end,
---@param spec neotest.RunSpec
---@param result neotest.StrategyResult
---@return neotest.Result[]
results = function(spec, result)
spec.context.stop_stream()
local success, data = pcall(lib.files.read, spec.context.results_path)
if not success then
data = "{}"
end
local results = vim.json.decode(data, { luanil = { object = true } })
for _, pos_result in pairs(results) do
result.output_path = pos_result.output_path
end
return results
end,
}
end

View File

@@ -1,4 +1,4 @@
local async = require("neotest.async") local nio = require("nio")
local lib = require("neotest.lib") local lib = require("neotest.lib")
local Path = require("plenary.path") local Path = require("plenary.path")
@@ -14,30 +14,35 @@ function M.is_test_file(file_path)
end end
M.module_exists = function(module, python_command) M.module_exists = function(module, python_command)
return lib.process.run(vim.tbl_flatten({ return lib.process.run(vim
python_command, .iter({
"-c", python_command,
"import " .. module, "-c",
})) == 0 "import " .. module,
})
:flatten()
:totable()) == 0
end end
local python_command_mem = {} local python_command_mem = {}
local venv_bin = vim.loop.os_uname().sysname:match("Windows") and "Scripts" or "bin"
---@return string[] ---@return string[]
function M.get_python_command(root) function M.get_python_command(root)
root = root or vim.loop.cwd()
if python_command_mem[root] then if python_command_mem[root] then
return python_command_mem[root] return python_command_mem[root]
end end
-- Use activated virtualenv. -- Use activated virtualenv.
if vim.env.VIRTUAL_ENV then if vim.env.VIRTUAL_ENV then
python_command_mem[root] = { Path:new(vim.env.VIRTUAL_ENV, "bin", "python").filename } python_command_mem[root] = { Path:new(vim.env.VIRTUAL_ENV, venv_bin, "python").filename }
return python_command_mem[root] return python_command_mem[root]
end end
for _, pattern in ipairs({ "*", ".*" }) do for _, pattern in ipairs({ "*", ".*" }) do
local match = async.fn.glob(Path:new(root or async.fn.getcwd(), pattern, "pyvenv.cfg").filename) local match = nio.fn.glob(Path:new(root or nio.fn.getcwd(), pattern, "pyvenv.cfg").filename)
if match ~= "" then if match ~= "" then
python_command_mem[root] = { (Path:new(match):parent() / "bin" / "python").filename } python_command_mem[root] = { (Path:new(match):parent() / venv_bin / "python").filename }
return python_command_mem[root] return python_command_mem[root]
end end
end end
@@ -45,7 +50,7 @@ function M.get_python_command(root)
if lib.files.exists("Pipfile") then if lib.files.exists("Pipfile") then
local success, exit_code, data = pcall(lib.process.run, { "pipenv", "--py" }, { stdout = true }) local success, exit_code, data = pcall(lib.process.run, { "pipenv", "--py" }, { stdout = true })
if success and exit_code == 0 then if success and exit_code == 0 then
local venv = data.stdout:gsub("\n", "") local venv = data.stdout:gsub("\r?\n", "")
if venv then if venv then
python_command_mem[root] = { Path:new(venv).filename } python_command_mem[root] = { Path:new(venv).filename }
return python_command_mem[root] return python_command_mem[root]
@@ -60,19 +65,131 @@ function M.get_python_command(root)
{ stdout = true } { stdout = true }
) )
if success and exit_code == 0 then if success and exit_code == 0 then
local venv = data.stdout:gsub("\n", "") local venv = data.stdout:gsub("\r?\n", "")
if venv then if venv then
python_command_mem[root] = { Path:new(venv, "bin", "python").filename } python_command_mem[root] = { Path:new(venv, venv_bin, "python").filename }
return python_command_mem[root] return python_command_mem[root]
end end
end end
end end
if lib.files.exists("uv.lock") then
local success, exit_code, data = pcall(
lib.process.run,
{ "uv", "run", "python", "-c", "import sys; print(sys.executable)" },
{ stdout = true }
)
if success and exit_code == 0 then
python_command_mem[root] = { Path:new(data).filename }
return python_command_mem[root]
end
end
-- Fallback to system Python. -- Fallback to system Python.
python_command_mem[root] = { python_command_mem[root] = {
async.fn.exepath("python3") or async.fn.exepath("python") or "python", nio.fn.exepath("python3") or nio.fn.exepath("python") or "python",
} }
return python_command_mem[root] return python_command_mem[root]
end end
---@return string
function M.get_script_path()
local paths = vim.api.nvim_get_runtime_file("neotest.py", true)
for _, path in ipairs(paths) do
if vim.endswith(path, ("neotest-python%sneotest.py"):format(lib.files.sep)) then
return path
end
end
error("neotest.py not found")
end
---@param python_command string[]
---@param config neotest-python._AdapterConfig
---@param runner string
---@return string
local function scan_test_function_pattern(runner, config, python_command)
local test_function_pattern = "^test"
return test_function_pattern
end
---@param python_command string[]
---@param config neotest-python._AdapterConfig
---@param runner string
---@return string
M.treesitter_queries = function(runner, config, python_command)
local test_function_pattern = scan_test_function_pattern(runner, config, python_command)
return string.format(
[[
;; Match undecorated functions
((function_definition
name: (identifier) @test.name)
(#match? @test.name "%s"))
@test.definition
;; Match decorated function, including decorators in definition
(decorated_definition
((function_definition
name: (identifier) @test.name)
(#match? @test.name "%s")))
@test.definition
;; Match decorated classes, including decorators in definition
(decorated_definition
(class_definition
name: (identifier) @namespace.name))
@namespace.definition
;; Match undecorated classes: namespaces nest so #not-has-parent is used
;; to ensure each namespace is annotated only once
(
(class_definition
name: (identifier) @namespace.name)
@namespace.definition
(#not-has-parent? @namespace.definition decorated_definition)
)
]],
test_function_pattern,
test_function_pattern
)
end
M.get_root =
lib.files.match_root_pattern("pyproject.toml", "setup.cfg", "mypy.ini", "pytest.ini", "setup.py")
function M.create_dap_config(python_path, script_path, script_args, dap_args)
return vim.tbl_extend("keep", {
type = "python",
name = "Neotest Debugger",
request = "launch",
python = python_path,
program = script_path,
cwd = nio.fn.getcwd(),
args = script_args,
}, dap_args or {})
end
local stored_runners = {}
function M.get_runner(python_path)
local command_str = table.concat(python_path, " ")
if stored_runners[command_str] then
return stored_runners[command_str]
end
local vim_test_runner = vim.g["test#python#runner"]
if vim_test_runner == "pyunit" then
return "unittest"
end
if
vim_test_runner and lib.func_util.index({ "unittest", "pytest", "django" }, vim_test_runner)
then
return vim_test_runner
end
local runner = M.module_exists("pytest_", python_path) and "pytest"
or M.module_exists("django", python_path) and "django"
or "unittest"
stored_runners[command_str] = runner
return runner
end
return M return M

View File

@@ -1,263 +1,77 @@
local async = require("neotest.async")
local lib = require("neotest.lib")
local base = require("neotest-python.base") local base = require("neotest-python.base")
local pytest = require("neotest-python.pytest") local create_adapter = require("neotest-python.adapter")
local function get_script() ---@class neotest-python.AdapterConfig
local paths = vim.api.nvim_get_runtime_file("neotest.py", true) ---@field dap? table
for _, path in ipairs(paths) do ---@field pytest_discover_instances? boolean
if vim.endswith(path, ("neotest-python%sneotest.py"):format(lib.files.sep)) then ---@field is_test_file? fun(file_path: string):boolean
return path ---@field python? string|string[]|fun(root: string):string[]
end ---@field args? string[]|fun(runner: string, position: neotest.Position, strategy: string): string[]
end ---@field runner? string|fun(python_command: string[]): string
error("neotest.py not found")
end
local dap_args
local is_test_file = base.is_test_file
local pytest_discover_instances = false
local function get_strategy_config(strategy, python, program, args)
local config = {
dap = function()
return vim.tbl_extend("keep", {
type = "python",
name = "Neotest Debugger",
request = "launch",
python = python,
program = program,
cwd = async.fn.getcwd(),
args = args,
}, dap_args or {})
end,
}
if config[strategy] then
return config[strategy]()
end
end
local get_python = function(root)
if not root then
root = vim.loop.cwd()
end
return base.get_python_command(root)
end
local get_args = function()
return {}
end
local stored_runners = {}
local get_runner = function(python_command)
local command_str = table.concat(python_command, " ")
if stored_runners[command_str] then
return stored_runners[command_str]
end
local vim_test_runner = vim.g["test#python#runner"]
if vim_test_runner == "pyunit" then
return "unittest"
end
if
vim_test_runner and lib.func_util.index({ "unittest", "pytest", "django" }, vim_test_runner)
then
return vim_test_runner
end
local runner = base.module_exists("pytest", python_command) and "pytest"
or base.module_exists("django", python_command) and "django"
or "unittest"
stored_runners[command_str] = runner
return runner
end
---@type neotest.Adapter
local PythonNeotestAdapter = { name = "neotest-python" }
PythonNeotestAdapter.root =
lib.files.match_root_pattern("pyproject.toml", "setup.cfg", "mypy.ini", "pytest.ini", "setup.py")
function PythonNeotestAdapter.is_test_file(file_path)
return is_test_file(file_path)
end
function PythonNeotestAdapter.filter_dir(name)
return name ~= "venv"
end
---@async
---@return neotest.Tree | nil
function PythonNeotestAdapter.discover_positions(path)
local root = PythonNeotestAdapter.root(path) or vim.loop.cwd()
local python = get_python(root)
local runner = get_runner(python)
-- Parse the file while pytest is running
local query = [[
;; Match undecorated functions
((function_definition
name: (identifier) @test.name)
(#match? @test.name "^test"))
@test.definition
;; Match decorated function, including decorators in definition
(decorated_definition
((function_definition
name: (identifier) @test.name)
(#match? @test.name "^test")))
@test.definition
;; Match decorated classes, including decorators in definition
(decorated_definition
(class_definition
name: (identifier) @namespace.name))
@namespace.definition
;; Match undecorated classes: namespaces nest so #not-has-parent is used
;; to ensure each namespace is annotated only once
(
(class_definition
name: (identifier) @namespace.name)
@namespace.definition
(#not-has-parent? @namespace.definition decorated_definition)
)
]]
local positions = lib.treesitter.parse_positions(path, query, {
require_namespaces = runner == "unittest",
})
if runner == "pytest" and pytest_discover_instances then
pytest.augment_positions(python, get_script(), path, positions, root)
end
return positions
end
---@async
---@param args neotest.RunArgs
---@return neotest.RunSpec
function PythonNeotestAdapter.build_spec(args)
local position = args.tree:data()
local results_path = async.fn.tempname()
local stream_path = async.fn.tempname()
lib.files.write(stream_path, "")
local root = PythonNeotestAdapter.root(position.path)
local python = get_python(root)
local runner = get_runner(python)
local stream_data, stop_stream = lib.files.stream_lines(stream_path)
local script_args = vim.tbl_flatten({
"--results-file",
results_path,
"--stream-file",
stream_path,
"--runner",
runner,
})
if pytest_discover_instances then
table.insert(script_args, "--emit-parameterized-ids")
end
table.insert(script_args, "--")
vim.list_extend(script_args, get_args(runner, position, args.strategy))
if args.extra_args then
vim.list_extend(script_args, args.extra_args)
end
if position then
table.insert(script_args, position.id)
end
local python_script = get_script()
local command = vim.tbl_flatten({
python,
python_script,
script_args,
})
local strategy_config = get_strategy_config(args.strategy, python, python_script, script_args)
---@type neotest.RunSpec
return {
command = command,
context = {
results_path = results_path,
stop_stream = stop_stream,
},
stream = function()
return function()
local lines = stream_data()
local results = {}
for _, line in ipairs(lines) do
local result = vim.json.decode(line, { luanil = { object = true } })
results[result.id] = result.result
end
return results
end
end,
strategy = strategy_config,
}
end
---@async
---@param spec neotest.RunSpec
---@param result neotest.StrategyResult
---@return neotest.Result[]
function PythonNeotestAdapter.results(spec, result)
spec.context.stop_stream()
local success, data = pcall(lib.files.read, spec.context.results_path)
if not success then
data = "{}"
end
-- TODO: Find out if this JSON option is supported in future
local results = vim.json.decode(data, { luanil = { object = true } })
for _, pos_result in pairs(results) do
result.output_path = pos_result.output_path
end
return results
end
local is_callable = function(obj) local is_callable = function(obj)
return type(obj) == "function" or (type(obj) == "table" and obj.__call) return type(obj) == "function" or (type(obj) == "table" and obj.__call)
end end
---@param config neotest-python.AdapterConfig
local augment_config = function(config)
local get_python_command = base.get_python_command
if config.python then
get_python_command = function(root)
local python = config.python
if is_callable(config.python) then
python = config.python(root)
end
if type(python) == "string" then
return { python }
end
if type(python) == "table" then
return python
end
return base.get_python(root)
end
end
local get_args = function()
return {}
end
if is_callable(config.args) then
get_args = config.args
elseif config.args then
get_args = function()
return config.args
end
end
local get_runner = base.get_runner
if is_callable(config.runner) then
get_runner = config.runner
elseif config.runner then
get_runner = function()
return config.runner
end
end
---@type neotest-python._AdapterConfig
return {
pytest_discovery = config.pytest_discover_instances,
dap_args = config.dap,
get_runner = get_runner,
get_args = get_args,
is_test_file = config.is_test_file or base.is_test_file,
get_python_command = get_python_command,
}
end
local PythonNeotestAdapter = create_adapter(augment_config({}))
setmetatable(PythonNeotestAdapter, { setmetatable(PythonNeotestAdapter, {
__call = function(_, opts) __call = function(_, config)
is_test_file = opts.is_test_file or is_test_file return create_adapter(augment_config(config))
if opts.python then
get_python = function(root)
local python = opts.python
if is_callable(opts.python) then
python = opts.python(root)
end
if type(python) == "string" then
return { python }
end
if type(python) == "table" then
return python
end
return base.get_python(root)
end
end
if is_callable(opts.args) then
get_args = opts.args
elseif opts.args then
get_args = function()
return opts.args
end
end
if is_callable(opts.runner) then
get_runner = opts.runner
elseif opts.runner then
get_runner = function()
return opts.runner
end
end
if type(opts.dap) == "table" then
dap_args = opts.dap
end
if opts.pytest_discover_instances ~= nil then
pytest_discover_instances = opts.pytest_discover_instances
end
return PythonNeotestAdapter
end, end,
}) })

View File

@@ -3,6 +3,52 @@ local logger = require("neotest.logging")
local M = {} local M = {}
local unix = require("socket.unix")
local socket_path = ""
--- Run a command, connect to the UNIX socket it prints, send messages, return response
-- @param cmd string: Command to run (should output socket path)
-- @param messages table|string: One or more messages to send
-- @return string: Concatenated response from server
local function talk_unix(cmd, messages)
-- 1. Run the command and capture its output (socket path)
if not socket_path then
local handle = assert(io.popen(cmd, "r"))
socket_path = handle:read("*l")
handle:close()
assert(socket_path, "Command did not return a socket path")
end
-- 2. Connect to the unix socket
local client = assert(unix())
assert(client:connect(socket_path))
-- 3. Send message(s)
if type(messages) == "string" then
messages = { messages }
end
for _, msg in ipairs(messages) do
assert(client:send(msg .. "\n"))
end
-- 4. Read response until EOF or timeout
client:settimeout(1)
local chunks = {}
while true do
local data, err = client:receive("*l")
if not data then
if err ~= "timeout" and err ~= "closed" then
error("Socket receive error: " .. tostring(err))
end
break
end
table.insert(chunks, data)
end
client:close()
return table.concat(chunks, "\n")
end
---@async ---@async
---Add test instances for path in root to positions ---Add test instances for path in root to positions
---@param positions neotest.Tree ---@param positions neotest.Tree
@@ -53,20 +99,13 @@ end
---@param positions neotest.Tree ---@param positions neotest.Tree
---@param root string ---@param root string
local function discover_params(python, script, path, positions, root) local function discover_params(python, script, path, positions, root)
local cmd = vim.tbl_flatten({ python, script, "--pytest-collect", path }) local cmd = vim.tbl_flatten({ python, script, "--pytest-collect", "-s", path })
logger.debug("Running test instance discovery:", cmd) logger.debug("Running test instance discovery:", cmd)
local test_params = {} local test_params = {}
local res, data = lib.process.run(cmd, { stdout = true, stderr = true }) local data = talk_unix(cmd, path)
if res ~= 0 then
logger.warn("Pytest discovery failed")
if data.stderr then
logger.debug(data.stderr)
end
return {}
end
for line in vim.gsplit(data.stdout, "\n", true) do for line in vim.gsplit(data, "\n", true) do
local param_index = string.find(line, "[", nil, true) local param_index = string.find(line, "[", nil, true)
if param_index then if param_index then
local test_id = root .. lib.files.path.sep .. string.sub(line, 1, param_index - 1) local test_id = root .. lib.files.path.sep .. string.sub(line, 1, param_index - 1)

View File

@@ -58,6 +58,13 @@ def main(argv: List[str]):
collect(argv) collect(argv)
return return
if "--pytest-extract-test-name-template" in argv:
argv.remove("--pytest-extract-test-name-template")
from .pytest import extract_test_name_template
extract_test_name_template(argv)
return
args = parser.parse_args(argv) args = parser.parse_args(argv)
adapter = get_adapter(TestRunner(args.runner), args.emit_parameterized_ids) adapter = get_adapter(TestRunner(args.runner), args.emit_parameterized_ids)

View File

@@ -64,16 +64,18 @@ class DjangoNeotestAdapter(CaseUtilsMixin, NeotestAdapter):
class DjangoUnittestRunner(CaseUtilsMixin, DiscoverRunner): class DjangoUnittestRunner(CaseUtilsMixin, DiscoverRunner):
def __init__(self, **kwargs): def __init__(self, **kwargs):
django_setup() django_setup()
kwargs["interactive"] = False
DiscoverRunner.__init__(self, **kwargs) DiscoverRunner.__init__(self, **kwargs)
@classmethod @classmethod
def add_arguments(cls, parser): def add_arguments(cls, parser):
DiscoverRunner.add_arguments(parser) DiscoverRunner.add_arguments(parser)
parser.add_argument("--verbosity", nargs="?", default=2) parser.add_argument("--verbosity", nargs="?", default=2)
parser.add_argument( if "failfast" not in parser.parse_args([]):
"--failfast", parser.add_argument(
action="store_true", "--failfast",
) action="store_true",
)
# override # override
def get_resultclass(self): def get_resultclass(self):

View File

@@ -0,0 +1,119 @@
import atexit
from collections.abc import Iterable
import hashlib
import itertools
import logging
import os
import signal
import socket
import sys
import pytest
import importlib
import importlib.util
import argparse
import inspect
from pathlib import Path
from typing import Any, Callable, cast
SOCKET_ROOT_DIR = Path("/tmp/neotest-python")
def get_tests(paths: Iterable[str]) -> list[str]:
test_names: list[str] = []
for path in (Path(path) for path in paths):
spec = importlib.util.spec_from_file_location(path.name, path)
if spec is None or spec.loader is None:
raise ModuleNotFoundError
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
tests: Iterable[tuple[str, Callable[..., Any]]] = inspect.getmembers(
mod,
predicate=lambda member: inspect.isfunction(member)
and member.__name__.startswith("test_"),
)
for _, test in tests:
if not (marks := getattr(test, "pytestmark", None)):
test_names.append(f"{test.__module__}::{test.__name__}")
continue
for mark in cast(Iterable[pytest.Mark], marks):
if mark.name == "parametrize":
ids = mark.kwargs.get("ids", [])
for id_, params in itertools.zip_longest(ids, mark.args[1]):
id_ = getattr(params, "id", None) or id_ or repr(params)
test_names.append(f"{test.__module__}::{test.__name__}[{id_}]")
return test_names
def main(
paths: Iterable[str],
collect_only: bool = True,
quiet: bool = True,
verbosity: int = 0,
socket_mode: bool = False,
):
tests = get_tests(paths)
if tests:
print("\n".join(tests))
if not socket_mode:
return
if not SOCKET_ROOT_DIR.exists():
SOCKET_ROOT_DIR.mkdir()
python_socket_path = (
SOCKET_ROOT_DIR / hashlib.sha1(sys.executable.encode()).digest().hex()
)
if python_socket_path.exists():
print(python_socket_path)
return
child_pid = os.fork()
if child_pid != 0:
return
atexit.register(lambda: os.unlink(python_socket_path))
signal.signal(signal.SIGTERM, lambda x, _: os.unlink(python_socket_path))
signal.signal(signal.SIGINT, lambda x, _: os.unlink(python_socket_path))
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(python_socket_path.absolute().as_posix())
sock.listen()
print(python_socket_path)
while ar := sock.accept():
con, _ = ar
try:
paths = con.recv(1000).decode().splitlines()
tests = get_tests(paths)
con.send("\n".join(tests).encode())
except Exception:
logging.exception("Error")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--collect-only", dest="collect_only", action="store_true")
parser.add_argument("-q", dest="quiet", action="store_true")
parser.add_argument("--verbosity", dest="verbosity", default=0)
parser.add_argument("-s", dest="socket_mode", action="store_true")
parser.add_argument(
"paths",
nargs="*",
)
args = parser.parse_args()
paths: list[str] = args.paths
main(
paths=paths,
quiet=args.quiet,
collect_only=args.collect_only,
verbosity=args.verbosity,
socket_mode=args.socket_mode,
)

View File

@@ -1,10 +1,13 @@
from io import StringIO from io import StringIO
import json
from pathlib import Path from pathlib import Path
from typing import Callable, Dict, List, Optional, Union from typing import Callable, Dict, List, Optional, Union
import params_getter
import pytest import pytest
from _pytest._code.code import ExceptionRepr from _pytest._code.code import ExceptionRepr
from _pytest.terminal import TerminalReporter from _pytest.terminal import TerminalReporter
from _pytest.fixtures import FixtureLookupErrorRepr
from .base import NeotestAdapter, NeotestError, NeotestResult, NeotestResultStatus from .base import NeotestAdapter, NeotestError, NeotestResult, NeotestResultStatus
@@ -100,8 +103,9 @@ class NeotestResultCollector:
outcome = yield outcome = yield
report = outcome.get_result() report = outcome.get_result()
if report.when != "call" and not ( if not (
report.outcome == "skipped" and report.when == "setup" report.when == "call"
or (report.when == "setup" and report.outcome in ("skipped", "failed"))
): ):
return return
@@ -136,6 +140,13 @@ class NeotestResultCollector:
errors.append( errors.append(
{"message": msg_prefix + error_message, "line": error_line} {"message": msg_prefix + error_message, "line": error_line}
) )
elif isinstance(exc_repr, FixtureLookupErrorRepr):
errors.append(
{
"message": msg_prefix + exc_repr.errorstring,
"line": exc_repr.firstlineno,
}
)
else: else:
# TODO: Figure out how these are returned and how to represent # TODO: Figure out how these are returned and how to represent
raise Exception( raise Exception(
@@ -196,5 +207,17 @@ class NeotestDebugpyPlugin:
additional_info.is_tracing -= 1 additional_info.is_tracing -= 1
class TestNameTemplateExtractor:
@staticmethod
def pytest_collection_modifyitems(config):
config = {"python_functions": config.getini("python_functions")[0]}
print(f"\n{json.dumps(config)}\n")
def extract_test_name_template(args):
# pytest.main(args=["-k", "neotest_none"], plugins=[TestNameTemplateExtractor])
pass
def collect(args): def collect(args):
pytest.main(["--collect-only", "-q"] + args) params_getter.main([], socket_mode=True)

View File

@@ -1,6 +1,12 @@
[project]
name = "neotest-python"
version = "0.0.1"
dependencies = [
"pytest>=8.4.1",
]
[tools.black] [tools.black]
line-length=120 line-length = 120
[tool.isort] [tool.isort]
profile = "black" profile = "black"
@@ -8,8 +14,7 @@ multi_line_output = 3
[tool.pytest.ini_options] [tool.pytest.ini_options]
filterwarnings = [ filterwarnings = [
"error", "error",
"ignore::pytest.PytestCollectionWarning", "ignore::pytest.PytestCollectionWarning",
"ignore:::pynvim[.*]" "ignore:::pynvim[.*]",
] ]

75
uv.lock generated Normal file
View File

@@ -0,0 +1,75 @@
version = 1
revision = 2
requires-python = ">=3.13"
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "iniconfig"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f2/97/ebf4da567aa6827c909642694d71c9fcf53e5b504f2d96afea02718862f3/iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7", size = 4793, upload-time = "2025-03-19T20:09:59.721Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050, upload-time = "2025-03-19T20:10:01.071Z" },
]
[[package]]
name = "neotest-python"
version = "0.0.1"
source = { virtual = "." }
dependencies = [
{ name = "pytest" },
]
[package.metadata]
requires-dist = [{ name = "pytest", specifier = ">=8.4.1" }]
[[package]]
name = "packaging"
version = "25.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a1/d4/1fc4078c65507b51b96ca8f8c3ba19e6a61c8253c72794544580a7b6c24d/packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f", size = 165727, upload-time = "2025-04-19T11:48:59.673Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pytest"
version = "8.4.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/08/ba/45911d754e8eba3d5a841a5ce61a65a685ff1798421ac054f85aa8747dfb/pytest-8.4.1.tar.gz", hash = "sha256:7c67fd69174877359ed9371ec3af8a3d2b04741818c51e5e99cc1742251fa93c", size = 1517714, upload-time = "2025-06-18T05:48:06.109Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/29/16/c8a903f4c4dffe7a12843191437d7cd8e32751d5de349d45d3fe69544e87/pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7", size = 365474, upload-time = "2025-06-18T05:48:03.955Z" },
]