Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
description: "Do not start new agent process when session is active, even if non-autostarted"
change-type: patch
destination-branches:
- master
- iso7
- iso6
20 changes: 20 additions & 0 deletions src/inmanta/server/agentmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,21 @@ async def expire_sessions_for_agents(self, env_id: uuid.UUID, endpoints: Set[str
)
await asyncio.gather(*(s.expire_and_abort(timeout=0) for s in sessions_to_expire))

async def are_agents_active(self, tid: uuid.UUID, endpoints: Iterable[str]) -> bool:
"""
Return true iff all the given agents are in the up or the paused state.
"""
return all(active for (_, active) in await self.get_agent_active_status(tid, endpoints))

async def get_agent_active_status(self, tid: uuid.UUID, endpoints: Iterable[str]) -> list[tuple[str, bool]]:
"""
Return a list of tuples where the first element of the tuple contains the name of an endpoint
and the second a boolean indicating where there is an active (up or paused) agent for that endpoint.
"""
all_sids_for_env = [sid for (sid, session) in self.sessions.items() if session.tid == tid]
all_active_endpoints_for_env = {ep for sid in all_sids_for_env for ep in self.endpoints_for_sid[sid]}
return [(ep, ep in all_active_endpoints_for_env) for ep in endpoints]

async def expire_all_sessions_for_environment(self, env_id: uuid.UUID) -> None:
async with self.session_lock:
await asyncio.gather(*[s.expire_and_abort(timeout=0) for s in self.sessions.values() if s.tid == env_id])
Expand Down Expand Up @@ -1101,6 +1116,11 @@ async def _ensure_agents(
if env.halted:
return False

if not restart and await self._agent_manager.are_agents_active(env.id, autostart_agents):
# do not start a new agent process if the agents are already active, regardless of whether their session
# is with an autostarted process or not.
return False

start_new_process: bool
if env.id not in self._agent_procs or self._agent_procs[env.id].returncode is not None:
# Start new process if none is currently running for this environment.
Expand Down
60 changes: 49 additions & 11 deletions tests/test_agent_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,9 +1207,42 @@ async def _dummy_fork_inmanta(
assert exception_message in str(excinfo.value)


async def test_dont_start_paused_agent(server, client, environment, caplog) -> None:
async def test_are_agents_active(server, client, environment, agent_factory) -> None:
"""
Ensure that the `AgentManager.are_agents_active()` method returns True when an agent
is in the up or the paused state.
"""
agentmanager = server.get_slice(SLICE_AGENT_MANAGER)
agent_name = "agent1"
env_id = UUID(environment)
env = await data.Environment.get_by_id(env_id)

# The agent is not started yet -> it should not be active
assert not await agentmanager.are_agents_active(tid=env_id, endpoints=[agent_name])

# Start agent
await agentmanager.ensure_agent_registered(env, agent_name)
await agent_factory(environment=environment, agent_map={agent_name: ""}, agent_names=[agent_name])

# Verify agent is active
await retry_limited(agentmanager.are_agents_active, tid=env_id, endpoints=[agent_name], timeout=10)

# Pause agent
result = await client.agent_action(tid=env_id, name=agent_name, action=AgentAction.pause.value)
assert result.code == 200, result.result

# Ensure the agent is still active
await retry_limited(agentmanager.are_agents_active, tid=env_id, endpoints=[agent_name], timeout=10)


@pytest.mark.parametrize("autostarted", (True, False))
async def test_dont_start_paused_agent(server, client, agent_factory, environment, caplog, autostarted: bool) -> None:
"""
Ensure that the AutostartedAgentManager doesn't try to start an agent that is paused (inmanta/inmanta-core#4398).

:param autostarted: Whether the paused agent was started as an autostarted agent. In typical scenarios this would
always be the case, but even when it's a manually started one (e.g. in module test fixtures), no new process should
be started.
"""
caplog.set_level(logging.DEBUG)
env_id = UUID(environment)
Expand All @@ -1221,33 +1254,38 @@ async def test_dont_start_paused_agent(server, client, environment, caplog) -> N
await agent_manager.ensure_agent_registered(env=env, nodename=agent_name)

# Add agent1 to AUTOSTART_AGENT_MAP
# Even when autostarted=False, it's still an autostarted agent, their just happens to be a non-autostarted session active
result = await client.set_setting(tid=environment, id=data.AUTOSTART_AGENT_MAP, value={"internal": "", agent_name: ""})
assert result.code == 200, result.result

# Start agent1
autostarted_agent_manager = server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER)

# Start agent1
env = await data.Environment.get_by_id(env_id)
assert (env_id, agent_name) not in agent_manager.tid_endpoint_to_session
caplog.clear()
await autostarted_agent_manager._ensure_agents(env=env, agents=[agent_name])
# Ensure we wait until a primary has been elected for agent1
assert (env_id, agent_name) in agent_manager.tid_endpoint_to_session
assert len(autostarted_agent_manager._agent_procs) == 1
assert "Started new agent with PID" in caplog.text
# Ensure no timeout happened
assert "took too long to start" not in caplog.text
if autostarted:
await autostarted_agent_manager._ensure_agents(env=env, agents=[agent_name])
# Ensure we wait until a primary has been elected for agent1
assert (env_id, agent_name) in agent_manager.tid_endpoint_to_session
assert len(autostarted_agent_manager._agent_procs) == 1
assert "Started new agent with PID" in caplog.text
# Ensure no timeout happened
assert "took too long to start" not in caplog.text
else:
await agent_factory(environment=env_id, code_loader=False, agent_map={agent_name: ""}, agent_names=[agent_name])

# Pause agent1
result = await client.agent_action(tid=env_id, name=agent_name, action=AgentAction.pause.value)
assert result.code == 200, result.result

# Pausing an agent should have no direct effect on the autostarted agent manager
assert len(autostarted_agent_manager._agent_procs) == 1
assert len(autostarted_agent_manager._agent_procs) == (1 if autostarted else 0)

# Execute _ensure_agents() again and verify that no restart is triggered
caplog.clear()
await autostarted_agent_manager._ensure_agents(env=env, agents=[agent_name])
assert len(autostarted_agent_manager._agent_procs) == 1
assert len(autostarted_agent_manager._agent_procs) == (1 if autostarted else 0)
assert "Started new agent with PID" not in caplog.text
# Ensure no timeout happened
assert "took too long to start" not in caplog.text
Expand Down