Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions packages/gg_api_core/src/gg_api_core/mcp_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Simplified GitGuardian MCP Server with scope-based tool filtering."""

import asyncio
import logging
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator, Sequence
Expand Down Expand Up @@ -67,10 +68,21 @@ async def token_scope_lifespan(fastmcp) -> AsyncIterator[dict]:
async with original_lifespan(fastmcp) as original_context:
context_result = original_context

# Cache scopes at startup (single token throughout lifespan)
# Cache scopes at startup (single token throughout lifespan).
# Use a timeout to avoid blocking MCP server startup indefinitely
# (e.g., when an expired token triggers an interactive OAuth re-auth flow).
try:
self._token_scopes = await self._fetch_token_scopes_from_api() # type: ignore[attr-defined]
self._token_scopes = await asyncio.wait_for(
self._fetch_token_scopes_from_api(), # type: ignore[attr-defined]
timeout=30,
)
logger.debug(f"Retrieved token scopes: {self._token_scopes}")
except asyncio.TimeoutError:
logger.warning(
"Timed out fetching token scopes during startup (30s). "
"This may happen when an expired token triggers re-authentication. "
"Tools requiring scopes will not be available until scopes are fetched."
)
except Exception as e:
logger.warning(f"Failed to fetch token scopes during startup: {str(e)}")
logger.warning("Some tools may not be available if scope detection fails")
Expand Down
63 changes: 37 additions & 26 deletions packages/gg_api_core/src/gg_api_core/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,35 @@ def save_token(self, instance_url, token_data):
logger.warning(f"Failed to save token to {self.token_file}: {e}")

def get_token(self, instance_url: str) -> str | None:
"""Get a token for a specific instance URL if it exists and is not expired."""
"""Get a token for a specific instance URL if it exists.

Returns the token even if expired — the API is the source of truth
for token validity. This avoids triggering a blocking interactive
OAuth flow during MCP server startup when a token has just expired
but might still be accepted by the API (grace period) or can be
refreshed automatically via the 401 retry handler.
"""
tokens = self.load_tokens()
token_data = tokens.get(instance_url)

if not token_data:
return None

# Check if token is expired
# Log expiry status for diagnostics, but still return the token
expires_at = token_data.get("expires_at")
if expires_at:
# Parse ISO format date
try:
expiry_date = datetime.datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
now = datetime.datetime.now(datetime.timezone.utc)
if now >= expiry_date:
logger.info(f"Token for {instance_url} has expired")
return None
logger.warning(
f"Stored token for {instance_url} has expired "
f"(expired {now - expiry_date} ago). "
f"Returning it anyway — the API will validate and "
f"the 401 handler will trigger re-authentication if needed."
)
except Exception as e:
logger.warning(f"Failed to parse expiry date: {e}")
# If we can't parse the date, assume it's still valid

access_token = token_data.get("access_token")
return str(access_token) if access_token else None
Expand Down Expand Up @@ -482,16 +491,18 @@ def _load_saved_token(self):
logger.debug(f"No saved token found for {self.dashboard_url}")
return

# Check if token is expired
# Log expiry status but still load the token — the API validates
expires_at = token_data.get("expires_at")
if expires_at:
try:
# Parse ISO format date
expiry_date = datetime.datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
now = datetime.datetime.now(datetime.timezone.utc)
if now >= expiry_date:
logger.debug(f"Token for {self.dashboard_url} has expired")
return
logger.warning(
f"Stored token for {self.dashboard_url} has expired "
f"(expired {now - expiry_date} ago). "
f"Loading it anyway for automatic re-authentication."
)
except Exception as e:
logger.warning(f"Failed to parse expiry date '{expires_at}': {e}")

Expand Down Expand Up @@ -565,26 +576,26 @@ async def redirect_handler(authorization_url: str) -> None:
browser_opened = webbrowser.open(authorization_url)
if not browser_opened:
logger.warning("Could not open browser automatically.")
print("\n\n-------------------------------------------------------------")
print("Please open the following URL in your browser to authenticate:")
print(f"\n{authorization_url}\n")
print("-------------------------------------------------------------\n\n")
logger.info("-------------------------------------------------------------")
logger.info("Please open the following URL in your browser to authenticate:")
logger.info(f"{authorization_url}")
logger.info("-------------------------------------------------------------")
else:
logger.debug(f"Browser window opened successfully for '{self.token_name}'")
except Exception as e:
logger.exception(f"Error opening browser: {e}")
print("\n\n-------------------------------------------------------------")
print("Please open the following URL in your browser to authenticate:")
print(f"\n{authorization_url}\n")
print("-------------------------------------------------------------\n\n")

# Store relevant information for manual OAuth flow
print("\n\n===========================================================")
print(" GITGUARDIAN OAUTH LOGIN ")
print("===========================================================\n")
print(f"The server will open a browser window to {server_url} for authentication.")
print("You'll need to log in and authorize the application.")
print(f"After authorization, you'll be redirected to http://localhost:{callback_server.port}\n")
logger.info("-------------------------------------------------------------")
logger.info("Please open the following URL in your browser to authenticate:")
logger.info(f"{authorization_url}")
logger.info("-------------------------------------------------------------")

# Log OAuth flow information (using logger to avoid corrupting MCP stdio protocol)
logger.info("===========================================================")
logger.info(" GITGUARDIAN OAUTH LOGIN ")
logger.info("===========================================================")
logger.info(f"The server will open a browser window to {server_url} for authentication.")
logger.info("You'll need to log in and authorize the application.")
logger.info(f"After authorization, you'll be redirected to http://localhost:{callback_server.port}")

# Create a simple server directly instead of trying to use OAuthClientProvider
try:
Expand Down
Loading