Compare commits

...

12 Commits

Author SHA1 Message Date
Itai Bohadana
72223525c4 fix(params_getter): try to bind even if socket exists 2025-09-03 17:58:45 +03:00
Itai Bohadana
ab80f41d3f fix(params_getter): try to bind even if socket exists 2025-09-02 13:38:23 +03:00
Itai Bohadana
9f6fbd6e04 feat(pytest): use socket instead of a shitton of processes 2025-08-28 15:51:42 +03:00
nicos68
ed9b4d794b feat(pytest): use python_functions config option (#100) 2025-06-23 22:28:18 +01:00
EdVanDance
34c9f6f3dc fix: python detection on windows (#92)
- virtualenvs on windows use `Scripts` instead of `bin`
  993ba1316a
- line endings of course
2025-05-28 16:18:54 +01:00
Maksym Bieńkowski
61e878f4b3 fix(django): make the test runner non-interactive by default (#95) 2025-05-28 16:17:40 +01:00
Illia Denysenko
ae21072712 feat: uv python detection (#98) (#98) 2025-05-28 16:16:29 +01:00
Neal Joslin
a2861ab3c9 fix(django): duplicate --failfast check (#84) 2024-10-02 20:26:48 +01:00
Liam Jarvis
72603dfdba fix: remove global print function call (#79)
This commit removes a call to an unknown print function call
2024-08-19 10:40:46 +01:00
Rónán Carrigan
e5bff6dcf3 fix(pytest): handle failed setup
Previously we would only handle skipped setups, we need to handle failed
setups in cases of fixture lookup errors.

See #37
2024-08-14 16:21:33 +01:00
Rónán Carrigan
0ab9ad3570 feat: multiple adapter instances
Allows calling the module multiple times with different configs for use
in different in projects.
2024-08-14 14:31:43 +01:00
johnybx
2e83d2bc00 fix: - use --verbosity=0 when collecting pytest parametrized tests (#60) 2024-01-15 08:03:46 +00:00
10 changed files with 760 additions and 301 deletions

View File

@@ -0,0 +1,143 @@
local nio = require("nio")
local lib = require("neotest.lib")
local pytest = require("neotest-python.pytest")
local base = require("neotest-python.base")
---@class neotest-python._AdapterConfig
---@field dap_args? table
---@field pytest_discovery? boolean
---@field is_test_file fun(file_path: string):boolean
---@field get_python_command fun(root: string):string[]
---@field get_args fun(runner: string, position: neotest.Position, strategy: string): string[]
---@field get_runner fun(python_command: string[]): string
---@param config neotest-python._AdapterConfig
---@return neotest.Adapter
return function(config)
---@param run_args neotest.RunArgs
---@param results_path string
---@param stream_path string
---@param runner string
---@return string[]
local function build_script_args(run_args, results_path, stream_path, runner)
local script_args = {
"--results-file",
results_path,
"--stream-file",
stream_path,
"--runner",
runner,
}
if config.pytest_discovery then
table.insert(script_args, "--emit-parameterized-ids")
end
local position = run_args.tree:data()
table.insert(script_args, "--")
vim.list_extend(script_args, config.get_args(runner, position, run_args.strategy))
if run_args.extra_args then
vim.list_extend(script_args, run_args.extra_args)
end
if position then
table.insert(script_args, position.id)
end
return script_args
end
---@type neotest.Adapter
return {
name = "neotest-python",
root = base.get_root,
filter_dir = function(name)
return name ~= "venv"
end,
is_test_file = config.is_test_file,
discover_positions = function(path)
local root = base.get_root(path) or vim.loop.cwd() or ""
local python_command = config.get_python_command(root)
local runner = config.get_runner(python_command)
local positions = lib.treesitter.parse_positions(
path,
base.treesitter_queries(runner, config, python_command),
{
require_namespaces = runner == "unittest",
}
)
if runner == "pytest" and config.pytest_discovery then
pytest.augment_positions(python_command, base.get_script_path(), path, positions, root)
end
return positions
end,
---@param args neotest.RunArgs
---@return neotest.RunSpec
build_spec = function(args)
local position = args.tree:data()
local root = base.get_root(position.path) or vim.loop.cwd() or ""
local python_command = config.get_python_command(root)
local runner = config.get_runner(python_command)
local results_path = nio.fn.tempname()
local stream_path = nio.fn.tempname()
lib.files.write(stream_path, "")
local stream_data, stop_stream = lib.files.stream_lines(stream_path)
local script_args = build_script_args(args, results_path, stream_path, runner)
local script_path = base.get_script_path()
local strategy_config
if args.strategy == "dap" then
strategy_config =
base.create_dap_config(python_command, script_path, script_args, config.dap_args)
end
---@type neotest.RunSpec
return {
command = vim.iter({ python_command, script_path, script_args }):flatten():totable(),
context = {
results_path = results_path,
stop_stream = stop_stream,
},
stream = function()
return function()
local lines = stream_data()
local results = {}
for _, line in ipairs(lines) do
local result = vim.json.decode(line, { luanil = { object = true } })
results[result.id] = result.result
end
return results
end
end,
strategy = strategy_config,
}
end,
---@param spec neotest.RunSpec
---@param result neotest.StrategyResult
---@return neotest.Result[]
results = function(spec, result)
spec.context.stop_stream()
local success, data = pcall(lib.files.read, spec.context.results_path)
if not success then
data = "{}"
end
local results = vim.json.decode(data, { luanil = { object = true } })
for _, pos_result in pairs(results) do
result.output_path = pos_result.output_path
end
return results
end,
}
end

View File

@@ -1,4 +1,4 @@
local async = require("neotest.async")
local nio = require("nio")
local lib = require("neotest.lib")
local Path = require("plenary.path")
@@ -14,30 +14,35 @@ function M.is_test_file(file_path)
end
M.module_exists = function(module, python_command)
return lib.process.run(vim.tbl_flatten({
return lib.process.run(vim
.iter({
python_command,
"-c",
"import " .. module,
})) == 0
})
:flatten()
:totable()) == 0
end
local python_command_mem = {}
local venv_bin = vim.loop.os_uname().sysname:match("Windows") and "Scripts" or "bin"
---@return string[]
function M.get_python_command(root)
root = root or vim.loop.cwd()
if python_command_mem[root] then
return python_command_mem[root]
end
-- Use activated virtualenv.
if vim.env.VIRTUAL_ENV then
python_command_mem[root] = { Path:new(vim.env.VIRTUAL_ENV, "bin", "python").filename }
python_command_mem[root] = { Path:new(vim.env.VIRTUAL_ENV, venv_bin, "python").filename }
return python_command_mem[root]
end
for _, pattern in ipairs({ "*", ".*" }) do
local match = async.fn.glob(Path:new(root or async.fn.getcwd(), pattern, "pyvenv.cfg").filename)
local match = nio.fn.glob(Path:new(root or nio.fn.getcwd(), pattern, "pyvenv.cfg").filename)
if match ~= "" then
python_command_mem[root] = { (Path:new(match):parent() / "bin" / "python").filename }
python_command_mem[root] = { (Path:new(match):parent() / venv_bin / "python").filename }
return python_command_mem[root]
end
end
@@ -45,7 +50,7 @@ function M.get_python_command(root)
if lib.files.exists("Pipfile") then
local success, exit_code, data = pcall(lib.process.run, { "pipenv", "--py" }, { stdout = true })
if success and exit_code == 0 then
local venv = data.stdout:gsub("\n", "")
local venv = data.stdout:gsub("\r?\n", "")
if venv then
python_command_mem[root] = { Path:new(venv).filename }
return python_command_mem[root]
@@ -60,19 +65,131 @@ function M.get_python_command(root)
{ stdout = true }
)
if success and exit_code == 0 then
local venv = data.stdout:gsub("\n", "")
local venv = data.stdout:gsub("\r?\n", "")
if venv then
python_command_mem[root] = { Path:new(venv, "bin", "python").filename }
python_command_mem[root] = { Path:new(venv, venv_bin, "python").filename }
return python_command_mem[root]
end
end
end
if lib.files.exists("uv.lock") then
local success, exit_code, data = pcall(
lib.process.run,
{ "uv", "run", "python", "-c", "import sys; print(sys.executable)" },
{ stdout = true }
)
if success and exit_code == 0 then
python_command_mem[root] = { Path:new(data).filename }
return python_command_mem[root]
end
end
-- Fallback to system Python.
python_command_mem[root] = {
async.fn.exepath("python3") or async.fn.exepath("python") or "python",
nio.fn.exepath("python3") or nio.fn.exepath("python") or "python",
}
return python_command_mem[root]
end
---@return string
function M.get_script_path()
local paths = vim.api.nvim_get_runtime_file("neotest.py", true)
for _, path in ipairs(paths) do
if vim.endswith(path, ("python%sneotest.py"):format(lib.files.sep)) then
return path
end
end
error("neotest.py not found")
end
---@param python_command string[]
---@param config neotest-python._AdapterConfig
---@param runner string
---@return string
local function scan_test_function_pattern(runner, config, python_command)
local test_function_pattern = "^test"
return test_function_pattern
end
---@param python_command string[]
---@param config neotest-python._AdapterConfig
---@param runner string
---@return string
M.treesitter_queries = function(runner, config, python_command)
local test_function_pattern = scan_test_function_pattern(runner, config, python_command)
return string.format(
[[
;; Match undecorated functions
((function_definition
name: (identifier) @test.name)
(#match? @test.name "%s"))
@test.definition
;; Match decorated function, including decorators in definition
(decorated_definition
((function_definition
name: (identifier) @test.name)
(#match? @test.name "%s")))
@test.definition
;; Match decorated classes, including decorators in definition
(decorated_definition
(class_definition
name: (identifier) @namespace.name))
@namespace.definition
;; Match undecorated classes: namespaces nest so #not-has-parent is used
;; to ensure each namespace is annotated only once
(
(class_definition
name: (identifier) @namespace.name)
@namespace.definition
(#not-has-parent? @namespace.definition decorated_definition)
)
]],
test_function_pattern,
test_function_pattern
)
end
M.get_root =
lib.files.match_root_pattern("pyproject.toml", "setup.cfg", "mypy.ini", "pytest.ini", "setup.py")
function M.create_dap_config(python_path, script_path, script_args, dap_args)
return vim.tbl_extend("keep", {
type = "python",
name = "Neotest Debugger",
request = "launch",
python = python_path,
program = script_path,
cwd = nio.fn.getcwd(),
args = script_args,
}, dap_args or {})
end
local stored_runners = {}
function M.get_runner(python_path)
local command_str = table.concat(python_path, " ")
if stored_runners[command_str] then
return stored_runners[command_str]
end
local vim_test_runner = vim.g["test#python#runner"]
if vim_test_runner == "pyunit" then
return "unittest"
end
if
vim_test_runner and lib.func_util.index({ "unittest", "pytest", "django" }, vim_test_runner)
then
return vim_test_runner
end
local runner = M.module_exists("pytest_", python_path) and "pytest"
or M.module_exists("django", python_path) and "django"
or "unittest"
stored_runners[command_str] = runner
return runner
end
return M

View File

@@ -1,230 +1,27 @@
local async = require("neotest.async")
local lib = require("neotest.lib")
local base = require("neotest-python.base")
local pytest = require("neotest-python.pytest")
local create_adapter = require("neotest-python.adapter")
local function get_script()
local paths = vim.api.nvim_get_runtime_file("neotest.py", true)
for _, path in ipairs(paths) do
if vim.endswith(path, ("neotest-python%sneotest.py"):format(lib.files.sep)) then
return path
end
end
error("neotest.py not found")
end
local dap_args
local is_test_file = base.is_test_file
local pytest_discover_instances = false
local function get_strategy_config(strategy, python, program, args)
local config = {
dap = function()
return vim.tbl_extend("keep", {
type = "python",
name = "Neotest Debugger",
request = "launch",
python = python,
program = program,
cwd = async.fn.getcwd(),
args = args,
}, dap_args or {})
end,
}
if config[strategy] then
return config[strategy]()
end
end
local get_python = function(root)
if not root then
root = vim.loop.cwd()
end
return base.get_python_command(root)
end
local get_args = function()
return {}
end
local stored_runners = {}
local get_runner = function(python_command)
local command_str = table.concat(python_command, " ")
if stored_runners[command_str] then
return stored_runners[command_str]
end
local vim_test_runner = vim.g["test#python#runner"]
if vim_test_runner == "pyunit" then
return "unittest"
end
if
vim_test_runner and lib.func_util.index({ "unittest", "pytest", "django" }, vim_test_runner)
then
return vim_test_runner
end
local runner = base.module_exists("pytest", python_command) and "pytest"
or base.module_exists("django", python_command) and "django"
or "unittest"
stored_runners[command_str] = runner
return runner
end
---@type neotest.Adapter
local PythonNeotestAdapter = { name = "neotest-python" }
PythonNeotestAdapter.root =
lib.files.match_root_pattern("pyproject.toml", "setup.cfg", "mypy.ini", "pytest.ini", "setup.py")
function PythonNeotestAdapter.is_test_file(file_path)
return is_test_file(file_path)
end
function PythonNeotestAdapter.filter_dir(name)
return name ~= "venv"
end
---@async
---@return neotest.Tree | nil
function PythonNeotestAdapter.discover_positions(path)
local root = PythonNeotestAdapter.root(path) or vim.loop.cwd()
local python = get_python(root)
local runner = get_runner(python)
-- Parse the file while pytest is running
local query = [[
;; Match undecorated functions
((function_definition
name: (identifier) @test.name)
(#match? @test.name "^test"))
@test.definition
;; Match decorated function, including decorators in definition
(decorated_definition
((function_definition
name: (identifier) @test.name)
(#match? @test.name "^test")))
@test.definition
;; Match decorated classes, including decorators in definition
(decorated_definition
(class_definition
name: (identifier) @namespace.name))
@namespace.definition
;; Match undecorated classes: namespaces nest so #not-has-parent is used
;; to ensure each namespace is annotated only once
(
(class_definition
name: (identifier) @namespace.name)
@namespace.definition
(#not-has-parent? @namespace.definition decorated_definition)
)
]]
local positions = lib.treesitter.parse_positions(path, query, {
require_namespaces = runner == "unittest",
})
if runner == "pytest" and pytest_discover_instances then
pytest.augment_positions(python, get_script(), path, positions, root)
end
return positions
end
---@async
---@param args neotest.RunArgs
---@return neotest.RunSpec
function PythonNeotestAdapter.build_spec(args)
local position = args.tree:data()
local results_path = async.fn.tempname()
local stream_path = async.fn.tempname()
lib.files.write(stream_path, "")
local root = PythonNeotestAdapter.root(position.path)
local python = get_python(root)
local runner = get_runner(python)
local stream_data, stop_stream = lib.files.stream_lines(stream_path)
local script_args = vim.tbl_flatten({
"--results-file",
results_path,
"--stream-file",
stream_path,
"--runner",
runner,
})
if pytest_discover_instances then
table.insert(script_args, "--emit-parameterized-ids")
end
table.insert(script_args, "--")
vim.list_extend(script_args, get_args(runner, position, args.strategy))
if args.extra_args then
vim.list_extend(script_args, args.extra_args)
end
if position then
table.insert(script_args, position.id)
end
local python_script = get_script()
local command = vim.tbl_flatten({
python,
python_script,
script_args,
})
local strategy_config = get_strategy_config(args.strategy, python, python_script, script_args)
---@type neotest.RunSpec
return {
command = command,
context = {
results_path = results_path,
stop_stream = stop_stream,
},
stream = function()
return function()
local lines = stream_data()
local results = {}
for _, line in ipairs(lines) do
local result = vim.json.decode(line, { luanil = { object = true } })
results[result.id] = result.result
end
return results
end
end,
strategy = strategy_config,
}
end
---@async
---@param spec neotest.RunSpec
---@param result neotest.StrategyResult
---@return neotest.Result[]
function PythonNeotestAdapter.results(spec, result)
spec.context.stop_stream()
local success, data = pcall(lib.files.read, spec.context.results_path)
if not success then
data = "{}"
end
-- TODO: Find out if this JSON option is supported in future
local results = vim.json.decode(data, { luanil = { object = true } })
for _, pos_result in pairs(results) do
result.output_path = pos_result.output_path
end
return results
end
---@class neotest-python.AdapterConfig
---@field dap? table
---@field pytest_discover_instances? boolean
---@field is_test_file? fun(file_path: string):boolean
---@field python? string|string[]|fun(root: string):string[]
---@field args? string[]|fun(runner: string, position: neotest.Position, strategy: string): string[]
---@field runner? string|fun(python_command: string[]): string
local is_callable = function(obj)
return type(obj) == "function" or (type(obj) == "table" and obj.__call)
end
setmetatable(PythonNeotestAdapter, {
__call = function(_, opts)
is_test_file = opts.is_test_file or is_test_file
if opts.python then
get_python = function(root)
local python = opts.python
---@param config neotest-python.AdapterConfig
local augment_config = function(config)
local get_python_command = base.get_python_command
if config.python then
get_python_command = function(root)
local python = config.python
if is_callable(opts.python) then
python = opts.python(root)
if is_callable(config.python) then
python = config.python(root)
end
if type(python) == "string" then
@@ -237,27 +34,44 @@ setmetatable(PythonNeotestAdapter, {
return base.get_python(root)
end
end
if is_callable(opts.args) then
get_args = opts.args
elseif opts.args then
local get_args = function()
return {}
end
if is_callable(config.args) then
get_args = config.args
elseif config.args then
get_args = function()
return opts.args
return config.args
end
end
if is_callable(opts.runner) then
get_runner = opts.runner
elseif opts.runner then
local get_runner = base.get_runner
if is_callable(config.runner) then
get_runner = config.runner
elseif config.runner then
get_runner = function()
return opts.runner
return config.runner
end
end
if type(opts.dap) == "table" then
dap_args = opts.dap
---@type neotest-python._AdapterConfig
return {
pytest_discovery = config.pytest_discover_instances,
dap_args = config.dap,
get_runner = get_runner,
get_args = get_args,
is_test_file = config.is_test_file or base.is_test_file,
get_python_command = get_python_command,
}
end
if opts.pytest_discover_instances ~= nil then
pytest_discover_instances = opts.pytest_discover_instances
end
return PythonNeotestAdapter
local PythonNeotestAdapter = create_adapter(augment_config({}))
setmetatable(PythonNeotestAdapter, {
__call = function(_, config)
return create_adapter(augment_config(config))
end,
})

View File

@@ -1,4 +1,5 @@
local lib = require("neotest.lib")
local nio = require("nio")
local logger = require("neotest.logging")
local M = {}
@@ -15,7 +16,7 @@ local function add_test_instances(positions, test_params)
for _, params_str in ipairs(pos_params) do
local new_data = vim.tbl_extend("force", position, {
id = string.format("%s[%s]", position.id, params_str),
name = string.format("%s[%s]", position.name, params_str),
name = string.format("%s", params_str),
})
new_data.range = nil
@@ -26,6 +27,74 @@ local function add_test_instances(positions, test_params)
end
end
local socket_path = ""
local socket_future = nio.control.future()
local socket_start = nio.control.future()
--- Run a command, connect to the UNIX socket it prints, send messages, return response
-- @param cmd string: Command to run (should output socket path)
-- @param messages table|string: One or more messages to send
-- @param callback uv.read_start.callback: One or more messages to send
-- @return string: Concatenated response from server
local function get_socket_path(cmd, messages, callback)
-- 1. Run the command and capture its output (socket path)
if socket_path == "" and not socket_start.is_set() then
local stdout = assert(vim.uv.new_pipe(true))
local stderr = assert(vim.uv.new_pipe(true))
local stdin = assert(vim.uv.new_pipe(true))
socket_start.set()
local handle
handle, _ = vim.uv.spawn(cmd[1], {
stdio = { stdin, stdout, stderr },
detached = true,
args = #cmd > 1 and vim.list_slice(cmd, 2, #cmd) or { cmd[1] },
}, function(code, signal)
vim.uv.close(stdout)
vim.uv.close(stderr)
vim.uv.close(stdin)
vim.uv.close(handle)
end)
vim.uv.read_start(stdout, function(err, s)
if s ~= nil then
socket_path = string.gsub(s, "\n$", "")
socket_future.set()
end
end)
vim.uv.read_start(stderr, function(err, s)
if err ~= nil then
vim.print(err, s)
end
end)
end
socket_future.wait()
-- 2. Connect to the unix socket
local client = assert(vim.uv.new_pipe(false))
client:connect(socket_path, function(err)
if err ~= nil then
vim.print("Error", err)
end
-- 3. Send message(s)
if type(messages) == "string" then
messages = { messages }
end
for _, msg in ipairs(messages) do
vim.uv.write(client, msg .. "\n", function(err_write)
client:read_start(function(err_read, data)
if err_read ~= nil then
vim.print(err_read, data)
else
callback(err_read, data)
end
client:close()
end)
end)
end
end)
end
---@async
---@param path string
---@return boolean
@@ -57,16 +126,16 @@ local function discover_params(python, script, path, positions, root)
logger.debug("Running test instance discovery:", cmd)
local test_params = {}
local res, data = lib.process.run(cmd, { stdout = true, stderr = true })
if res ~= 0 then
logger.warn("Pytest discovery failed")
if data.stderr then
logger.debug(data.stderr)
get_socket_path(cmd, path, function(err, data)
if err ~= nil then
vim.print(err, data)
return
end
return {}
if data == nil then
return
end
for line in vim.gsplit(data.stdout, "\n", true) do
for line in vim.gsplit(data, "\n", { trimempty = true, plain = true }) do
local param_index = string.find(line, "[", nil, true)
if param_index then
local test_id = root .. lib.files.path.sep .. string.sub(line, 1, param_index - 1)
@@ -81,7 +150,8 @@ local function discover_params(python, script, path, positions, root)
end
end
end
return test_params
return add_test_instances(positions, test_params)
end)
end
---@async
@@ -93,8 +163,7 @@ end
---@param root string
function M.augment_positions(python, script, path, positions, root)
if has_parametrize(path) then
local test_params = discover_params(python, script, path, positions, root)
add_test_instances(positions, test_params)
discover_params(python, script, path, positions, root)
end
end

View File

@@ -14,7 +14,7 @@ class TestRunner(str, Enum):
def get_adapter(runner: TestRunner, emit_parameterized_ids: bool) -> NeotestAdapter:
if runner == TestRunner.PYTEST:
from .pytest import PytestNeotestAdapter
from .pytest_ import PytestNeotestAdapter
return PytestNeotestAdapter(emit_parameterized_ids)
elif runner == TestRunner.UNITTEST:
@@ -53,11 +53,18 @@ parser.add_argument("args", nargs="*")
def main(argv: List[str]):
if "--pytest-collect" in argv:
argv.remove("--pytest-collect")
from .pytest import collect
from .pytest_ import collect
collect(argv)
return
if "--pytest-extract-test-name-template" in argv:
argv.remove("--pytest-extract-test-name-template")
from .pytest_ import extract_test_name_template
extract_test_name_template(argv)
return
args = parser.parse_args(argv)
adapter = get_adapter(TestRunner(args.runner), args.emit_parameterized_ids)

View File

@@ -64,12 +64,14 @@ class DjangoNeotestAdapter(CaseUtilsMixin, NeotestAdapter):
class DjangoUnittestRunner(CaseUtilsMixin, DiscoverRunner):
def __init__(self, **kwargs):
django_setup()
kwargs["interactive"] = False
DiscoverRunner.__init__(self, **kwargs)
@classmethod
def add_arguments(cls, parser):
DiscoverRunner.add_arguments(parser)
parser.add_argument("--verbosity", nargs="?", default=2)
if "failfast" not in parser.parse_args([]):
parser.add_argument(
"--failfast",
action="store_true",

View File

@@ -0,0 +1,200 @@
import asyncio
import atexit
from collections import deque
from collections.abc import Iterable
import hashlib
import os
import select
import signal
import socket
import sys
from _pytest.mark.structures import ParameterSet
from _pytest.python import IdMaker
import pytest
import importlib
import importlib.util
import argparse
import inspect
from pathlib import Path
from typing import Any, Callable, Generator, Self, cast
SOCKET_ROOT_DIR = Path("/tmp/neotest-python")
class LineReceiver:
def __init__(self, s: socket.socket) -> None:
self._s = s
self._s.setblocking(False)
self._tmp_buffer = ""
self._tmp_lines: deque[str] = deque()
def __iter__(self) -> Self:
return self
def __next__(self) -> str:
if self._tmp_lines:
return self._tmp_lines.popleft()
ready, _, _ = select.select([self._s], [], [], 0.1)
if ready:
msg = self._s.recv(256).decode()
else:
msg = ""
lines = msg.splitlines()
if not lines:
raise StopIteration
lines[0] = self._tmp_buffer + lines[0]
if not msg.endswith("\n"):
self._tmp_buffer = lines[-1]
else:
self._tmp_buffer = ""
self._tmp_lines.extend(lines)
return self._tmp_lines.popleft()
def get_tests(paths: Iterable[str]) -> Generator[str, None, None]:
root = Path(os.curdir).absolute()
if Path(os.curdir).absolute().as_posix() not in sys.path:
sys.path.insert(0, Path(os.curdir).absolute().as_posix())
for path in (Path(path) for path in paths):
spec = importlib.util.spec_from_file_location(path.name, path)
if spec is None or spec.loader is None:
raise ModuleNotFoundError(path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
tests: Iterable[tuple[str, Callable[..., Any]]] = inspect.getmembers(
mod,
predicate=lambda member: inspect.isfunction(member)
and member.__name__.startswith("test_"),
)
for _, test in tests:
if not (marks := getattr(test, "pytestmark", None)):
yield (f"{path.relative_to(root).as_posix()}::{test.__name__}")
continue
for mark in cast(Iterable[pytest.Mark], marks):
if mark.name == "parametrize":
ids = mark.kwargs.get("ids")
argnames = mark.args[0]
argvalues = mark.args[1]
argnames, parametersets = ParameterSet._for_parametrize( # pyright: ignore[reportUnknownMemberType, reportPrivateUsage]
argnames,
argvalues,
None,
None, # pyright: ignore[reportArgumentType]
None, # pyright: ignore[reportArgumentType]
)
if ids is None:
idfn = None
ids_ = None
elif callable(ids):
idfn = ids
ids_ = None
else:
idfn = None
ids_ = pytest.Metafunc._validate_ids( # pyright: ignore[reportPrivateUsage]
None, # pyright: ignore[reportArgumentType]
ids,
parametersets,
test.__name__, # pyright: ignore[reportArgumentType]
)
id_maker = IdMaker(
argnames,
parametersets,
idfn,
ids_,
None,
nodeid=None,
func_name=test.__name__,
)
yield from (
f"{path.relative_to(root).as_posix()}::{test.__name__}[{id_}]"
for id_ in id_maker.make_unique_parameterset_ids()
)
def _close_socket(path: Path) -> None:
if path.exists():
os.unlink(path)
exit(0)
async def serve_socket():
if not SOCKET_ROOT_DIR.exists():
SOCKET_ROOT_DIR.mkdir()
python_socket_path = (
SOCKET_ROOT_DIR / hashlib.sha1(sys.executable.encode()).digest().hex()
)
if python_socket_path.exists():
print(python_socket_path)
return
atexit.register(lambda: _close_socket(python_socket_path))
signal.signal(signal.SIGTERM, lambda x, _: _close_socket(python_socket_path))
signal.signal(signal.SIGINT, lambda x, _: _close_socket(python_socket_path))
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
path = await reader.readline()
tests = "\n".join(
[test for test in get_tests([cast(bytes, path).decode().strip()])] # pyright: ignore[reportUnnecessaryCast]
)
writer.write(f"{tests}\n".encode())
writer.close()
server = await asyncio.start_unix_server(
handle_client, python_socket_path.absolute().as_posix()
)
async with server:
print(python_socket_path)
sys.stdout.close()
await server.serve_forever()
def main(
paths: Iterable[str],
collect_only: bool = True,
quiet: bool = True,
verbosity: int = 0,
socket_mode: bool = False,
no_fork: bool = True,
):
tests = list(get_tests(paths))
if tests:
print("\n".join(tests))
if not socket_mode:
return
asyncio.run(serve_socket())
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--collect-only", dest="collect_only", action="store_true")
parser.add_argument("-q", dest="quiet", action="store_true")
parser.add_argument("--verbosity", dest="verbosity", default=0)
parser.add_argument("-s", dest="socket_mode", action="store_true")
parser.add_argument("-f", dest="no_fork", action="store_true")
parser.add_argument(
"paths",
nargs="*",
)
args = parser.parse_args()
paths: list[str] = args.paths
main(
paths=paths,
quiet=args.quiet,
collect_only=args.collect_only,
verbosity=args.verbosity,
socket_mode=args.socket_mode,
no_fork=args.no_fork,
)

View File

@@ -1,10 +1,13 @@
from io import StringIO
import json
from pathlib import Path
from typing import Callable, Dict, List, Optional, Union
from . import params_getter
import pytest
from _pytest._code.code import ExceptionRepr
from _pytest.terminal import TerminalReporter
from _pytest.fixtures import FixtureLookupErrorRepr
from .base import NeotestAdapter, NeotestError, NeotestResult, NeotestResultStatus
@@ -100,8 +103,9 @@ class NeotestResultCollector:
outcome = yield
report = outcome.get_result()
if report.when != "call" and not (
report.outcome == "skipped" and report.when == "setup"
if not (
report.when == "call"
or (report.when == "setup" and report.outcome in ("skipped", "failed"))
):
return
@@ -136,6 +140,13 @@ class NeotestResultCollector:
errors.append(
{"message": msg_prefix + error_message, "line": error_line}
)
elif isinstance(exc_repr, FixtureLookupErrorRepr):
errors.append(
{
"message": msg_prefix + exc_repr.errorstring,
"line": exc_repr.firstlineno,
}
)
else:
# TODO: Figure out how these are returned and how to represent
raise Exception(
@@ -196,5 +207,21 @@ class NeotestDebugpyPlugin:
additional_info.is_tracing -= 1
class TestNameTemplateExtractor:
@staticmethod
def pytest_collection_modifyitems(config):
config = {"python_functions": config.getini("python_functions")[0]}
print(f"\n{json.dumps(config)}\n")
def extract_test_name_template(args):
# pytest.main(args=["-k", "neotest_none"], plugins=[TestNameTemplateExtractor])
pass
def collect(args):
pytest.main(["--collect-only", "-q"] + args)
params_getter.main(
[],
socket_mode=True,
no_fork=True,
)

View File

@@ -1,3 +1,9 @@
[project]
name = "neotest-python"
version = "0.0.1"
dependencies = [
"pytest>=8.4.1",
]
[tools.black]
line-length = 120
@@ -10,6 +16,5 @@ multi_line_output = 3
filterwarnings = [
"error",
"ignore::pytest.PytestCollectionWarning",
"ignore:::pynvim[.*]"
"ignore:::pynvim[.*]",
]

75
uv.lock generated Normal file
View File

@@ -0,0 +1,75 @@
version = 1
revision = 2
requires-python = ">=3.13"
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "iniconfig"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f2/97/ebf4da567aa6827c909642694d71c9fcf53e5b504f2d96afea02718862f3/iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7", size = 4793, upload-time = "2025-03-19T20:09:59.721Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050, upload-time = "2025-03-19T20:10:01.071Z" },
]
[[package]]
name = "neotest-python"
version = "0.0.1"
source = { virtual = "." }
dependencies = [
{ name = "pytest" },
]
[package.metadata]
requires-dist = [{ name = "pytest", specifier = ">=8.4.1" }]
[[package]]
name = "packaging"
version = "25.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a1/d4/1fc4078c65507b51b96ca8f8c3ba19e6a61c8253c72794544580a7b6c24d/packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f", size = 165727, upload-time = "2025-04-19T11:48:59.673Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pytest"
version = "8.4.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/08/ba/45911d754e8eba3d5a841a5ce61a65a685ff1798421ac054f85aa8747dfb/pytest-8.4.1.tar.gz", hash = "sha256:7c67fd69174877359ed9371ec3af8a3d2b04741818c51e5e99cc1742251fa93c", size = 1517714, upload-time = "2025-06-18T05:48:06.109Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/29/16/c8a903f4c4dffe7a12843191437d7cd8e32751d5de349d45d3fe69544e87/pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7", size = 365474, upload-time = "2025-06-18T05:48:03.955Z" },
]