From d5373ba59178a6eb4b2be58ea127f923dec32061 Mon Sep 17 00:00:00 2001 From: Sirajmx Date: Sat, 2 May 2026 00:47:36 +0530 Subject: [PATCH 1/5] buyer booking workflow fix --- src/ad_seller/flows/product_setup_flow.py | 4 +- src/ad_seller/flows/proposal_handling_flow.py | 38 +++++ src/ad_seller/interfaces/api/main.py | 154 ++++++++++++++++-- 3 files changed, 178 insertions(+), 18 deletions(-) diff --git a/src/ad_seller/flows/product_setup_flow.py b/src/ad_seller/flows/product_setup_flow.py index 04b161a..9cee887 100644 --- a/src/ad_seller/flows/product_setup_flow.py +++ b/src/ad_seller/flows/product_setup_flow.py @@ -469,9 +469,9 @@ async def create_default_products(self) -> None: }, ] - for product_config in default_products: + for i, product_config in enumerate(default_products): product_def = ProductDefinition( - product_id=f"prod-{uuid.uuid4().hex[:8]}", + product_id=f"prod-{i + 1:03d}", name=product_config["name"], description=product_config.get("description"), inventory_type=product_config["inventory_type"], diff --git a/src/ad_seller/flows/proposal_handling_flow.py b/src/ad_seller/flows/proposal_handling_flow.py index 2095a14..5dfe8e4 100644 --- a/src/ad_seller/flows/proposal_handling_flow.py +++ b/src/ad_seller/flows/proposal_handling_flow.py @@ -630,3 +630,41 @@ def handle_proposal( result["_flow_state_snapshot"] = self.state.model_dump(mode="json") return result + + async def handle_proposal_async( + self, + proposal_id: str, + proposal_data: dict[str, Any], + buyer_context: Optional[BuyerContext] = None, + products: Optional[dict] = None, + ) -> dict[str, Any]: + """Async version of handle_proposal using kickoff_async(). + + Use this from async FastAPI handlers to avoid blocking the event loop + with the synchronous kickoff() call. + """ + self.state.proposal_id = proposal_id + self.state.proposal_data = proposal_data + self.state.buyer_context = buyer_context + if products: + self.state.products = products + + await self.kickoff_async() + + result = { + "proposal_id": proposal_id, + "recommendation": self.state.recommendation, + "status": self.state.status.value, + "evaluation": self.state.evaluation.model_dump() if self.state.evaluation else None, + "counter_terms": self.state.counter_terms, + "upsell_suggestions": self.state.upsell_suggestions, + "errors": self.state.errors, + "warnings": self.state.warnings, + } + + if self.state.status == ExecutionStatus.PENDING_APPROVAL: + result["pending_approval"] = True + result["flow_id"] = self.state.flow_id + result["_flow_state_snapshot"] = self.state.model_dump(mode="json") + + return result diff --git a/src/ad_seller/interfaces/api/main.py b/src/ad_seller/interfaces/api/main.py index cb1d7b6..056b992 100644 --- a/src/ad_seller/interfaces/api/main.py +++ b/src/ad_seller/interfaces/api/main.py @@ -18,7 +18,7 @@ from fastapi import Depends, FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel +from pydantic import BaseModel, Field from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware logger = logging.getLogger(__name__) @@ -602,9 +602,9 @@ def _get_static_product_catalog() -> dict[str, Any]: ] products: dict[str, ProductDefinition] = {} - for cfg in default_products: + for i, cfg in enumerate(default_products): product_def = ProductDefinition( - product_id=f"prod-{uuid.uuid4().hex[:8]}", + product_id=f"prod-{i + 1:03d}", name=cfg["name"], description=cfg.get("description"), inventory_type=cfg["inventory_type"], @@ -709,6 +709,114 @@ async def get_product(product_id: str): return _serialize_product(product) +class ProductSearchFilters(BaseModel): + """Filters for product search (mirrors buyer OpenDirectClient.search_products).""" + + channel: Optional[str] = None + adFormat: Optional[str] = None + minPrice: Optional[float] = None + maxPrice: Optional[float] = None + deliveryType: Optional[str] = None + limit: int = Field(default=10, ge=1, le=50) + + +def _serialize_product_iab(product: Any) -> dict[str, Any]: + """Serialize a ProductDefinition to IAB OpenDirect 2.1 wire format. + + The buyer's OpenDirectClient.search_products() parses the response via + Product.model_validate() which requires camelCase aliases: publisherId, + basePrice, rateType. _serialize_product() uses our internal snake_case + shape for UI consumers; this variant targets the buyer agent wire spec. + """ + publisher_id = _get_api_settings().seller_organization_id or "seller-pub-001" + deal_types = [dt.value for dt in product.supported_deal_types] + delivery = "Guaranteed" if "programmaticguaranteed" in deal_types else "NonGuaranteed" + return { + "id": product.product_id, + "publisherId": publisher_id, + "name": product.name, + "description": product.description or "", + "basePrice": product.base_cpm, + "rateType": "CPM", + "deliveryType": delivery, + "availableImpressions": 1_000_000, + "targeting": {"capabilities": ["geo", "demographic", "contextual"]}, + # Pass our extra fields through; buyer Product ignores unknown fields + "product_id": product.product_id, + "inventory_type": product.inventory_type, + "floor_cpm": product.floor_cpm, + "deal_types": deal_types, + } + + +@app.post("/products/search", tags=["Products"]) +async def search_products(filters: ProductSearchFilters): + """Search products with optional filters. + + Returns IAB OpenDirect 2.1 camelCase shape so the buyer's + Product.model_validate() can parse publisherId / basePrice / rateType. + """ + catalog = _get_static_product_catalog() + products = list(catalog["products"].values()) + if filters.channel: + products = [p for p in products if p.inventory_type == filters.channel] + if filters.minPrice is not None: + products = [p for p in products if p.base_cpm >= filters.minPrice] + if filters.maxPrice is not None: + products = [p for p in products if p.base_cpm <= filters.maxPrice] + return {"products": [_serialize_product_iab(p) for p in products[: filters.limit]]} + + +class AvailsCheckRequest(BaseModel): + """Availability check request (mirrors buyer AvailsRequest schema). + + Accepts both camelCase (productId) and snake_case (product_id) so callers + using either convention work without a 422. + """ + + productId: Optional[str] = Field(None, alias="productId") + product_id: Optional[str] = Field(None, alias="product_id") + startDate: Optional[str] = Field(None, alias="startDate") + start_date: Optional[str] = Field(None, alias="start_date") + endDate: Optional[str] = Field(None, alias="endDate") + end_date: Optional[str] = Field(None, alias="end_date") + requestedImpressions: Optional[int] = None + budget: Optional[float] = None + targeting: Optional[dict] = None + + model_config = {"populate_by_name": True} + + def resolved_product_id(self) -> str: + """Return whichever of productId/product_id was supplied.""" + return self.productId or self.product_id or "" + + +@app.post("/products/avails", tags=["Products"]) +async def check_product_avails(request: AvailsCheckRequest): + """Check availability for a product. + + FIX-4: Added to support buyer's AvailsCheckTool which calls + POST /products/avails. Returns static 1M available impressions, + matching the placeholder already used in ProposalHandlingFlow. + """ + pid = request.resolved_product_id() + catalog = _get_static_product_catalog() + product = catalog["products"].get(pid) + if not product: + raise HTTPException(status_code=404, detail=f"Product {pid} not found") + requested = request.requestedImpressions or 0 + available = 1_000_000 + return { + "productId": pid, + "availableImpressions": available, + "guaranteedImpressions": int(available * 0.85), + "estimatedCpm": product.base_cpm, + "totalCost": round((product.base_cpm / 1000) * requested, 2) if requested else 0.0, + "deliveryConfidence": 85.0, + "availableTargeting": ["geo", "demographic", "contextual"], + } + + @app.post("/pricing", response_model=PricingResponse, tags=["Pricing"]) async def get_pricing( request: PricingRequest, @@ -771,9 +879,15 @@ async def submit_proposal( from ...flows import ProductSetupFlow, ProposalHandlingFlow - # Get products - setup_flow = ProductSetupFlow() - await setup_flow.kickoff_async() + # Run ProductSetupFlow to get the serial-ID product catalog (prod-001…prod-012). + # Wrap in try/except so a setup failure doesn't produce a raw 500. + try: + setup_flow = ProductSetupFlow() + await setup_flow.kickoff_async() + products = setup_flow.state.products + except Exception as exc: # noqa: BLE001 + logger.exception("ProductSetupFlow failed: %s", exc) + products = {} # Enforce agent registry _, max_tier = await _resolve_and_enforce_agent(request.agent_url) @@ -788,7 +902,6 @@ async def submit_proposal( max_access_tier=max_tier, ) - # Process proposal proposal_id = f"prop-{uuid.uuid4().hex[:8]}" proposal_data = { "product_id": request.product_id, @@ -800,13 +913,22 @@ async def submit_proposal( "buyer_id": request.buyer_id, } - flow = ProposalHandlingFlow() - result = flow.handle_proposal( - proposal_id=proposal_id, - proposal_data=proposal_data, - buyer_context=context, - products=setup_flow.state.products, - ) + try: + flow = ProposalHandlingFlow() + result = await flow.handle_proposal_async( + proposal_id=proposal_id, + proposal_data=proposal_data, + buyer_context=context, + products=products, + ) + except Exception as exc: # noqa: BLE001 + logger.exception("ProposalHandlingFlow failed for %s: %s", proposal_id, exc) + return ProposalResponse( + proposal_id=proposal_id, + recommendation="reject", + status="failed", + errors=[f"Proposal evaluation error: {exc}"], + ) # If pending approval, create the approval request if result.get("pending_approval"): @@ -839,8 +961,8 @@ async def submit_proposal( return ProposalResponse( proposal_id=proposal_id, - recommendation=result["recommendation"], - status=result["status"], + recommendation=result.get("recommendation") or "reject", + status=result.get("status", "failed"), counter_terms=result.get("counter_terms"), errors=result.get("errors", []), ) From baf0d880a9b18414a8a21ec18c9fb242cbedb8e2 Mon Sep 17 00:00:00 2001 From: Sirajmx Date: Wed, 20 May 2026 18:19:52 +0530 Subject: [PATCH 2/5] GAM Reporting integration --- docs/guides/configuration.md | 4 +- docs/guides/gam-reporting.md | 246 +++++++++++++++++++++++ docs/guides/inventory-sync.md | 2 +- mkdocs.yml | 1 + src/ad_seller/clients/gam_soap_client.py | 244 +++++++++++++++++++++- src/ad_seller/config/settings.py | 2 +- src/ad_seller/interfaces/api/main.py | 146 +++++++++++++- 7 files changed, 636 insertions(+), 9 deletions(-) create mode 100644 docs/guides/gam-reporting.md diff --git a/docs/guides/configuration.md b/docs/guides/configuration.md index 95cda42..2f121aa 100644 --- a/docs/guides/configuration.md +++ b/docs/guides/configuration.md @@ -35,7 +35,7 @@ with case-insensitive variable names. | `GAM_NETWORK_CODE` | `str` | `None` | GAM network code (numeric ID) | | `GAM_JSON_KEY_PATH` | `str` | `None` | Path to GAM service account JSON key file | | `GAM_APPLICATION_NAME` | `str` | `"AdSellerSystem"` | Application name for GAM API requests | -| `GAM_API_VERSION` | `str` | `"v202411"` | GAM SOAP API version | +| `GAM_API_VERSION` | `str` | `"v202505"` | GAM SOAP API version | | `GAM_DEFAULT_TRAFFICKER_ID` | `str` | `None` | Default trafficker user ID for order creation | | `FREEWHEEL_ENABLED` | `bool` | `false` | Feature flag to enable FreeWheel integration | | `FREEWHEEL_NETWORK_ID` | `str` | `None` | Publisher network/account ID in FreeWheel | @@ -215,7 +215,7 @@ GAM_ENABLED=true GAM_NETWORK_CODE=12345678 GAM_JSON_KEY_PATH=/etc/secrets/gam-service-account.json GAM_APPLICATION_NAME=AcmeSellerAgent -GAM_API_VERSION=v202411 +GAM_API_VERSION=v202505 # GAM_DEFAULT_TRAFFICKER_ID=111222333 # Optional # ============================================================================= diff --git a/docs/guides/gam-reporting.md b/docs/guides/gam-reporting.md new file mode 100644 index 0000000..32c54e2 --- /dev/null +++ b/docs/guides/gam-reporting.md @@ -0,0 +1,246 @@ +# GAM Delivery Reporting + +The seller agent connects to Google Ad Manager (GAM) via the SOAP API to expose +delivery data — impressions served, clicks, and revenue — directly through its REST +endpoints. Publishers can view order-level stats for their own dashboard, and buyer +agents receive real delivery figures through the standard deal performance endpoint. + +--- + +## Prerequisites + +| Requirement | Where to get it | +|---|---| +| GAM network code | GAM UI → Admin → Global Settings → Network code | +| Service account JSON key | Google Cloud Console → IAM & Admin → Service Accounts → Create → Download key | +| Service account linked to GAM | GAM UI → Admin → Access & Authentication → API Access → Add service account | + +--- + +## Configuration + +```env +GAM_ENABLED=true +GAM_NETWORK_CODE=12345678 +GAM_JSON_KEY_PATH=/etc/secrets/gam-service-account.json +GAM_APPLICATION_NAME=AdSellerSystem # optional, identifies your app in GAM logs +GAM_API_VERSION=v202505 # optional, defaults to v202505 +``` + +See [Configuration Reference](configuration.md) for the full environment variable list. + +--- + +## How It Works + +```mermaid +sequenceDiagram + participant P as Publisher / Buyer Agent + participant S as Seller API + participant G as Google Ad Manager (SOAP) + + P->>S: GET /gam/orders + S->>G: OrderService.getOrdersByStatement() + G-->>S: Order list + S-->>P: {network_code, orders, count} + + P->>S: GET /gam/report?order_ids=123&days=30 + S->>G: ReportService.runReportJob() + G-->>S: job_id + loop Poll until COMPLETED + S->>G: getReportJobStatus(job_id) + end + S->>G: DownloadReportToFile (CSV_EXCEL) + G-->>S: CSV rows + S-->>P: {orders, report_rows, summary} + + P->>S: GET /api/v1/deals/{deal_id}/performance + S->>S: Look up gam_order_id on deal + alt gam_order_id present + S->>G: run_delivery_report([gam_order_id]) + G-->>S: delivery rows + S-->>P: Real impressions, fill_rate, avg_cpm + else gam_order_id absent or GAM not configured + S-->>P: Placeholder stats (impressions_served: 0) + end +``` + +--- + +## Endpoints + +### List Orders + +```bash +GET /gam/orders?limit=50 +``` + +Returns the most recent orders from the GAM network with id, name, and status. +Useful for finding order IDs to pass to the report endpoint. + +**Example response** + +```json +{ + "network_code": "12345678", + "user": { + "id": "987654321", + "name": "my-service-account", + "email": "seller@project.iam.gserviceaccount.com" + }, + "orders": [ + {"id": "54058762", "name": "Homepage Campaign Q2", "status": "APPROVED"}, + {"id": "54058882", "name": "Video Pre-Roll Bundle", "status": "APPROVED"} + ], + "count": 2 +} +``` + +### Delivery Report + +```bash +GET /gam/report?order_ids=54058762,54058882&days=30 +``` + +Submits a GAM report job, polls until complete (up to ~60 s), and returns +line-item-level delivery data for the look-back window. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `order_ids` | string | required | Comma-separated numeric GAM order IDs | +| `days` | int | `30` | Look-back window in days | + +**Example response** + +```json +{ + "orders": [ + { + "order_id": "54058762", + "order_name": "Homepage Campaign Q2", + "status": "APPROVED", + "line_items": [ + { + "id": "1195282", + "name": "Homepage_728x90", + "status": "DELIVERING", + "impressions_goal": 500000, + "cost_type": "CPM" + } + ] + } + ], + "report_rows": [ + { + "order_id": "54058762", + "order_name": "Homepage Campaign Q2", + "line_item_id": "1195282", + "line_item_name": "Homepage_728x90", + "impressions": 124830, + "clicks": 312, + "revenue_usd": 1872.45 + } + ], + "summary": { + "impressions": 124830, + "clicks": 312, + "revenue_usd": 1872.45 + } +} +``` + +### Deal Performance (A2A) + +```bash +GET /api/v1/deals/{deal_id}/performance +``` + +This endpoint is the standard A2A delivery channel — buyer agents call it to get +delivery stats for a booked deal without direct GAM access. + +When a deal has been trafficked into GAM and its `gam_order_id` is recorded, the +seller queries GAM and returns real data. Otherwise it returns placeholder stats +so the endpoint always responds, even before trafficking. + +**Example — real data (deal linked to GAM order)** + +```json +{ + "deal_id": "DEMO-4128A557FF5C", + "impressions_available": 500000, + "impressions_served": 124830, + "fill_rate": 24.97, + "win_rate": 0.0, + "avg_cpm_actual": 15.00, + "delivery_pacing": "on_track", + "last_updated": "2026-06-15T09:12:34Z" +} +``` + +**Example — placeholder (GAM not configured or deal not yet trafficked)** + +```json +{ + "deal_id": "DEMO-4128A557FF5C", + "impressions_available": 1000000, + "impressions_served": 0, + "fill_rate": 0.0, + "win_rate": 0.0, + "avg_cpm_actual": 0.0, + "delivery_pacing": "not_started", + "last_updated": "2026-06-15T09:12:34Z" +} +``` + +`delivery_pacing` values: + +| Value | Meaning | +|---|---| +| `not_started` | No impressions served yet | +| `on_track` | Fill rate ≥ 40% of goal | +| `behind` | Fill rate < 40% of goal | + +--- + +## Linking a Deal to a GAM Order + +The seller does not create a GAM order automatically when a deal is booked. After +trafficking the deal in GAM, store the GAM order ID on the deal record so the +performance endpoint can look it up: + +```bash +# The deal record in storage accepts a gam_order_id field. +# Set it after you have trafficked the deal in GAM. +# +# Example: deal DEMO-4128A557FF5C was trafficked as GAM order 54058762 +curl -X PATCH http://localhost:8000/api/v1/deals/DEMO-4128A557FF5C \ + -H "Content-Type: application/json" \ + -d '{"metadata": {"gam_order_id": "54058762"}}' +``` + +Once set, `GET /api/v1/deals/{deal_id}/performance` automatically pulls live +delivery data for that order. + +--- + +## Fallback Behaviour + +If GAM is not configured or any GAM call fails, every endpoint degrades gracefully: + +| Condition | Behaviour | +|---|---| +| `GAM_ENABLED=false` | `/gam/orders` and `/gam/report` return HTTP 503 | +| GAM credentials missing | Same as above | +| Deal has no `gam_order_id` | `/performance` returns placeholder stats | +| GAM API error at runtime | `/performance` silently falls back to placeholder | + +This means the seller agent is fully operational without GAM — GAM is an optional +enhancement for publishers who have it configured. + +--- + +## Related + +- [Inventory Sync](inventory-sync.md) --- Sync ad unit inventory from GAM into the seller catalog +- [Configuration Reference](configuration.md) --- All GAM environment variables +- [Buyer Agent Integration](../integration/buyer-agent.md) --- How buyers consume the performance endpoint diff --git a/docs/guides/inventory-sync.md b/docs/guides/inventory-sync.md index 55bd230..cdeee70 100644 --- a/docs/guides/inventory-sync.md +++ b/docs/guides/inventory-sync.md @@ -34,7 +34,7 @@ GAM_ENABLED=true GAM_NETWORK_CODE=12345678 GAM_JSON_KEY_PATH=/path/to/service-account.json GAM_APPLICATION_NAME=AdSellerSystem # Optional, default: AdSellerSystem -GAM_API_VERSION=v202411 # Optional, default: v202411 +GAM_API_VERSION=v202505 # Optional, default: v202505 ``` ### Sync Process diff --git a/mkdocs.yml b/mkdocs.yml index 354cab2..c149714 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -84,6 +84,7 @@ nav: - Setup Overview: guides/publisher-setup.md - Configuration: guides/configuration.md - Inventory Sync: guides/inventory-sync.md + - GAM Delivery Reporting: guides/gam-reporting.md - Media Kit: guides/media-kit.md - Pricing & Access Tiers: guides/pricing-rules.md - Approval & HITL: guides/approval-rules.md diff --git a/src/ad_seller/clients/gam_soap_client.py b/src/ad_seller/clients/gam_soap_client.py index be4276a..23b5005 100644 --- a/src/ad_seller/clients/gam_soap_client.py +++ b/src/ad_seller/clients/gam_soap_client.py @@ -5,11 +5,17 @@ Provides methods for writing to GAM using the SOAP API. Used for creating orders, line items, and managing audience segments. +Also provides read/reporting methods for delivery data. """ -from datetime import datetime +import io +import time +from datetime import date, datetime, timedelta from typing import Any, Optional +_SUPPORTED_VERSIONS = ("v202505", "v202508", "v202511", "v202602") +_DEFAULT_VERSION = "v202505" + from ..config import get_settings from ..models.gam import ( GAMAdUnit, @@ -60,7 +66,8 @@ def __init__( self.network_code = network_code or settings.gam_network_code self.credentials_path = credentials_path or settings.gam_json_key_path self.application_name = settings.gam_application_name - self.api_version = settings.gam_api_version + raw_version = settings.gam_api_version or _DEFAULT_VERSION + self.api_version = raw_version if raw_version in _SUPPORTED_VERSIONS else _DEFAULT_VERSION self.default_trafficker_id = settings.gam_default_trafficker_id self._client: Optional[Any] = None @@ -654,3 +661,236 @@ def get_current_user(self) -> dict[str, Any]: """ user_service = self._get_service("UserService") return user_service.getCurrentUser() + + # ========================================================================= + # Reporting — Read Operations + # ========================================================================= + + def list_orders(self, limit: int = 50) -> list[dict[str, Any]]: + """List recent orders in the network. + + Args: + limit: Maximum number of orders to return + + Returns: + List of dicts with id, name, status + """ + from googleads import ad_manager + + order_service = self._get_service("OrderService") + sb = ad_manager.StatementBuilder() + sb.Limit(limit) + result = order_service.getOrdersByStatement(sb.ToStatement()) + return [ + { + "id": str(getattr(o, "id", "")), + "name": getattr(o, "name", ""), + "status": str(getattr(o, "status", "")), + } + for o in (getattr(result, "results", None) or []) + ] + + def get_order_by_id(self, order_id: str) -> dict[str, Any]: + """Fetch a single order by numeric ID. + + Args: + order_id: GAM order ID + + Returns: + Dict with id, name, status, advertiser_id, start_date, end_date + """ + from googleads import ad_manager + + order_service = self._get_service("OrderService") + sb = ad_manager.StatementBuilder() + sb.Where("id = :id").WithBindVariable("id", int(order_id)) + sb.Limit(1) + result = order_service.getOrdersByStatement(sb.ToStatement()) + orders = getattr(result, "results", None) or [] + if not orders: + return {} + o = orders[0] + return { + "id": str(getattr(o, "id", "")), + "name": getattr(o, "name", ""), + "status": str(getattr(o, "status", "")), + "advertiser_id": str(getattr(o, "advertiserId", "")), + "start_date": str(getattr(o, "startDateTime", "")), + "end_date": str(getattr(o, "endDateTime", "")), + } + + def list_line_items_for_order(self, order_id: str) -> list[dict[str, Any]]: + """List line items for a given order ID. + + Args: + order_id: GAM order ID + + Returns: + List of dicts with id, name, status, impressions_goal, cost_type + """ + from googleads import ad_manager + + li_service = self._get_service("LineItemService") + sb = ad_manager.StatementBuilder() + sb.Where("orderId = :orderId").WithBindVariable("orderId", int(order_id)) + result = li_service.getLineItemsByStatement(sb.ToStatement()) + return [ + { + "id": str(getattr(li, "id", "")), + "name": getattr(li, "name", ""), + "status": str(getattr(li, "status", "")), + "impressions_goal": getattr( + getattr(li, "primaryGoal", None), "units", -1 + ), + "cost_type": str(getattr(li, "costType", "CPM")), + } + for li in (getattr(result, "results", None) or []) + ] + + def run_delivery_report( + self, + order_ids: list[str], + days: int = 30, + ) -> list[dict[str, Any]]: + """Submit a delivery report job and return rows as dicts. + + Polls until the job completes (max ~60 s) then downloads CSV rows. + + Args: + order_ids: List of GAM order IDs (numeric strings) + days: Look-back window in days (default 30) + + Returns: + List of row dicts with keys: + order_id, order_name, line_item_id, line_item_name, + impressions, clicks, revenue_usd + """ + report_svc = self._get_service("ReportService") + today = date.today() + start = today - timedelta(days=days) + + report_job = { + "reportQuery": { + "dimensions": [ + "ORDER_ID", + "ORDER_NAME", + "LINE_ITEM_ID", + "LINE_ITEM_NAME", + ], + "columns": [ + "AD_SERVER_IMPRESSIONS", + "AD_SERVER_CLICKS", + "AD_SERVER_CPM_AND_CPC_REVENUE", + ], + "dateRangeType": "CUSTOM_DATE", + "startDate": { + "year": start.year, + "month": start.month, + "day": start.day, + }, + "endDate": { + "year": today.year, + "month": today.month, + "day": today.day, + }, + } + } + + job = report_svc.runReportJob(report_job) + job_id = job.id + + for _ in range(20): + time.sleep(3) + status = str(report_svc.getReportJobStatus(job_id)) + if status == "COMPLETED": + break + if status == "FAILED": + return [{"error": f"Report job {job_id} failed"}] + + downloader = self._client.GetDataDownloader(version=self.api_version) + buf = io.BytesIO() + downloader.DownloadReportToFile( + job_id, "CSV_EXCEL", buf, use_gzip_compression=False + ) + buf.seek(0) + lines = buf.read().decode("utf-8").splitlines() + + if len(lines) < 2: + return [] + + rows = [] + for line in lines[1:]: + if line.startswith("Total"): + continue + parts = line.split(",") + if len(parts) < 7: + continue + rows.append( + { + "order_id": parts[0].strip(), + "order_name": parts[1].strip(), + "line_item_id": parts[2].strip(), + "line_item_name": parts[3].strip(), + "impressions": int(parts[4].strip() or 0), + "clicks": int(parts[5].strip() or 0), + "revenue_usd": float(parts[6].strip() or 0), + } + ) + return rows + + def get_delivery_report( + self, + order_ids: list[str], + days: int = 30, + ) -> dict[str, Any]: + """Fetch order metadata + line items + delivery rows for given order IDs. + + Args: + order_ids: List of GAM order IDs + days: Look-back window in days + + Returns: + { + "orders": [{"order_id", "order_name", "status", "line_items": [...]}], + "report_rows": [...], + "summary": {"impressions", "clicks", "revenue_usd"} + } + """ + orders_out: list[dict[str, Any]] = [] + for oid in order_ids: + entry: dict[str, Any] = {"order_id": oid} + try: + order = self.get_order_by_id(oid) + entry.update( + { + "order_name": order.get("name", oid), + "status": order.get("status", "UNKNOWN"), + } + ) + except Exception as e: + entry.update({"order_name": oid, "status": "ERROR", "error": str(e)}) + try: + entry["line_items"] = self.list_line_items_for_order(oid) + except Exception as e: + entry["line_items"] = [] + entry["line_items_error"] = str(e) + orders_out.append(entry) + + report_rows: list[dict[str, Any]] = [] + try: + report_rows = self.run_delivery_report(order_ids, days=days) + except Exception as e: + report_rows = [{"error": str(e)}] + + ok = [r for r in report_rows if "error" not in r] + summary = { + "impressions": sum(r.get("impressions", 0) for r in ok), + "clicks": sum(r.get("clicks", 0) for r in ok), + "revenue_usd": round(sum(r.get("revenue_usd", 0) for r in ok), 2), + } + + return { + "orders": orders_out, + "report_rows": report_rows, + "summary": summary, + } diff --git a/src/ad_seller/config/settings.py b/src/ad_seller/config/settings.py index 036303e..3a9174b 100644 --- a/src/ad_seller/config/settings.py +++ b/src/ad_seller/config/settings.py @@ -73,7 +73,7 @@ class Settings(BaseSettings): gam_network_code: Optional[str] = None # GAM network ID gam_json_key_path: Optional[str] = None # Path to service account JSON key gam_application_name: str = "AdSellerSystem" # Application name for GAM API - gam_api_version: str = "v202411" # SOAP API version + gam_api_version: str = "v202505" # SOAP API version gam_default_trafficker_id: Optional[str] = None # Default trafficker user ID # FreeWheel Configuration (alternative ad server) diff --git a/src/ad_seller/interfaces/api/main.py b/src/ad_seller/interfaces/api/main.py index 056b992..bf55080 100644 --- a/src/ad_seller/interfaces/api/main.py +++ b/src/ad_seller/interfaces/api/main.py @@ -79,6 +79,7 @@ }, {"name": "Deal Performance", "description": "Deal delivery and performance metrics"}, {"name": "Bulk Operations", "description": "Batch deal create/update/cancel"}, + {"name": "Reporting", "description": "GAM delivery reporting (seller-internal)"}, ], ) @@ -3864,8 +3865,10 @@ async def get_deal_performance(deal_id: str): """Return delivery stats for a deal. Provides performance feedback for buyer SPO (Supply Path Optimization). - Returns placeholder/mock stats initially — real ad server integration - comes in a future phase. + + If GAM is configured and the deal has a linked gam_order_id (set when + the deal is trafficked into GAM), returns real delivery data from the + ad server. Otherwise returns placeholder stats. """ from ...storage.factory import get_storage @@ -3878,8 +3881,60 @@ async def get_deal_performance(deal_id: str): detail={"error": "deal_not_found", "message": f"Deal '{deal_id}' not found."}, ) - # Placeholder performance data — real stats come from ad server integration now = datetime.utcnow().isoformat() + "Z" + s = _get_api_settings() + + # Real path: GAM configured + deal was trafficked into GAM + gam_order_id = deal.get("gam_order_id") or deal.get("metadata", {}).get("gam_order_id") + if s.gam_enabled and s.gam_network_code and s.gam_json_key_path and gam_order_id: + try: + from ...clients.gam_soap_client import GAMSoapClient + + client = GAMSoapClient() + client.connect() + report = client.get_delivery_report([str(gam_order_id)], days=30) + client.disconnect() + + summary = report.get("summary", {}) + impressions_served = summary.get("impressions", 0) + impressions_available = 0 + for order in report.get("orders", []): + for li in order.get("line_items", []): + goal = li.get("impressions_goal", 0) + if goal and goal > 0: + impressions_available += goal + + fill_rate = ( + round(impressions_served / impressions_available * 100, 1) + if impressions_available + else 0.0 + ) + revenue = summary.get("revenue_usd", 0.0) + avg_cpm = ( + round(revenue / impressions_served * 1000, 2) + if impressions_served + else 0.0 + ) + pacing = ( + "not_started" if impressions_served == 0 + else "on_track" if fill_rate >= 40 + else "behind" + ) + + return DealPerformanceResponse( + deal_id=deal_id, + impressions_available=impressions_available, + impressions_served=impressions_served, + fill_rate=fill_rate, + win_rate=0.0, + avg_cpm_actual=avg_cpm, + delivery_pacing=pacing, + last_updated=now, + ) + except Exception: + pass # Fall through to placeholder on any GAM error + + # Fallback: placeholder stats (GAM not configured or order not yet trafficked) return DealPerformanceResponse( deal_id=deal_id, impressions_available=1000000, @@ -5262,3 +5317,88 @@ async def get_deal_lineage(deal_id: str): "replacements": replacements, "chain_length": len(parents) + 1 + len(replacements), } + + +# ============================================================================= +# GAM Reporting (seller-internal) +# ============================================================================= + + +def _gam_configured() -> bool: + s = _get_api_settings() + return bool(s.gam_enabled and s.gam_network_code and s.gam_json_key_path) + + +@app.get("/gam/orders", tags=["Reporting"]) +async def gam_list_orders( + limit: int = 50, + _auth: None = Depends(_get_optional_api_key_record), +) -> dict[str, Any]: + """List recent GAM orders directly from the ad server. + + Returns the most recent orders with id, name, and status. + Requires GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH in .env. + """ + if not _gam_configured(): + raise HTTPException( + status_code=503, + detail="GAM not configured — set GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH", + ) + try: + from ...clients.gam_soap_client import GAMSoapClient + + client = GAMSoapClient() + client.connect() + orders = client.list_orders(limit=limit) + user = client.get_current_user() + client.disconnect() + return { + "network_code": _get_api_settings().gam_network_code, + "user": { + "id": str(getattr(user, "id", "")), + "name": getattr(user, "name", ""), + "email": getattr(user, "email", ""), + }, + "orders": orders, + "count": len(orders), + } + except Exception as e: + raise HTTPException(status_code=502, detail=str(e)) + + +@app.get("/gam/report", tags=["Reporting"]) +async def gam_delivery_report( + order_ids: str, + days: int = 30, + _auth: None = Depends(_get_optional_api_key_record), +) -> dict[str, Any]: + """Pull a delivery report from GAM by order ID(s). + + Args: + order_ids: Comma-separated numeric GAM order IDs + days: Look-back window in days (default 30) + + Returns order metadata, line items, and delivery data (impressions, clicks, revenue). + Requires GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH in .env. + """ + if not _gam_configured(): + raise HTTPException( + status_code=503, + detail="GAM not configured — set GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH", + ) + ids = [oid.strip() for oid in order_ids.split(",") if oid.strip()] + if not ids: + raise HTTPException( + status_code=400, + detail="order_ids must be a comma-separated list of numeric GAM order IDs", + ) + try: + from ...clients.gam_soap_client import GAMSoapClient + + client = GAMSoapClient() + client.connect() + report = client.get_delivery_report(ids, days=days) + client.disconnect() + return report + except Exception as e: + raise HTTPException(status_code=502, detail=str(e)) From bc2c1cd550a3af604cf7e99d3904ce31cdd632f3 Mon Sep 17 00:00:00 2001 From: Sirajmx Date: Wed, 20 May 2026 19:00:14 +0530 Subject: [PATCH 3/5] lint fix --- src/ad_seller/clients/gam_soap_client.py | 6 +++--- tests/unit/test_gam_clients.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ad_seller/clients/gam_soap_client.py b/src/ad_seller/clients/gam_soap_client.py index 23b5005..220d8a8 100644 --- a/src/ad_seller/clients/gam_soap_client.py +++ b/src/ad_seller/clients/gam_soap_client.py @@ -13,9 +13,6 @@ from datetime import date, datetime, timedelta from typing import Any, Optional -_SUPPORTED_VERSIONS = ("v202505", "v202508", "v202511", "v202602") -_DEFAULT_VERSION = "v202505" - from ..config import get_settings from ..models.gam import ( GAMAdUnit, @@ -38,6 +35,9 @@ GAMUnitType, ) +_SUPPORTED_VERSIONS = ("v202505", "v202508", "v202511", "v202602") +_DEFAULT_VERSION = "v202505" + class GAMSoapClient: """SOAP API client for writing to Google Ad Manager. diff --git a/tests/unit/test_gam_clients.py b/tests/unit/test_gam_clients.py index bb27283..4da99a2 100644 --- a/tests/unit/test_gam_clients.py +++ b/tests/unit/test_gam_clients.py @@ -264,7 +264,7 @@ def test_soap_client_setup(self): ) assert client.network_code == "12345678" assert client.credentials_path == "/path/to/creds.json" - assert client.api_version == "v202411" + assert client.api_version == "v202505" assert client._client is None # Not connected yet def test_rest_client_requires_connection(self): From d777aea31fedc50ad984bf42df669e136373d864 Mon Sep 17 00:00:00 2001 From: Sirajmx Date: Tue, 26 May 2026 22:06:20 +0530 Subject: [PATCH 4/5] mcp tools --- src/ad_seller/interfaces/mcp_server.py | 48 ++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/ad_seller/interfaces/mcp_server.py b/src/ad_seller/interfaces/mcp_server.py index 30daa4e..bfb9f9f 100644 --- a/src/ad_seller/interfaces/mcp_server.py +++ b/src/ad_seller/interfaces/mcp_server.py @@ -509,6 +509,54 @@ async def get_deal_performance(deal_id: str) -> str: return resp.text +@mcp.tool() +async def list_gam_orders(limit: int = 50) -> str: + """List recent orders from Google Ad Manager (GAM). + + Returns order id, name, and status for each order in the network. + Also returns the authenticated service account user and network code. + Requires GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH in .env. + """ + import httpx + + settings = _get_settings() + url = getattr(settings, "seller_agent_url", "http://localhost:8000") + + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.get(f"{url}/gam/orders", params={"limit": limit}) + if resp.status_code == 503: + return "GAM not configured — set GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH in .env" + return resp.text + + +@mcp.tool() +async def get_gam_delivery_report(order_ids: str, days: int = 30) -> str: + """Pull a delivery report from Google Ad Manager for one or more orders. + + Args: + order_ids: Comma-separated numeric GAM order IDs (e.g. '54058762,54058882') + days: Look-back window in days (default 30) + + Returns order metadata, line items, and delivery data (impressions, clicks, revenue). + Requires GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH in .env. + """ + import httpx + + settings = _get_settings() + url = getattr(settings, "seller_agent_url", "http://localhost:8000") + + async with httpx.AsyncClient(timeout=120) as client: + resp = await client.get( + f"{url}/gam/report", + params={"order_ids": order_ids, "days": days}, + ) + if resp.status_code == 503: + return "GAM not configured — set GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH in .env" + if resp.status_code == 400: + return "order_ids must be a comma-separated list of numeric GAM order IDs" + return resp.text + + @mcp.tool() async def push_deal_to_buyers(deal_id: str, buyer_urls: str) -> str: """Push a deal to buyer endpoints via IAB Deals API v1.0. From cc5f9f083fe982a289f405268053350636122834 Mon Sep 17 00:00:00 2001 From: Sirajmx Date: Wed, 27 May 2026 00:01:02 +0530 Subject: [PATCH 5/5] allow agents only filter and persist gam id --- src/ad_seller/clients/gam_soap_client.py | 65 +++++++++++++++++++----- src/ad_seller/interfaces/api/main.py | 29 ++++++++++- src/ad_seller/interfaces/mcp_server.py | 18 +++++-- src/ad_seller/tools/gam/book_deal.py | 18 +++++++ 4 files changed, 110 insertions(+), 20 deletions(-) diff --git a/src/ad_seller/clients/gam_soap_client.py b/src/ad_seller/clients/gam_soap_client.py index 220d8a8..51b940f 100644 --- a/src/ad_seller/clients/gam_soap_client.py +++ b/src/ad_seller/clients/gam_soap_client.py @@ -228,7 +228,13 @@ def create_order( if notes: order["notes"] = notes if external_order_id: - order["externalOrderId"] = external_order_id + # GAM externalOrderId must be numeric — store non-numeric IDs in notes + try: + order["externalOrderId"] = int(external_order_id) + except (ValueError, TypeError): + order["notes"] = ( + f"{order.get('notes', '')} [deal_id:{external_order_id}]".strip() + ) result = order_service.createOrders([order]) order_data = result[0] @@ -666,29 +672,52 @@ def get_current_user(self) -> dict[str, Any]: # Reporting — Read Operations # ========================================================================= - def list_orders(self, limit: int = 50) -> list[dict[str, Any]]: + def list_orders( + self, + limit: int = 50, + agent_created_only: bool = False, + ) -> list[dict[str, Any]]: """List recent orders in the network. Args: limit: Maximum number of orders to return + agent_created_only: If True, return only orders created by the + agent (those with an externalOrderId set, which the agent + stores as the OpenDirect deal_id e.g. DEMO-xxxx) Returns: - List of dicts with id, name, status + List of dicts with id, name, status, external_order_id (deal_id if agent-created) """ from googleads import ad_manager order_service = self._get_service("OrderService") sb = ad_manager.StatementBuilder() + if agent_created_only: + sb.Where("externalOrderId != ''") sb.Limit(limit) result = order_service.getOrdersByStatement(sb.ToStatement()) - return [ - { - "id": str(getattr(o, "id", "")), - "name": getattr(o, "name", ""), - "status": str(getattr(o, "status", "")), - } - for o in (getattr(result, "results", None) or []) - ] + import re + + rows = [] + for o in getattr(result, "results", None) or []: + external_id = getattr(o, "externalOrderId", None) + notes = getattr(o, "notes", "") or "" + # Extract deal_id from notes if stored as [deal_id:DEMO-xxxx] + deal_id_from_notes = None + match = re.search(r"\[deal_id:([^\]]+)\]", notes) + if match: + deal_id_from_notes = match.group(1) + agent_deal_id = deal_id_from_notes or (str(external_id) if external_id else None) + rows.append( + { + "id": str(getattr(o, "id", "")), + "name": getattr(o, "name", ""), + "status": str(getattr(o, "status", "")), + "external_order_id": agent_deal_id, + "agent_created": bool(agent_deal_id), + } + ) + return rows def get_order_by_id(self, order_id: str) -> dict[str, Any]: """Fetch a single order by numeric ID. @@ -710,13 +739,23 @@ def get_order_by_id(self, order_id: str) -> dict[str, Any]: if not orders: return {} o = orders[0] + + def _fmt_date(dt: Any) -> str: + """Format a GAM SOAP DateTime object as YYYY-MM-DD.""" + if dt is None: + return "" + d = getattr(dt, "date", None) + if d is None: + return str(dt) + return f"{getattr(d, 'year', '')-0:04d}-{getattr(d, 'month', ''):02d}-{getattr(d, 'day', ''):02d}" + return { "id": str(getattr(o, "id", "")), "name": getattr(o, "name", ""), "status": str(getattr(o, "status", "")), "advertiser_id": str(getattr(o, "advertiserId", "")), - "start_date": str(getattr(o, "startDateTime", "")), - "end_date": str(getattr(o, "endDateTime", "")), + "start_date": _fmt_date(getattr(o, "startDateTime", None)), + "end_date": _fmt_date(getattr(o, "endDateTime", None)), } def list_line_items_for_order(self, order_id: str) -> list[dict[str, Any]]: diff --git a/src/ad_seller/interfaces/api/main.py b/src/ad_seller/interfaces/api/main.py index bf55080..e6bdbfd 100644 --- a/src/ad_seller/interfaces/api/main.py +++ b/src/ad_seller/interfaces/api/main.py @@ -5332,11 +5332,16 @@ def _gam_configured() -> bool: @app.get("/gam/orders", tags=["Reporting"]) async def gam_list_orders( limit: int = 50, + agent_created_only: bool = False, _auth: None = Depends(_get_optional_api_key_record), ) -> dict[str, Any]: """List recent GAM orders directly from the ad server. - Returns the most recent orders with id, name, and status. + Args: + limit: Maximum number of orders to return (default 50) + agent_created_only: If true, return only orders created by the agent + (those with an externalOrderId matching an OpenDirect deal_id) + Requires GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH in .env. """ if not _gam_configured(): @@ -5346,10 +5351,30 @@ async def gam_list_orders( ) try: from ...clients.gam_soap_client import GAMSoapClient + from ...storage.factory import get_storage client = GAMSoapClient() client.connect() - orders = client.list_orders(limit=limit) + + if agent_created_only: + # Use SQLite as source of truth — find all deals with gam_order_id set + storage = await get_storage() + all_deals = await storage.list_deals() + linked = [ + {"deal_id": d.get("deal_id"), "gam_order_id": d.get("gam_order_id")} + for d in all_deals + if d.get("gam_order_id") + ] + orders = [] + for link in linked[:limit]: + order = client.get_order_by_id(link["gam_order_id"]) + if order: + order["external_order_id"] = link["deal_id"] + order["agent_created"] = True + orders.append(order) + else: + orders = client.list_orders(limit=limit) + user = client.get_current_user() client.disconnect() return { diff --git a/src/ad_seller/interfaces/mcp_server.py b/src/ad_seller/interfaces/mcp_server.py index bfb9f9f..eb76774 100644 --- a/src/ad_seller/interfaces/mcp_server.py +++ b/src/ad_seller/interfaces/mcp_server.py @@ -510,11 +510,16 @@ async def get_deal_performance(deal_id: str) -> str: @mcp.tool() -async def list_gam_orders(limit: int = 50) -> str: - """List recent orders from Google Ad Manager (GAM). +async def list_gam_orders(limit: int = 50, agent_created_only: bool = False) -> str: + """List orders from Google Ad Manager (GAM). - Returns order id, name, and status for each order in the network. - Also returns the authenticated service account user and network code. + Args: + limit: Maximum number of orders to return (default 50) + agent_created_only: If True, return only orders created by the agent + (linked to an OpenDirect deal_id). Use this to see campaigns the + agent has booked, separated from manually created orders. + + Returns order id, name, status, and whether the order was agent-created. Requires GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH in .env. """ import httpx @@ -523,7 +528,10 @@ async def list_gam_orders(limit: int = 50) -> str: url = getattr(settings, "seller_agent_url", "http://localhost:8000") async with httpx.AsyncClient(timeout=30) as client: - resp = await client.get(f"{url}/gam/orders", params={"limit": limit}) + resp = await client.get( + f"{url}/gam/orders", + params={"limit": limit, "agent_created_only": agent_created_only}, + ) if resp.status_code == 503: return "GAM not configured — set GAM_ENABLED=true, GAM_NETWORK_CODE, GAM_JSON_KEY_PATH in .env" return resp.text diff --git a/src/ad_seller/tools/gam/book_deal.py b/src/ad_seller/tools/gam/book_deal.py index 0e333c7..531f6da 100644 --- a/src/ad_seller/tools/gam/book_deal.py +++ b/src/ad_seller/tools/gam/book_deal.py @@ -242,6 +242,24 @@ def _run( message="Deal successfully booked in GAM", ) + # Write gam_order_id back to the deal record so the + # performance endpoint can query real delivery data + try: + import asyncio + + from ...storage.factory import get_storage + + async def _persist_gam_order_id() -> None: + storage = await get_storage() + deal = await storage.get_deal(deal_id) + if deal: + deal["gam_order_id"] = order.id + await storage.set_deal(deal_id, deal) + + asyncio.run(_persist_gam_order_id()) + except Exception: + pass # Best-effort — never block the booking result + lines = [ "Deal booked successfully in GAM:\n", f"- OpenDirect Deal ID: {deal_id}",