Source code for ewoksfluo.tests.conftest
import psutil
import pytest
from ewoksorange.tests.conftest import ewoks_orange_canvas # noqa F401
from ewoksorange.tests.conftest import qtapp # noqa F401
from ewoksorange.tests.conftest import raw_ewoks_orange_canvas # noqa F401
from .. import resource_utils
pytest.register_assert_rewrite("ewoksfluo.tests.utils.hdf5")
[docs]
@pytest.fixture(params=["normal", "slurm"])
def system_4cpus_8gb(request, monkeypatch) -> None:
# Force known CPU count of 4 and memory of 8 GB
class FakeProcess:
def cpu_affinity(self):
return [0, 1, 2, 3] # Simulate 4 CPUs
monkeypatch.setattr(psutil, "Process", lambda: FakeProcess())
if request.param == "normal":
monkeypatch.setattr(
resource_utils, "_get_available_memory", lambda: 8 * 1024**3
)
elif request.param == "slurm":
class SubprocessResponse:
def __init__(self, stdout):
self.stdout = stdout
self.returncode = 0
def mock_subprocess_run(cmd, **kwargs):
if "sacct" in cmd:
return SubprocessResponse("16G\n") # 16 GB requested
elif "sstat" in cmd:
return SubprocessResponse("8G\n") # 8 GB used
return SubprocessResponse("")
monkeypatch.setenv("SLURM_JOB_ID", "12345")
monkeypatch.setattr(resource_utils.subprocess, "run", mock_subprocess_run)
else:
raise ValueError(f"{request.param}")
# Verify that the patching works as expected
cpus = resource_utils.get_available_cpus(exclude_current=True)
assert cpus == 3
mem = resource_utils._get_available_memory()
assert mem == 8 * 1024**3