Compare commits

...

12 Commits

Author SHA1 Message Date
Itai Bohadana
72223525c4 fix(params_getter): try to bind even if socket exists 2025-09-03 17:58:45 +03:00
Itai Bohadana
ab80f41d3f fix(params_getter): try to bind even if socket exists 2025-09-02 13:38:23 +03:00
Itai Bohadana
9f6fbd6e04 feat(pytest): use socket instead of a shitton of processes 2025-08-28 15:51:42 +03:00
nicos68
ed9b4d794b feat(pytest): use python_functions config option (#100) 2025-06-23 22:28:18 +01:00
EdVanDance
34c9f6f3dc fix: python detection on windows (#92)
- virtualenvs on windows use `Scripts` instead of `bin`
  993ba1316a
- line endings of course
2025-05-28 16:18:54 +01:00
Maksym Bieńkowski
61e878f4b3 fix(django): make the test runner non-interactive by default (#95) 2025-05-28 16:17:40 +01:00
Illia Denysenko
ae21072712 feat: uv python detection (#98) (#98) 2025-05-28 16:16:29 +01:00
Neal Joslin
a2861ab3c9 fix(django): duplicate --failfast check (#84) 2024-10-02 20:26:48 +01:00
Liam Jarvis
72603dfdba fix: remove global print function call (#79)
This commit removes a call to an unknown print function call
2024-08-19 10:40:46 +01:00
Rónán Carrigan
e5bff6dcf3 fix(pytest): handle failed setup
Previously we would only handle skipped setups, we need to handle failed
setups in cases of fixture lookup errors.

See #37
2024-08-14 16:21:33 +01:00
Rónán Carrigan
0ab9ad3570 feat: multiple adapter instances
Allows calling the module multiple times with different configs for use
in different in projects.
2024-08-14 14:31:43 +01:00
johnybx
2e83d2bc00 fix: - use --verbosity=0 when collecting pytest parametrized tests (#60) 2024-01-15 08:03:46 +00:00
10 changed files with 760 additions and 301 deletions

View File

@@ -0,0 +1,143 @@
local nio = require("nio")
local lib = require("neotest.lib")
local pytest = require("neotest-python.pytest")
local base = require("neotest-python.base")
---@class neotest-python._AdapterConfig
---@field dap_args? table
---@field pytest_discovery? boolean
---@field is_test_file fun(file_path: string):boolean
---@field get_python_command fun(root: string):string[]
---@field get_args fun(runner: string, position: neotest.Position, strategy: string): string[]
---@field get_runner fun(python_command: string[]): string
---@param config neotest-python._AdapterConfig
---@return neotest.Adapter
return function(config)
---@param run_args neotest.RunArgs
---@param results_path string
---@param stream_path string
---@param runner string
---@return string[]
local function build_script_args(run_args, results_path, stream_path, runner)
local script_args = {
"--results-file",
results_path,
"--stream-file",
stream_path,
"--runner",
runner,
}
if config.pytest_discovery then
table.insert(script_args, "--emit-parameterized-ids")
end
local position = run_args.tree:data()
table.insert(script_args, "--")
vim.list_extend(script_args, config.get_args(runner, position, run_args.strategy))
if run_args.extra_args then
vim.list_extend(script_args, run_args.extra_args)
end
if position then
table.insert(script_args, position.id)
end
return script_args
end
---@type neotest.Adapter
return {
name = "neotest-python",
root = base.get_root,
filter_dir = function(name)
return name ~= "venv"
end,
is_test_file = config.is_test_file,
discover_positions = function(path)
local root = base.get_root(path) or vim.loop.cwd() or ""
local python_command = config.get_python_command(root)
local runner = config.get_runner(python_command)
local positions = lib.treesitter.parse_positions(
path,
base.treesitter_queries(runner, config, python_command),
{
require_namespaces = runner == "unittest",
}
)
if runner == "pytest" and config.pytest_discovery then
pytest.augment_positions(python_command, base.get_script_path(), path, positions, root)
end
return positions
end,
---@param args neotest.RunArgs
---@return neotest.RunSpec
build_spec = function(args)
local position = args.tree:data()
local root = base.get_root(position.path) or vim.loop.cwd() or ""
local python_command = config.get_python_command(root)
local runner = config.get_runner(python_command)
local results_path = nio.fn.tempname()
local stream_path = nio.fn.tempname()
lib.files.write(stream_path, "")
local stream_data, stop_stream = lib.files.stream_lines(stream_path)
local script_args = build_script_args(args, results_path, stream_path, runner)
local script_path = base.get_script_path()
local strategy_config
if args.strategy == "dap" then
strategy_config =
base.create_dap_config(python_command, script_path, script_args, config.dap_args)
end
---@type neotest.RunSpec
return {
command = vim.iter({ python_command, script_path, script_args }):flatten():totable(),
context = {
results_path = results_path,
stop_stream = stop_stream,
},
stream = function()
return function()
local lines = stream_data()
local results = {}
for _, line in ipairs(lines) do
local result = vim.json.decode(line, { luanil = { object = true } })
results[result.id] = result.result
end
return results
end
end,
strategy = strategy_config,
}
end,
---@param spec neotest.RunSpec
---@param result neotest.StrategyResult
---@return neotest.Result[]
results = function(spec, result)
spec.context.stop_stream()
local success, data = pcall(lib.files.read, spec.context.results_path)
if not success then
data = "{}"
end
local results = vim.json.decode(data, { luanil = { object = true } })
for _, pos_result in pairs(results) do
result.output_path = pos_result.output_path
end
return results
end,
}
end

View File

@@ -1,4 +1,4 @@
local async = require("neotest.async") local nio = require("nio")
local lib = require("neotest.lib") local lib = require("neotest.lib")
local Path = require("plenary.path") local Path = require("plenary.path")
@@ -14,30 +14,35 @@ function M.is_test_file(file_path)
end end
M.module_exists = function(module, python_command) M.module_exists = function(module, python_command)
return lib.process.run(vim.tbl_flatten({ return lib.process.run(vim
.iter({
python_command, python_command,
"-c", "-c",
"import " .. module, "import " .. module,
})) == 0 })
:flatten()
:totable()) == 0
end end
local python_command_mem = {} local python_command_mem = {}
local venv_bin = vim.loop.os_uname().sysname:match("Windows") and "Scripts" or "bin"
---@return string[] ---@return string[]
function M.get_python_command(root) function M.get_python_command(root)
root = root or vim.loop.cwd()
if python_command_mem[root] then if python_command_mem[root] then
return python_command_mem[root] return python_command_mem[root]
end end
-- Use activated virtualenv. -- Use activated virtualenv.
if vim.env.VIRTUAL_ENV then if vim.env.VIRTUAL_ENV then
python_command_mem[root] = { Path:new(vim.env.VIRTUAL_ENV, "bin", "python").filename } python_command_mem[root] = { Path:new(vim.env.VIRTUAL_ENV, venv_bin, "python").filename }
return python_command_mem[root] return python_command_mem[root]
end end
for _, pattern in ipairs({ "*", ".*" }) do for _, pattern in ipairs({ "*", ".*" }) do
local match = async.fn.glob(Path:new(root or async.fn.getcwd(), pattern, "pyvenv.cfg").filename) local match = nio.fn.glob(Path:new(root or nio.fn.getcwd(), pattern, "pyvenv.cfg").filename)
if match ~= "" then if match ~= "" then
python_command_mem[root] = { (Path:new(match):parent() / "bin" / "python").filename } python_command_mem[root] = { (Path:new(match):parent() / venv_bin / "python").filename }
return python_command_mem[root] return python_command_mem[root]
end end
end end
@@ -45,7 +50,7 @@ function M.get_python_command(root)
if lib.files.exists("Pipfile") then if lib.files.exists("Pipfile") then
local success, exit_code, data = pcall(lib.process.run, { "pipenv", "--py" }, { stdout = true }) local success, exit_code, data = pcall(lib.process.run, { "pipenv", "--py" }, { stdout = true })
if success and exit_code == 0 then if success and exit_code == 0 then
local venv = data.stdout:gsub("\n", "") local venv = data.stdout:gsub("\r?\n", "")
if venv then if venv then
python_command_mem[root] = { Path:new(venv).filename } python_command_mem[root] = { Path:new(venv).filename }
return python_command_mem[root] return python_command_mem[root]
@@ -60,19 +65,131 @@ function M.get_python_command(root)
{ stdout = true } { stdout = true }
) )
if success and exit_code == 0 then if success and exit_code == 0 then
local venv = data.stdout:gsub("\n", "") local venv = data.stdout:gsub("\r?\n", "")
if venv then if venv then
python_command_mem[root] = { Path:new(venv, "bin", "python").filename } python_command_mem[root] = { Path:new(venv, venv_bin, "python").filename }
return python_command_mem[root] return python_command_mem[root]
end end
end end
end end
if lib.files.exists("uv.lock") then
local success, exit_code, data = pcall(
lib.process.run,
{ "uv", "run", "python", "-c", "import sys; print(sys.executable)" },
{ stdout = true }
)
if success and exit_code == 0 then
python_command_mem[root] = { Path:new(data).filename }
return python_command_mem[root]
end
end
-- Fallback to system Python. -- Fallback to system Python.
python_command_mem[root] = { python_command_mem[root] = {
async.fn.exepath("python3") or async.fn.exepath("python") or "python", nio.fn.exepath("python3") or nio.fn.exepath("python") or "python",
} }
return python_command_mem[root] return python_command_mem[root]
end end
---@return string
function M.get_script_path()
local paths = vim.api.nvim_get_runtime_file("neotest.py", true)
for _, path in ipairs(paths) do
if vim.endswith(path, ("python%sneotest.py"):format(lib.files.sep)) then
return path
end
end
error("neotest.py not found")
end
---@param python_command string[]
---@param config neotest-python._AdapterConfig
---@param runner string
---@return string
local function scan_test_function_pattern(runner, config, python_command)
local test_function_pattern = "^test"
return test_function_pattern
end
---@param python_command string[]
---@param config neotest-python._AdapterConfig
---@param runner string
---@return string
M.treesitter_queries = function(runner, config, python_command)
local test_function_pattern = scan_test_function_pattern(runner, config, python_command)
return string.format(
[[
;; Match undecorated functions
((function_definition
name: (identifier) @test.name)
(#match? @test.name "%s"))
@test.definition
;; Match decorated function, including decorators in definition
(decorated_definition
((function_definition
name: (identifier) @test.name)
(#match? @test.name "%s")))
@test.definition
;; Match decorated classes, including decorators in definition
(decorated_definition
(class_definition
name: (identifier) @namespace.name))
@namespace.definition
;; Match undecorated classes: namespaces nest so #not-has-parent is used
;; to ensure each namespace is annotated only once
(
(class_definition
name: (identifier) @namespace.name)
@namespace.definition
(#not-has-parent? @namespace.definition decorated_definition)
)
]],
test_function_pattern,
test_function_pattern
)
end
M.get_root =
lib.files.match_root_pattern("pyproject.toml", "setup.cfg", "mypy.ini", "pytest.ini", "setup.py")
function M.create_dap_config(python_path, script_path, script_args, dap_args)
return vim.tbl_extend("keep", {
type = "python",
name = "Neotest Debugger",
request = "launch",
python = python_path,
program = script_path,
cwd = nio.fn.getcwd(),
args = script_args,
}, dap_args or {})
end
local stored_runners = {}
function M.get_runner(python_path)
local command_str = table.concat(python_path, " ")
if stored_runners[command_str] then
return stored_runners[command_str]
end
local vim_test_runner = vim.g["test#python#runner"]
if vim_test_runner == "pyunit" then
return "unittest"
end
if
vim_test_runner and lib.func_util.index({ "unittest", "pytest", "django" }, vim_test_runner)
then
return vim_test_runner
end
local runner = M.module_exists("pytest_", python_path) and "pytest"
or M.module_exists("django", python_path) and "django"
or "unittest"
stored_runners[command_str] = runner
return runner
end
return M return M

View File

@@ -1,230 +1,27 @@
local async = require("neotest.async")
local lib = require("neotest.lib")
local base = require("neotest-python.base") local base = require("neotest-python.base")
local pytest = require("neotest-python.pytest") local create_adapter = require("neotest-python.adapter")
local function get_script() ---@class neotest-python.AdapterConfig
local paths = vim.api.nvim_get_runtime_file("neotest.py", true) ---@field dap? table
for _, path in ipairs(paths) do ---@field pytest_discover_instances? boolean
if vim.endswith(path, ("neotest-python%sneotest.py"):format(lib.files.sep)) then ---@field is_test_file? fun(file_path: string):boolean
return path ---@field python? string|string[]|fun(root: string):string[]
end ---@field args? string[]|fun(runner: string, position: neotest.Position, strategy: string): string[]
end ---@field runner? string|fun(python_command: string[]): string
error("neotest.py not found")
end
local dap_args
local is_test_file = base.is_test_file
local pytest_discover_instances = false
local function get_strategy_config(strategy, python, program, args)
local config = {
dap = function()
return vim.tbl_extend("keep", {
type = "python",
name = "Neotest Debugger",
request = "launch",
python = python,
program = program,
cwd = async.fn.getcwd(),
args = args,
}, dap_args or {})
end,
}
if config[strategy] then
return config[strategy]()
end
end
local get_python = function(root)
if not root then
root = vim.loop.cwd()
end
return base.get_python_command(root)
end
local get_args = function()
return {}
end
local stored_runners = {}
local get_runner = function(python_command)
local command_str = table.concat(python_command, " ")
if stored_runners[command_str] then
return stored_runners[command_str]
end
local vim_test_runner = vim.g["test#python#runner"]
if vim_test_runner == "pyunit" then
return "unittest"
end
if
vim_test_runner and lib.func_util.index({ "unittest", "pytest", "django" }, vim_test_runner)
then
return vim_test_runner
end
local runner = base.module_exists("pytest", python_command) and "pytest"
or base.module_exists("django", python_command) and "django"
or "unittest"
stored_runners[command_str] = runner
return runner
end
---@type neotest.Adapter
local PythonNeotestAdapter = { name = "neotest-python" }
PythonNeotestAdapter.root =
lib.files.match_root_pattern("pyproject.toml", "setup.cfg", "mypy.ini", "pytest.ini", "setup.py")
function PythonNeotestAdapter.is_test_file(file_path)
return is_test_file(file_path)
end
function PythonNeotestAdapter.filter_dir(name)
return name ~= "venv"
end
---@async
---@return neotest.Tree | nil
function PythonNeotestAdapter.discover_positions(path)
local root = PythonNeotestAdapter.root(path) or vim.loop.cwd()
local python = get_python(root)
local runner = get_runner(python)
-- Parse the file while pytest is running
local query = [[
;; Match undecorated functions
((function_definition
name: (identifier) @test.name)
(#match? @test.name "^test"))
@test.definition
;; Match decorated function, including decorators in definition
(decorated_definition
((function_definition
name: (identifier) @test.name)
(#match? @test.name "^test")))
@test.definition
;; Match decorated classes, including decorators in definition
(decorated_definition
(class_definition
name: (identifier) @namespace.name))
@namespace.definition
;; Match undecorated classes: namespaces nest so #not-has-parent is used
;; to ensure each namespace is annotated only once
(
(class_definition
name: (identifier) @namespace.name)
@namespace.definition
(#not-has-parent? @namespace.definition decorated_definition)
)
]]
local positions = lib.treesitter.parse_positions(path, query, {
require_namespaces = runner == "unittest",
})
if runner == "pytest" and pytest_discover_instances then
pytest.augment_positions(python, get_script(), path, positions, root)
end
return positions
end
---@async
---@param args neotest.RunArgs
---@return neotest.RunSpec
function PythonNeotestAdapter.build_spec(args)
local position = args.tree:data()
local results_path = async.fn.tempname()
local stream_path = async.fn.tempname()
lib.files.write(stream_path, "")
local root = PythonNeotestAdapter.root(position.path)
local python = get_python(root)
local runner = get_runner(python)
local stream_data, stop_stream = lib.files.stream_lines(stream_path)
local script_args = vim.tbl_flatten({
"--results-file",
results_path,
"--stream-file",
stream_path,
"--runner",
runner,
})
if pytest_discover_instances then
table.insert(script_args, "--emit-parameterized-ids")
end
table.insert(script_args, "--")
vim.list_extend(script_args, get_args(runner, position, args.strategy))
if args.extra_args then
vim.list_extend(script_args, args.extra_args)
end
if position then
table.insert(script_args, position.id)
end
local python_script = get_script()
local command = vim.tbl_flatten({
python,
python_script,
script_args,
})
local strategy_config = get_strategy_config(args.strategy, python, python_script, script_args)
---@type neotest.RunSpec
return {
command = command,
context = {
results_path = results_path,
stop_stream = stop_stream,
},
stream = function()
return function()
local lines = stream_data()
local results = {}
for _, line in ipairs(lines) do
local result = vim.json.decode(line, { luanil = { object = true } })
results[result.id] = result.result
end
return results
end
end,
strategy = strategy_config,
}
end
---@async
---@param spec neotest.RunSpec
---@param result neotest.StrategyResult
---@return neotest.Result[]
function PythonNeotestAdapter.results(spec, result)
spec.context.stop_stream()
local success, data = pcall(lib.files.read, spec.context.results_path)
if not success then
data = "{}"
end
-- TODO: Find out if this JSON option is supported in future
local results = vim.json.decode(data, { luanil = { object = true } })
for _, pos_result in pairs(results) do
result.output_path = pos_result.output_path
end
return results
end
local is_callable = function(obj) local is_callable = function(obj)
return type(obj) == "function" or (type(obj) == "table" and obj.__call) return type(obj) == "function" or (type(obj) == "table" and obj.__call)
end end
setmetatable(PythonNeotestAdapter, { ---@param config neotest-python.AdapterConfig
__call = function(_, opts) local augment_config = function(config)
is_test_file = opts.is_test_file or is_test_file local get_python_command = base.get_python_command
if opts.python then if config.python then
get_python = function(root) get_python_command = function(root)
local python = opts.python local python = config.python
if is_callable(opts.python) then if is_callable(config.python) then
python = opts.python(root) python = config.python(root)
end end
if type(python) == "string" then if type(python) == "string" then
@@ -237,27 +34,44 @@ setmetatable(PythonNeotestAdapter, {
return base.get_python(root) return base.get_python(root)
end end
end end
if is_callable(opts.args) then
get_args = opts.args local get_args = function()
elseif opts.args then return {}
end
if is_callable(config.args) then
get_args = config.args
elseif config.args then
get_args = function() get_args = function()
return opts.args return config.args
end end
end end
if is_callable(opts.runner) then
get_runner = opts.runner local get_runner = base.get_runner
elseif opts.runner then if is_callable(config.runner) then
get_runner = config.runner
elseif config.runner then
get_runner = function() get_runner = function()
return opts.runner return config.runner
end end
end end
if type(opts.dap) == "table" then
dap_args = opts.dap ---@type neotest-python._AdapterConfig
end return {
if opts.pytest_discover_instances ~= nil then pytest_discovery = config.pytest_discover_instances,
pytest_discover_instances = opts.pytest_discover_instances dap_args = config.dap,
end get_runner = get_runner,
return PythonNeotestAdapter get_args = get_args,
is_test_file = config.is_test_file or base.is_test_file,
get_python_command = get_python_command,
}
end
local PythonNeotestAdapter = create_adapter(augment_config({}))
setmetatable(PythonNeotestAdapter, {
__call = function(_, config)
return create_adapter(augment_config(config))
end, end,
}) })

View File

@@ -1,4 +1,5 @@
local lib = require("neotest.lib") local lib = require("neotest.lib")
local nio = require("nio")
local logger = require("neotest.logging") local logger = require("neotest.logging")
local M = {} local M = {}
@@ -15,7 +16,7 @@ local function add_test_instances(positions, test_params)
for _, params_str in ipairs(pos_params) do for _, params_str in ipairs(pos_params) do
local new_data = vim.tbl_extend("force", position, { local new_data = vim.tbl_extend("force", position, {
id = string.format("%s[%s]", position.id, params_str), id = string.format("%s[%s]", position.id, params_str),
name = string.format("%s[%s]", position.name, params_str), name = string.format("%s", params_str),
}) })
new_data.range = nil new_data.range = nil
@@ -26,6 +27,74 @@ local function add_test_instances(positions, test_params)
end end
end end
local socket_path = ""
local socket_future = nio.control.future()
local socket_start = nio.control.future()
--- Run a command, connect to the UNIX socket it prints, send messages, return response
-- @param cmd string: Command to run (should output socket path)
-- @param messages table|string: One or more messages to send
-- @param callback uv.read_start.callback: One or more messages to send
-- @return string: Concatenated response from server
local function get_socket_path(cmd, messages, callback)
-- 1. Run the command and capture its output (socket path)
if socket_path == "" and not socket_start.is_set() then
local stdout = assert(vim.uv.new_pipe(true))
local stderr = assert(vim.uv.new_pipe(true))
local stdin = assert(vim.uv.new_pipe(true))
socket_start.set()
local handle
handle, _ = vim.uv.spawn(cmd[1], {
stdio = { stdin, stdout, stderr },
detached = true,
args = #cmd > 1 and vim.list_slice(cmd, 2, #cmd) or { cmd[1] },
}, function(code, signal)
vim.uv.close(stdout)
vim.uv.close(stderr)
vim.uv.close(stdin)
vim.uv.close(handle)
end)
vim.uv.read_start(stdout, function(err, s)
if s ~= nil then
socket_path = string.gsub(s, "\n$", "")
socket_future.set()
end
end)
vim.uv.read_start(stderr, function(err, s)
if err ~= nil then
vim.print(err, s)
end
end)
end
socket_future.wait()
-- 2. Connect to the unix socket
local client = assert(vim.uv.new_pipe(false))
client:connect(socket_path, function(err)
if err ~= nil then
vim.print("Error", err)
end
-- 3. Send message(s)
if type(messages) == "string" then
messages = { messages }
end
for _, msg in ipairs(messages) do
vim.uv.write(client, msg .. "\n", function(err_write)
client:read_start(function(err_read, data)
if err_read ~= nil then
vim.print(err_read, data)
else
callback(err_read, data)
end
client:close()
end)
end)
end
end)
end
---@async ---@async
---@param path string ---@param path string
---@return boolean ---@return boolean
@@ -57,16 +126,16 @@ local function discover_params(python, script, path, positions, root)
logger.debug("Running test instance discovery:", cmd) logger.debug("Running test instance discovery:", cmd)
local test_params = {} local test_params = {}
local res, data = lib.process.run(cmd, { stdout = true, stderr = true }) get_socket_path(cmd, path, function(err, data)
if res ~= 0 then if err ~= nil then
logger.warn("Pytest discovery failed") vim.print(err, data)
if data.stderr then return
logger.debug(data.stderr)
end end
return {} if data == nil then
return
end end
for line in vim.gsplit(data.stdout, "\n", true) do for line in vim.gsplit(data, "\n", { trimempty = true, plain = true }) do
local param_index = string.find(line, "[", nil, true) local param_index = string.find(line, "[", nil, true)
if param_index then if param_index then
local test_id = root .. lib.files.path.sep .. string.sub(line, 1, param_index - 1) local test_id = root .. lib.files.path.sep .. string.sub(line, 1, param_index - 1)
@@ -81,7 +150,8 @@ local function discover_params(python, script, path, positions, root)
end end
end end
end end
return test_params return add_test_instances(positions, test_params)
end)
end end
---@async ---@async
@@ -93,8 +163,7 @@ end
---@param root string ---@param root string
function M.augment_positions(python, script, path, positions, root) function M.augment_positions(python, script, path, positions, root)
if has_parametrize(path) then if has_parametrize(path) then
local test_params = discover_params(python, script, path, positions, root) discover_params(python, script, path, positions, root)
add_test_instances(positions, test_params)
end end
end end

View File

@@ -14,7 +14,7 @@ class TestRunner(str, Enum):
def get_adapter(runner: TestRunner, emit_parameterized_ids: bool) -> NeotestAdapter: def get_adapter(runner: TestRunner, emit_parameterized_ids: bool) -> NeotestAdapter:
if runner == TestRunner.PYTEST: if runner == TestRunner.PYTEST:
from .pytest import PytestNeotestAdapter from .pytest_ import PytestNeotestAdapter
return PytestNeotestAdapter(emit_parameterized_ids) return PytestNeotestAdapter(emit_parameterized_ids)
elif runner == TestRunner.UNITTEST: elif runner == TestRunner.UNITTEST:
@@ -53,11 +53,18 @@ parser.add_argument("args", nargs="*")
def main(argv: List[str]): def main(argv: List[str]):
if "--pytest-collect" in argv: if "--pytest-collect" in argv:
argv.remove("--pytest-collect") argv.remove("--pytest-collect")
from .pytest import collect from .pytest_ import collect
collect(argv) collect(argv)
return return
if "--pytest-extract-test-name-template" in argv:
argv.remove("--pytest-extract-test-name-template")
from .pytest_ import extract_test_name_template
extract_test_name_template(argv)
return
args = parser.parse_args(argv) args = parser.parse_args(argv)
adapter = get_adapter(TestRunner(args.runner), args.emit_parameterized_ids) adapter = get_adapter(TestRunner(args.runner), args.emit_parameterized_ids)

View File

@@ -64,12 +64,14 @@ class DjangoNeotestAdapter(CaseUtilsMixin, NeotestAdapter):
class DjangoUnittestRunner(CaseUtilsMixin, DiscoverRunner): class DjangoUnittestRunner(CaseUtilsMixin, DiscoverRunner):
def __init__(self, **kwargs): def __init__(self, **kwargs):
django_setup() django_setup()
kwargs["interactive"] = False
DiscoverRunner.__init__(self, **kwargs) DiscoverRunner.__init__(self, **kwargs)
@classmethod @classmethod
def add_arguments(cls, parser): def add_arguments(cls, parser):
DiscoverRunner.add_arguments(parser) DiscoverRunner.add_arguments(parser)
parser.add_argument("--verbosity", nargs="?", default=2) parser.add_argument("--verbosity", nargs="?", default=2)
if "failfast" not in parser.parse_args([]):
parser.add_argument( parser.add_argument(
"--failfast", "--failfast",
action="store_true", action="store_true",

View File

@@ -0,0 +1,200 @@
import asyncio
import atexit
from collections import deque
from collections.abc import Iterable
import hashlib
import os
import select
import signal
import socket
import sys
from _pytest.mark.structures import ParameterSet
from _pytest.python import IdMaker
import pytest
import importlib
import importlib.util
import argparse
import inspect
from pathlib import Path
from typing import Any, Callable, Generator, Self, cast
SOCKET_ROOT_DIR = Path("/tmp/neotest-python")
class LineReceiver:
def __init__(self, s: socket.socket) -> None:
self._s = s
self._s.setblocking(False)
self._tmp_buffer = ""
self._tmp_lines: deque[str] = deque()
def __iter__(self) -> Self:
return self
def __next__(self) -> str:
if self._tmp_lines:
return self._tmp_lines.popleft()
ready, _, _ = select.select([self._s], [], [], 0.1)
if ready:
msg = self._s.recv(256).decode()
else:
msg = ""
lines = msg.splitlines()
if not lines:
raise StopIteration
lines[0] = self._tmp_buffer + lines[0]
if not msg.endswith("\n"):
self._tmp_buffer = lines[-1]
else:
self._tmp_buffer = ""
self._tmp_lines.extend(lines)
return self._tmp_lines.popleft()
def get_tests(paths: Iterable[str]) -> Generator[str, None, None]:
root = Path(os.curdir).absolute()
if Path(os.curdir).absolute().as_posix() not in sys.path:
sys.path.insert(0, Path(os.curdir).absolute().as_posix())
for path in (Path(path) for path in paths):
spec = importlib.util.spec_from_file_location(path.name, path)
if spec is None or spec.loader is None:
raise ModuleNotFoundError(path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
tests: Iterable[tuple[str, Callable[..., Any]]] = inspect.getmembers(
mod,
predicate=lambda member: inspect.isfunction(member)
and member.__name__.startswith("test_"),
)
for _, test in tests:
if not (marks := getattr(test, "pytestmark", None)):
yield (f"{path.relative_to(root).as_posix()}::{test.__name__}")
continue
for mark in cast(Iterable[pytest.Mark], marks):
if mark.name == "parametrize":
ids = mark.kwargs.get("ids")
argnames = mark.args[0]
argvalues = mark.args[1]
argnames, parametersets = ParameterSet._for_parametrize( # pyright: ignore[reportUnknownMemberType, reportPrivateUsage]
argnames,
argvalues,
None,
None, # pyright: ignore[reportArgumentType]
None, # pyright: ignore[reportArgumentType]
)
if ids is None:
idfn = None
ids_ = None
elif callable(ids):
idfn = ids
ids_ = None
else:
idfn = None
ids_ = pytest.Metafunc._validate_ids( # pyright: ignore[reportPrivateUsage]
None, # pyright: ignore[reportArgumentType]
ids,
parametersets,
test.__name__, # pyright: ignore[reportArgumentType]
)
id_maker = IdMaker(
argnames,
parametersets,
idfn,
ids_,
None,
nodeid=None,
func_name=test.__name__,
)
yield from (
f"{path.relative_to(root).as_posix()}::{test.__name__}[{id_}]"
for id_ in id_maker.make_unique_parameterset_ids()
)
def _close_socket(path: Path) -> None:
if path.exists():
os.unlink(path)
exit(0)
async def serve_socket():
if not SOCKET_ROOT_DIR.exists():
SOCKET_ROOT_DIR.mkdir()
python_socket_path = (
SOCKET_ROOT_DIR / hashlib.sha1(sys.executable.encode()).digest().hex()
)
if python_socket_path.exists():
print(python_socket_path)
return
atexit.register(lambda: _close_socket(python_socket_path))
signal.signal(signal.SIGTERM, lambda x, _: _close_socket(python_socket_path))
signal.signal(signal.SIGINT, lambda x, _: _close_socket(python_socket_path))
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
path = await reader.readline()
tests = "\n".join(
[test for test in get_tests([cast(bytes, path).decode().strip()])] # pyright: ignore[reportUnnecessaryCast]
)
writer.write(f"{tests}\n".encode())
writer.close()
server = await asyncio.start_unix_server(
handle_client, python_socket_path.absolute().as_posix()
)
async with server:
print(python_socket_path)
sys.stdout.close()
await server.serve_forever()
def main(
paths: Iterable[str],
collect_only: bool = True,
quiet: bool = True,
verbosity: int = 0,
socket_mode: bool = False,
no_fork: bool = True,
):
tests = list(get_tests(paths))
if tests:
print("\n".join(tests))
if not socket_mode:
return
asyncio.run(serve_socket())
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--collect-only", dest="collect_only", action="store_true")
parser.add_argument("-q", dest="quiet", action="store_true")
parser.add_argument("--verbosity", dest="verbosity", default=0)
parser.add_argument("-s", dest="socket_mode", action="store_true")
parser.add_argument("-f", dest="no_fork", action="store_true")
parser.add_argument(
"paths",
nargs="*",
)
args = parser.parse_args()
paths: list[str] = args.paths
main(
paths=paths,
quiet=args.quiet,
collect_only=args.collect_only,
verbosity=args.verbosity,
socket_mode=args.socket_mode,
no_fork=args.no_fork,
)

View File

@@ -1,10 +1,13 @@
from io import StringIO from io import StringIO
import json
from pathlib import Path from pathlib import Path
from typing import Callable, Dict, List, Optional, Union from typing import Callable, Dict, List, Optional, Union
from . import params_getter
import pytest import pytest
from _pytest._code.code import ExceptionRepr from _pytest._code.code import ExceptionRepr
from _pytest.terminal import TerminalReporter from _pytest.terminal import TerminalReporter
from _pytest.fixtures import FixtureLookupErrorRepr
from .base import NeotestAdapter, NeotestError, NeotestResult, NeotestResultStatus from .base import NeotestAdapter, NeotestError, NeotestResult, NeotestResultStatus
@@ -100,8 +103,9 @@ class NeotestResultCollector:
outcome = yield outcome = yield
report = outcome.get_result() report = outcome.get_result()
if report.when != "call" and not ( if not (
report.outcome == "skipped" and report.when == "setup" report.when == "call"
or (report.when == "setup" and report.outcome in ("skipped", "failed"))
): ):
return return
@@ -136,6 +140,13 @@ class NeotestResultCollector:
errors.append( errors.append(
{"message": msg_prefix + error_message, "line": error_line} {"message": msg_prefix + error_message, "line": error_line}
) )
elif isinstance(exc_repr, FixtureLookupErrorRepr):
errors.append(
{
"message": msg_prefix + exc_repr.errorstring,
"line": exc_repr.firstlineno,
}
)
else: else:
# TODO: Figure out how these are returned and how to represent # TODO: Figure out how these are returned and how to represent
raise Exception( raise Exception(
@@ -196,5 +207,21 @@ class NeotestDebugpyPlugin:
additional_info.is_tracing -= 1 additional_info.is_tracing -= 1
class TestNameTemplateExtractor:
@staticmethod
def pytest_collection_modifyitems(config):
config = {"python_functions": config.getini("python_functions")[0]}
print(f"\n{json.dumps(config)}\n")
def extract_test_name_template(args):
# pytest.main(args=["-k", "neotest_none"], plugins=[TestNameTemplateExtractor])
pass
def collect(args): def collect(args):
pytest.main(["--collect-only", "-q"] + args) params_getter.main(
[],
socket_mode=True,
no_fork=True,
)

View File

@@ -1,6 +1,12 @@
[project]
name = "neotest-python"
version = "0.0.1"
dependencies = [
"pytest>=8.4.1",
]
[tools.black] [tools.black]
line-length=120 line-length = 120
[tool.isort] [tool.isort]
profile = "black" profile = "black"
@@ -10,6 +16,5 @@ multi_line_output = 3
filterwarnings = [ filterwarnings = [
"error", "error",
"ignore::pytest.PytestCollectionWarning", "ignore::pytest.PytestCollectionWarning",
"ignore:::pynvim[.*]" "ignore:::pynvim[.*]",
] ]

75
uv.lock generated Normal file
View File

@@ -0,0 +1,75 @@
version = 1
revision = 2
requires-python = ">=3.13"
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "iniconfig"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f2/97/ebf4da567aa6827c909642694d71c9fcf53e5b504f2d96afea02718862f3/iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7", size = 4793, upload-time = "2025-03-19T20:09:59.721Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050, upload-time = "2025-03-19T20:10:01.071Z" },
]
[[package]]
name = "neotest-python"
version = "0.0.1"
source = { virtual = "." }
dependencies = [
{ name = "pytest" },
]
[package.metadata]
requires-dist = [{ name = "pytest", specifier = ">=8.4.1" }]
[[package]]
name = "packaging"
version = "25.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a1/d4/1fc4078c65507b51b96ca8f8c3ba19e6a61c8253c72794544580a7b6c24d/packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f", size = 165727, upload-time = "2025-04-19T11:48:59.673Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pytest"
version = "8.4.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/08/ba/45911d754e8eba3d5a841a5ce61a65a685ff1798421ac054f85aa8747dfb/pytest-8.4.1.tar.gz", hash = "sha256:7c67fd69174877359ed9371ec3af8a3d2b04741818c51e5e99cc1742251fa93c", size = 1517714, upload-time = "2025-06-18T05:48:06.109Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/29/16/c8a903f4c4dffe7a12843191437d7cd8e32751d5de349d45d3fe69544e87/pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7", size = 365474, upload-time = "2025-06-18T05:48:03.955Z" },
]