diff --git a/Packs/SilentPush/Integrations/SilentPush/SilentPush.py b/Packs/SilentPush/Integrations/SilentPush/SilentPush.py index 4ac0f32686dd..cc6894aab794 100644 --- a/Packs/SilentPush/Integrations/SilentPush/SilentPush.py +++ b/Packs/SilentPush/Integrations/SilentPush/SilentPush.py @@ -1,5 +1,5 @@ -import demistomock as demisto # noqa: F401 -from CommonServerPython import * # noqa: F401 +# import demistomock as demisto # noqa: F401 +# from CommonServerPython import * # noqa: F401 import ipaddress import re """Base Integration for Cortex XSOAR (aka Demisto) @@ -67,7 +67,10 @@ def _http_request(self, method: str, url_suffix: str, params: dict = None, data: Raises: DemistoException: If there's an error during the API call. """ - full_url = f'{self.base_url}{url_suffix}' + if url_suffix == "/api/v2/iocs/threat-ranking": + full_url = demisto.params().get('url', 'https://api.silentpush.com') + url_suffix + else: + full_url = f'{self.base_url}{url_suffix}' try: response = requests.request( method, @@ -83,7 +86,7 @@ def _http_request(self, method: str, url_suffix: str, params: dict = None, data: return response.text except Exception as e: raise DemistoException(f'Error in API call: {str(e)}') - + def parse_subject(self,subject: Any) -> Dict[str, Any]: """ Parse the subject of a certificate or domain record. @@ -92,7 +95,7 @@ def parse_subject(self,subject: Any) -> Dict[str, Any]: subject (Any): The subject to parse, which can be a dictionary, string, or other type. Returns: - Dict[str, Any]: A dictionary representation of the subject, + Dict[str, Any]: A dictionary representation of the subject, with a fallback to {'CN': subject} or {'CN': 'N/A'} if parsing fails. """ if isinstance(subject, dict): @@ -101,17 +104,16 @@ def parse_subject(self,subject: Any) -> Dict[str, Any]: try: return json.loads(subject.replace("'", '"')) except (json.JSONDecodeError, TypeError): - demisto.debug(f"Failed to parse subject: {subject}") return {'CN': subject} else: return {'CN': 'N/A'} - - def validate_ip_address(ip: str, allow_ipv6: bool = True) -> bool: + def validate_ip_address(self, ip: str, allow_ipv6: bool = True) -> bool: """ Validate an IP address. Args: + self: The instance of the class. ip (str): IP address to validate. allow_ipv6 (bool, optional): Whether to allow IPv6 addresses. Defaults to True. @@ -121,92 +123,128 @@ def validate_ip_address(ip: str, allow_ipv6: bool = True) -> bool: try: ip = ip.strip() ip_obj = ipaddress.ip_address(ip) - + return not (not allow_ipv6 and ip_obj.version == 6) except ValueError: return False - def validate_ip_inputs(ips: list[str], allow_ipv6: bool = True) -> list[str]: + def validate_ip_inputs(self, ips: List[str], allow_ipv6: bool = True) -> List[str]: """ Validate a list of IP addresses. Args: - ips (list[str]): List of IP addresses to validate. - allow_ipv6 (bool, optional): Whether to allow IPv6 addresses. Defaults to True. + ips (List[str]): List of IP addresses to validate. + allow_ipv6 (bool): Whether to allow IPv6 addresses. Defaults to True. Returns: - list[str]: List of valid IP addresses. + List[str]: List of valid IP addresses. Raises: DemistoException: If no valid IP addresses are found. """ - valid_ips = [ip.strip() for ip in ips if validate_ip_address(ip, allow_ipv6)] - + valid_ips = [ + ip.strip() for ip in ips if self.validate_ip_address(ip, allow_ipv6=allow_ipv6) + ] + if not valid_ips: - raise DemistoException(f"No valid {'IPv4 and IPv6' if allow_ipv6 else 'IPv4'} addresses found.") - + raise DemistoException( + f"No valid {'IPv4 and IPv6' if allow_ipv6 else 'IPv4'} addresses found." + ) + return valid_ips + + ''' Client Methods''' + def list_domain_information(self, domains: List[str], fetch_risk_score: Optional[bool] = False, fetch_whois_info: Optional[bool] = False) -> Dict: """ - Retrieve comprehensive information for multiple domains. + Retrieve domain information along with optional risk scores and WHOIS data. Args: - domains (List[str]): List of domains to fetch information for. - fetch_risk_score (bool, optional): Whether to retrieve risk scores. Defaults to False. - fetch_whois_info (bool, optional): Whether to retrieve WHOIS information. Defaults to False. + domains (List[str]): List of domains to get information for + fetch_risk_score (bool, optional): Whether to fetch risk scores. Defaults to False + fetch_whois_info (bool, optional): Whether to fetch WHOIS information. Defaults to False Returns: - Dict: A dictionary containing domain information, including optional risk scores and WHOIS data. + Dict: Dictionary containing domain information with optional risk scores and WHOIS data Raises: - DemistoException: If more than 100 domains are submitted. + DemistoException: If more than 100 domains are provided """ if len(domains) > 100: raise DemistoException("Maximum of 100 domains can be submitted in a single request.") domains_data = {'domains': domains} - bulk_info_response = self._http_request('POST', 'explore/bulk/domaininfo', data=domains_data) + bulk_info_response = self._http_request( + method='POST', + url_suffix='explore/bulk/domaininfo', + data=domains_data + ) domain_info_list = bulk_info_response.get('response', {}).get('domaininfo', []) domain_info_dict = {item['domain']: item for item in domain_info_list} risk_score_dict = {} if fetch_risk_score: - bulk_risk_response = self._http_request('POST', 'explore/bulk/domain/riskscore', data=domains_data) - risk_score_list = bulk_risk_response.get('response', []) + risk_response = self._http_request( + method='POST', + url_suffix='explore/bulk/domain/riskscore', + data=domains_data + ) + risk_score_list = risk_response.get('response', []) risk_score_dict = {item['domain']: item for item in risk_score_list} - live_whois_info = {} + whois_info_dict = {} if fetch_whois_info: for domain in domains: try: - live_whois_response = self._http_request('GET', f'explore/domain/whoislive/{domain}') - live_whois_info[domain] = live_whois_response.get('response', {}) + whois_response = self._http_request( + method='GET', + url_suffix=f'explore/domain/whois/{domain}' + ) + whois_data = whois_response.get('response', {}).get('whois', [{}])[0] + + whois_info_dict[domain] = { + 'Registrant Name': whois_data.get('name', 'N/A'), + 'Registrant Organization': whois_data.get('org', 'N/A'), + 'Registrant Address': ', '.join(whois_data.get('address', [])) if isinstance(whois_data.get('address'), list) else whois_data.get('address', 'N/A'), + 'Registrant City': whois_data.get('city', 'N/A'), + 'Registrant State': whois_data.get('state', 'N/A'), + 'Registrant Country': whois_data.get('country', 'N/A'), + 'Registrant Zipcode': whois_data.get('zipcode', 'N/A'), + 'Creation Date': whois_data.get('created', 'N/A'), + 'Updated Date': whois_data.get('updated', 'N/A'), + 'Expiration Date': whois_data.get('expires', 'N/A'), + 'Registrar': whois_data.get('registrar', 'N/A'), + 'WHOIS Server': whois_data.get('whois_server', 'N/A'), + 'Nameservers': ', '.join(whois_data.get('nameservers', [])), + 'Emails': ', '.join(whois_data.get('emails', [])) + } except Exception as e: - live_whois_info[domain] = {'error': str(e)} + whois_info_dict[domain] = {'error': str(e)} - combined_results = [{ - 'domain': domain, - **domain_info_dict.get(domain, {}), - 'sp_risk_score': risk_score_dict.get(domain, {}).get('sp_risk_score', 'N/A'), - 'sp_risk_score_explain': risk_score_dict.get(domain, {}).get('sp_risk_score_explain', 'N/A'), - 'whois_info': live_whois_info.get(domain, 'N/A') - } for domain in domains] + results = [] + for domain in domains: + domain_info = { + 'domain': domain, + **domain_info_dict.get(domain, {}), + } - return {'domains': combined_results} + if fetch_risk_score: + risk_data = risk_score_dict.get(domain, {}) + domain_info.update({ + 'risk_score': risk_data.get('sp_risk_score', 'N/A'), + 'risk_score_explanation': risk_data.get('sp_risk_score_explain', 'N/A') + }) - def get_domain_certificates(self, domain: str, **kwargs) -> Dict[str, Any]: - """ - Retrieve SSL/TLS certificates for a given domain. + if fetch_whois_info: + domain_info['whois_info'] = whois_info_dict.get(domain, {}) - Args: - domain (str): The domain to retrieve certificates for. - **kwargs: Additional optional parameters for filtering certificates. + results.append(domain_info) - Returns: - Dict[str, Any]: A dictionary containing domain certificate information. - """ + return {'domains': results} + + def get_domain_certificates(self, domain: str, **kwargs) -> Dict[str, Any]: url_suffix = f"explore/domain/certificates/{domain}" params = {k: v for k, v in kwargs.items() if v is not None} response = self._http_request( @@ -214,12 +252,9 @@ def get_domain_certificates(self, domain: str, **kwargs) -> Dict[str, Any]: url_suffix=url_suffix, params=params ) - demisto.debug(f"Raw response from API: {response}") return response - - def search_domains(self, query: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, risk_score_min: Optional[int] = None, risk_score_max: Optional[int] = None, limit: int = 100, domain_regex: Optional[str] = None, name_server: Optional[str] = None, asnum: Optional[int] = None, asname: Optional[str] = None, min_ip_diversity: Optional[int] = None, registrar: Optional[str] = None, min_asn_diversity: Optional[int] = None, certificate_issuer: Optional[str] = None, whois_date_after: Optional[str] = None, skip: Optional[int] = None) -> dict: """ Search for domains based on various filtering criteria. @@ -267,42 +302,64 @@ def search_domains(self, query: Optional[str] = None, start_date: Optional[str] response = self._http_request('GET', url_suffix, params=params) return response - def list_domain_infratags(self, domains: list, cluster: bool = False, mode: str = 'live', match: str = 'self', as_of: Optional[str] = None) -> dict: + def list_domain_infratags( + self, + domains: list, + cluster: bool = False, + mode: str = 'live', + match: str = 'self', + as_of: Optional[str] = None, + origin_uid: Optional[str] = None, + use_get: bool = False + ) -> dict: """ - Retrieve infrastructure tags for specified domains. + Retrieve infrastructure tags for specified domains, supporting both GET and POST methods. - Args: - domains (list): List of domains to fetch infrastructure tags for. - cluster (bool, optional): Whether to cluster tags. Defaults to False. - mode (str, optional): Tag retrieval mode. Defaults to 'live'. - match (str, optional): Matching criteria. Defaults to 'self'. - as_of (str, optional): Specific timestamp for tag retrieval. Defaults to None. + Args: + domains (list): List of domains to fetch infrastructure tags for. + cluster (bool): Whether to include cluster information (default: False). + mode (str): Tag retrieval mode (default: 'live'). + match (str): Matching criteria (default: 'self'). + as_of (Optional[str]): Specific timestamp for tag retrieval. + origin_uid (Optional[str]): Unique identifier for the API user. + use_get (bool): Use GET method instead of POST (default: False). - Returns: - dict: Infrastructure tags and optional tag clusters for the domains. - """ - url = 'explore/bulk/domain/infratags' - payload = { - 'domains': domains - } + Returns: + dict: API response containing infratags and optional tag clusters. + """ + url_suffix = 'explore/bulk/domain/infratags' params = { 'mode': mode, 'match': match, - 'clusters': cluster + 'clusters': int(cluster), } + if as_of: params['as_of'] = as_of + if origin_uid: + params['origin_uid'] = origin_uid - response = self._http_request( - method='POST', - url_suffix=url, - params=params, - data=payload - ) + if use_get: + + response = self._http_request( + method='GET', + url_suffix=url_suffix, + params=params + ) + else: + + payload = {'domains': domains} + response = self._http_request( + method='POST', + url_suffix=url_suffix, + params=params, + data=payload + ) return response + def get_enrichment_data(self, resource: str, value: str, explain: Optional[bool] = False, scan_data: Optional[bool] = False) -> dict: """ Retrieve enrichment data for a specific resource. @@ -334,12 +391,13 @@ def get_enrichment_data(self, resource: str, value: str, explain: Optional[bool] else: return response.get("response", {}).get("domaininfo", {}) - def list_ip_information(self, ips: List[str]) -> Dict: + def list_ip_information(self, ips: List[str], resource: str) -> Dict: """ Retrieve information for multiple IP addresses. Args: ips (List[str]): List of IPv4 or IPv6 addresses to fetch information for. + resource (str): The resource type ('ipv4' or 'ipv6'). Returns: Dict: API response containing IP information. @@ -347,64 +405,69 @@ def list_ip_information(self, ips: List[str]) -> Dict: if len(ips) > 100: raise DemistoException("Maximum of 100 IPs can be submitted in a single request.") - ip_data = {'ips': ips} - bulk_ip_response = self._http_request('POST', 'explore/bulk/ip2asn/ipv4', data=ip_data) + ip_data = {"ips": ips} + url_suffix = f"explore/bulk/ip2asn/{resource}" + bulk_ip_response = self._http_request("POST", url_suffix, data=ip_data) return bulk_ip_response - def get_asn_reputation(self, asn: int, explain: bool = False, limit: int = None) -> Dict[str, Any]: + + + def get_asn_reputation(self, asn: int, limit: Optional[int] = None, explain: Optional[bool] = False) -> Dict[str, Any]: """ - Retrieve reputation history for a specific Autonomous System Number (ASN). + Retrieve reputation history for a specific Autonomous System Number (ASN). - Args: - asn (int): The Autonomous System Number to query. - explain (bool, optional): Whether to include detailed explanations. Defaults to False. - limit (int, optional): Maximum number of results to return. Defaults to None. + Args: + asn (int): The Autonomous System Number to query. + limit (int, optional): Maximum number of results to return. Defaults to None. + explain (bool, optional): Whether to include explanation for reputation score. Defaults to False. - Returns: - Dict[str, Any]: ASN reputation history information. - """ + Returns: + Dict[str, Any]: ASN reputation history information. + """ url_suffix = f"explore/ipreputation/history/asn/{asn}" query_params = {} - if explain: - query_params['explain'] = 'true' + if limit: query_params['limit'] = limit + if explain: + query_params['explain'] = 'true' + response = self._http_request( method="GET", url_suffix=url_suffix, params=query_params ) + return response - def get_asn_takedown_reputation(self, args): + def get_asn_takedown_reputation(self, asn: str, limit: Optional[int] = None, explain: bool = False) -> Dict[str, Any]: """ Retrieve takedown reputation for a specific Autonomous System Number (ASN). Args: - args (dict): Arguments containing ASN, optional explain flag, and optional result limit. + asn (str): The ASN number to query + limit (Optional[int]): Maximum results to return + explain (bool): Whether to include explanation for reputation score Returns: - dict: Takedown reputation information for the specified ASN. + Dict[str, Any]: Takedown reputation information for the specified ASN Raises: - ValueError: If ASN is not provided. + ValueError: If ASN is not provided + DemistoException: If API call fails """ - asn = args.get('asn') - explain = argToBoolean(args.get('explain', 'false')) - limit = args.get('limit') - if not asn: - raise ValueError('The "asn" argument is required.') + raise ValueError('ASN is required.') endpoint = f'explore/takedownreputation/asn/{asn}' - params = {} - if explain: - params['explain'] = explain + if limit: params['limit'] = limit + if explain: + params['explain'] = 'true' response = self._http_request( method='GET', @@ -414,32 +477,33 @@ def get_asn_takedown_reputation(self, args): return response.get('response', {}).get('takedown_reputation', {}) - def get_ipv4_reputation(self, ipv4, explain=False, limit=None): - """ - Retrieve reputation history for a specific IPv4 address. - - Args: - ipv4 (str): The IPv4 address to query. - explain (bool, optional): Whether to include detailed explanations. Defaults to False. - limit (int, optional): Maximum number of results to return. Defaults to None. - Returns: - dict: IPv4 reputation history information. - """ + def get_ipv4_reputation(self, ipv4: str, explain: bool = False, limit: int = None) -> List[Dict[str, Any]]: + """ + Retrieve reputation information for an IPv4 address. + """ url_suffix = f"explore/ipreputation/history/ipv4/{ipv4}" - params = {} + query_params = {} + if explain: - params['explain'] = 'true' - if limit is not None: - params['limit'] = limit + query_params['explain'] = 'true' + if limit: + query_params['limit'] = limit - response = self._http_request( - method="GET", + raw_response = self._http_request( + method='GET', url_suffix=url_suffix, - params=params + params=query_params ) - return response + + ipv4_reputation = raw_response.get('response', {}).get('ip_reputation_history', []) + + return ipv4_reputation + + + + def get_job_status(self, job_id: str, max_wait: Optional[int] = None, result_type: Optional[str] = None) -> Dict[str, Any]: """ @@ -478,31 +542,41 @@ def get_job_status(self, job_id: str, max_wait: Optional[int] = None, result_typ return response - def get_nameserver_reputation(self, nameserver: str, explain: Optional[bool] = None, limit: Optional[int] = None) -> Dict[str, Any]: + def get_nameserver_reputation(self, nameserver: str, explain: bool = False, limit: int = None): """ - Retrieve reputation history for a specific nameserver. + Retrieve historical reputation data for the specified name server. - Args: - nameserver (str): The nameserver to query. - explain (bool, optional): Whether to include detailed explanations. Defaults to None. - limit (int, optional): Maximum number of results to return. Defaults to None. + Args: + nameserver (str): The nameserver for which the reputation data is to be fetched. + explain (bool): Whether to include detailed calculation explanations. + limit (int): Maximum number of reputation entries to return. + + Returns: + dict: Reputation history for the given nameserver. + """ + + url_suffix = f"explore/nsreputation/nameserver/{nameserver}" - Returns: - Dict[str, Any]: Nameserver reputation history information. - """ - url_suffix = f"explore/nsreputation/history/nameserver/{nameserver}" params = {} - if explain is not None: - params['explain'] = str(explain).lower() - if limit is not None: + + if explain: + params['explain'] = explain + if limit: params['limit'] = limit - response = self._http_request( - method="GET", - url_suffix=url_suffix, - params=params - ) - return response + + response = self._http_request(method="GET", url_suffix=url_suffix, params=params) + + + + try: + + return response.get('response', {}).get('ns_server_reputation', []) + except AttributeError: + + return {} + + def get_subnet_reputation(self, subnet: str, explain: Optional[bool] = False, limit: Optional[int] = None) -> Dict[str, Any]: """ @@ -550,7 +624,7 @@ def get_asns_for_domain(self, domain: str) -> Dict[str, Any]: ) return response - + def forward_padns_lookup(self, qtype: str, qname: str, **kwargs) -> Dict[str, Any]: """ Perform a forward PADNS lookup using various filtering parameters. @@ -564,9 +638,9 @@ def forward_padns_lookup(self, qtype: str, qname: str, **kwargs) -> Dict[str, An Dict[str, Any]: PADNS lookup results. """ url_suffix = f"explore/padns/lookup/query/{qtype}/{qname}" - + params = {k: v for k, v in kwargs.items() if v is not None} - + response = self._http_request( method="GET", url_suffix=url_suffix, @@ -588,9 +662,9 @@ def reverse_padns_lookup(self, qtype: str, qname: str, **kwargs) -> Dict[str, An Dict[str, Any]: Reverse PADNS lookup results. """ url_suffix = f"explore/padns/lookup/answer/{qtype}/{qname}" - + params = {k: v for k, v in kwargs.items() if v is not None} - + response = self._http_request( method="GET", url_suffix=url_suffix, @@ -598,7 +672,8 @@ def reverse_padns_lookup(self, qtype: str, qname: str, **kwargs) -> Dict[str, An ) return response - + + def density_lookup(self, qtype: str, query: str, **kwargs) -> Dict[str, Any]: """ Perform a density lookup based on various query types and parameters. @@ -612,9 +687,113 @@ def density_lookup(self, qtype: str, query: str, **kwargs) -> Dict[str, Any]: Dict[str, Any]: Density lookup results """ url_suffix = f"explore/padns/lookup/density/{qtype}/{query}" - + params = {k: v for k, v in kwargs.items() if v is not None} - + + response = self._http_request( + method="GET", + url_suffix=url_suffix, + params=params + ) + + return response + + def search_scan_data(self, query: str) -> Dict[str, Any]: + """ + Search the Silent Push scan data repositories. + + Args: + query (str): Query in SPQL syntax to scan data (mandatory) + + Returns: + Dict[str, Any]: Search results from scan data repositories + + Raises: + DemistoException: If query is not provided or API call fails + """ + if not query: + raise DemistoException("Query parameter is required for search scan data.") + + url_suffix = "explore/scandata/search/raw" + + payload = { + "query": query + } + + try: + response = self._http_request( + method="POST", + url_suffix=url_suffix, + data=payload + ) + return response + except Exception as e: + raise DemistoException(f"Failed to search scan data: {str(e)}") + + + def live_url_scan(self, url: str, platform: Optional[str] = None, os: Optional[str] = None, + browser: Optional[str] = None, region: Optional[str] = None) -> Dict[str, Any]: + """ + Perform a live scan of a URL to get hosting metadata. + + Args: + url (str): The URL to scan + platform (str, optional): Device to perform scan with (Desktop, Mobile, Crawler) + os (str, optional): OS to perform scan with (Windows, Linux, MacOS, iOS, Android) + browser (str, optional): Browser to perform scan with (Firefox, Chrome, Edge, Safari) + region (str, optional): Region from where scan should be performed (US, EU, AS, TOR) + + Returns: + Dict[str, Any]: The scan results including hosting metadata + + Raises: + DemistoException: If there's an error during the API call + """ + url_suffix = "explore/tools/scanondemand" + + params = { + 'url': url, + 'platform': platform, + 'os': os, + 'browser': browser, + 'region': region + } + + params = {k: v for k, v in params.items() if v is not None} + + try: + response = self._http_request( + method='GET', + url_suffix=url_suffix, + params=params + ) + return response + except Exception as e: + raise DemistoException(f'Error in live URL scan: {str(e)}') + + def get_future_attack_indicators(self, feed_uuid: str, page_no: int = 1, page_size: int = 10000) -> Dict[str, Any]: + """ + Retrieve indicators of future attack feed from SilentPush. + + Args: + feed_uuid (str): Feed unique identifier to fetch records for. + page_no (int, optional): Page number for pagination. Defaults to 1. + page_size (int, optional): Number of records per page. Defaults to 10000. + + Returns: + Dict[str, Any]: Response containing future attack indicators. + + Raises: + DemistoException: If there's an error during the API call. + """ + url_suffix = "/api/v2/iocs/threat-ranking" + + params = { + 'page': page_no, + 'size': page_size, + 'source_uuids': feed_uuid + } + response = self._http_request( method="GET", url_suffix=url_suffix, @@ -623,10 +802,60 @@ def density_lookup(self, qtype: str, query: str, **kwargs) -> Dict[str, Any]: return response + def screenshot_url(self, url: str) -> Dict[str, Any]: + """ + Generate a screenshot for a given URL and store it in the vault using GET request. + + Args: + url (str): The URL to capture a screenshot of + + Returns: + Dict[str, Any]: Response containing screenshot information and vault details + """ + endpoint = "explore/tools/screenshotondemand" + params = {"url": url} + + try: + + response = self._http_request( + method="GET", + url_suffix=endpoint, + params=params + ) + + if response.get("error"): + raise DemistoException(f"Failed to get screenshot: {response['error']}") + + screenshot_data = response.get("response", {}).get("screenshot", {}) + if not screenshot_data: + raise DemistoException("No screenshot data returned from API") + + screenshot_url = screenshot_data.get("message") + if not screenshot_url: + raise DemistoException("No screenshot URL returned") + + + image_response = requests.get(screenshot_url, verify=self.verify) + if image_response.status_code != 200: + raise DemistoException(f"Failed to download screenshot image: HTTP {image_response.status_code}") + + + filename = f"{url.split('://')[1].split('/')[0]}_screenshot.jpg" + return { + "status_code": screenshot_data.get("response", 200), + "screenshot_url": screenshot_url, + "vault_info": fileResult(filename, image_response.content), + "filename": filename + } + + except Exception as e: + raise DemistoException(f"Error capturing screenshot: {str(e)}") def test_module(client: Client) -> str: try: - client.list_domain_information('silentpush.com') + resp = client.search_domains() + if resp.get("status_code") != 200: + return f"Connection failed :- {resp.get('errors')}" return 'ok' except DemistoException as e: if 'Forbidden' in str(e) or 'Authorization' in str(e): @@ -634,7 +863,20 @@ def test_module(client: Client) -> str: raise e def list_domain_information_command(client: Client, args: Dict[str, Any]) -> CommandResults: - domains_arg = args.get('domains') or args.get('domain') + """ + Handle the list-domain-information command execution. + + Args: + client (Client): The client object for making API calls + args (Dict[str, Any]): Command arguments + + Returns: + CommandResults: Results for XSOAR + + Raises: + DemistoException: If no domains are provided + """ + domains_arg = args.get('domains', '') if not domains_arg: raise DemistoException('No domains provided') @@ -642,49 +884,47 @@ def list_domain_information_command(client: Client, args: Dict[str, Any]) -> Com fetch_risk_score = argToBoolean(args.get('fetch_risk_score', False)) fetch_whois_info = argToBoolean(args.get('fetch_whois_info', False)) - raw_response = client.list_domain_information(domains, fetch_risk_score, fetch_whois_info) + response = client.list_domain_information(domains, fetch_risk_score, fetch_whois_info) markdown = ['# Domain Information Results\n'] - for domain_info in raw_response.get('domains', []): - markdown.append(f"## Domain: {domain_info.get('domain', 'N/A')}") + + for domain_data in response.get('domains', []): + domain = domain_data.get('domain', 'N/A') + markdown.append(f'## Domain: {domain}') basic_info = { - 'Created Date': domain_info.get('whois_created_date', 'N/A'), - 'Updated Date': domain_info.get('whois_updated_date', 'N/A'), - 'Expiration Date': domain_info.get('whois_expiration_date', 'N/A'), - 'Registrar': domain_info.get('registrar', 'N/A'), - 'Status': domain_info.get('status', 'N/A'), - 'Name Servers': domain_info.get('nameservers', 'N/A') + 'Created Date': domain_data.get('whois_created_date', 'N/A'), + 'Updated Date': domain_data.get('whois_updated_date', 'N/A'), + 'Expiration Date': domain_data.get('whois_expiration_date', 'N/A'), + 'Registrar': domain_data.get('registrar', 'N/A'), + 'Status': domain_data.get('status', 'N/A'), + 'Name Servers': domain_data.get('nameservers', 'N/A') } markdown.append(tableToMarkdown('Domain Information', [basic_info])) if fetch_risk_score: risk_info = { - 'Risk Score': domain_info.get('sp_risk_score', 'N/A'), - 'Risk Score Explanation': domain_info.get('sp_risk_score_explain', 'N/A') + 'Risk Score': domain_data.get('risk_score', 'N/A'), + 'Risk Score Explanation': domain_data.get('risk_score_explanation', 'N/A') } markdown.append(tableToMarkdown('Risk Assessment', [risk_info])) - if fetch_whois_info and domain_info.get('whois_info') != 'N/A': - whois_info = domain_info.get('whois_info', {}) - if isinstance(whois_info, dict): - whois_data = { - 'Registrant Name': whois_info.get('registrant_name', 'N/A'), - 'Registrant Organization': whois_info.get('registrant_organization', 'N/A'), - 'Registrant Email': whois_info.get('registrant_email', 'N/A'), - 'Admin Email': whois_info.get('admin_email', 'N/A'), - 'Tech Email': whois_info.get('tech_email', 'N/A') - } - markdown.append(tableToMarkdown('WHOIS Information', [whois_data])) + if fetch_whois_info: + whois_info = domain_data.get('whois_info', {}) + if whois_info and isinstance(whois_info, dict): + if 'error' in whois_info: + markdown.append(f'WHOIS Error: {whois_info["error"]}') + else: + markdown.append(tableToMarkdown('WHOIS Information', [whois_info])) markdown.append('\n---\n') return CommandResults( outputs_prefix='SilentPush.Domain', outputs_key_field='domain', - outputs=raw_response.get('domains', []), + outputs=response.get('domains', []), readable_output='\n'.join(markdown), - raw_response=raw_response + raw_response=response ) def get_domain_certificates_command(client: Client, args: Dict[str, Any]) -> CommandResults: @@ -744,6 +984,7 @@ def get_domain_certificates_command(client: Client, args: Dict[str, Any]) -> Com except Exception as e: raise DemistoException(f"Error retrieving certificates for domain '{domain}': {str(e)}") + def search_domains_command(client: Client, args: dict) -> CommandResults: query = args.get('query') start_date = args.get('start_date') @@ -819,23 +1060,45 @@ def search_domains_command(client: Client, args: dict) -> CommandResults: ) def list_domain_infratags_command(client: Client, args: dict) -> CommandResults: + """ + Command function to retrieve domain infratags with optional cluster details. + + Args: + client (Client): SilentPush API client. + args (dict): Command arguments. + + Returns: + CommandResults: Formatted results of the infratags lookup. + """ domains = argToList(args.get('domains', '')) cluster = argToBoolean(args.get('cluster', False)) mode = args.get('mode', 'live') match = args.get('match', 'self') as_of = args.get('as_of', None) + origin_uid = args.get('origin_uid', None) + use_get = argToBoolean(args.get('use_get', False)) - if not domains: - raise ValueError('"domains" argument is required and cannot be empty.') + if not domains and not use_get: + raise ValueError('"domains" argument is required when using POST.') - raw_response = client.list_domain_infratags(domains, cluster, mode, match, as_of) + raw_response = client.list_domain_infratags(domains, cluster, mode, match, as_of, origin_uid, use_get) infratags = raw_response.get('response', {}).get('infratags', []) tag_clusters = raw_response.get('response', {}).get('tag_clusters', []) + readable_output = tableToMarkdown('Domain Infratags', infratags) - if tag_clusters: - readable_output += tableToMarkdown('Domain Tag Clusters', tag_clusters) + + if cluster and tag_clusters: + cluster_details = [] + for cluster in tag_clusters: + for key, value in cluster.items(): + cluster_details.append({'Cluster Level': key, 'Details': value}) + + readable_output += tableToMarkdown('Domain Tag Clusters', cluster_details) + + if cluster and not tag_clusters: + readable_output += "\n\n**No tag cluster data returned by the API.**" return CommandResults( outputs_prefix='SilentPush.InfraTags', @@ -846,6 +1109,8 @@ def list_domain_infratags_command(client: Client, args: dict) -> CommandResults: ) + + def get_enrichment_data_command(client: Client, args: dict) -> CommandResults: resource = args.get("resource") value = args.get("value") @@ -855,8 +1120,12 @@ def get_enrichment_data_command(client: Client, args: dict) -> CommandResults: if not resource or not value: raise ValueError("Both 'resource' and 'value' arguments are required.") - if resource in ["ip", "ipv4", "ipv6"] and not validate_ip_address(value, allow_ipv6=(resource != "ipv4")): - raise DemistoException(f"Invalid {resource.upper()} address: {value}") + + if resource in ["ipv4", "ipv6"]: + is_valid_ip = client.validate_ip_address(value, allow_ipv6=(resource == "ipv6")) + if not is_valid_ip: + raise DemistoException(f"Invalid {resource.upper()} address: {value}") + enrichment_data = client.get_enrichment_data(resource, value, explain, scan_data) @@ -886,126 +1155,243 @@ def get_enrichment_data_command(client: Client, args: dict) -> CommandResults: def list_ip_information_command(client: Client, args: Dict[str, Any]) -> CommandResults: ips = argToList(args.get("ips", "")) + if not ips: raise ValueError("The 'ips' parameter is required.") - try: - valid_ips = validate_ip_inputs(ips) - except DemistoException as e: - return CommandResults( - readable_output=str(e), - outputs_prefix="SilentPush.Error", - outputs_key_field="error", - outputs={"error": str(e)} - ) - response = client.list_ip_information(valid_ips) - outputs = response.get("response", {}).get("ip2asn", []) + ipv4_addresses = [] + ipv6_addresses = [] + + for ip in ips: + if client.validate_ip_address(ip, allow_ipv6=False): + ipv4_addresses.append(ip) + elif client.validate_ip_address(ip, allow_ipv6=True): + ipv6_addresses.append(ip) + + + results = [] + if ipv4_addresses: + ipv4_info = client.list_ip_information(ipv4_addresses, resource="ipv4") + results.extend(ipv4_info.get("response", {}).get("ip2asn", [])) + + if ipv6_addresses: + ipv6_info = client.list_ip_information(ipv6_addresses, resource="ipv6") + results.extend(ipv6_info.get("response", {}).get("ip2asn", [])) - if not outputs: + if not results: return CommandResults( - readable_output=f"No information found for IPs: {', '.join(valid_ips)}", + readable_output=f"No information found for IPs: {', '.join(ips)}", outputs_prefix="SilentPush.IPInformation", outputs_key_field="ip", outputs=[], - raw_response=response + raw_response={"ips": ips, "results": results}, ) + readable_output = tableToMarkdown( "Comprehensive IP Information", - outputs, - removeNull=True + results, + removeNull=True, ) return CommandResults( outputs_prefix="SilentPush.IPInformation", outputs_key_field="ip", - outputs=outputs, + outputs=results, readable_output=readable_output, - raw_response=response + raw_response={"ips": ips, "results": results}, ) -def get_asn_reputation_command(self, args: dict) -> CommandResults: - asn = args.get("asn") - explain = argToBoolean(args.get("explain", False)) - limit = arg_to_number(args.get("limit", None)) - if not asn: + + + +def get_asn_reputation_command(client: Client, args: dict) -> CommandResults: + """ + Command handler for retrieving ASN reputation data. + + Args: + client (Client): The API client instance + args (dict): Command arguments containing: + - asn: ASN number + - limit (optional): Maximum results to return + - explain (optional): Whether to include explanation + + Returns: + CommandResults: Formatted command results for XSOAR + """ + asn = args.get("asn") + limit = arg_to_number(args.get("limit", None)) + explain = argToBoolean(args.get("explain", False)) + + if not asn: raise ValueError("ASN is required.") try: - asn_reputation_data = self.get_asn_reputation(asn, explain, limit) + raw_response = client.get_asn_reputation(asn, limit, explain) + + response_data = raw_response.get('response', {}) + asn_reputation = response_data.get('asn_reputation') or response_data.get('asn_reputation_history', []) + + asn_reputation = sorted( + asn_reputation, + key=lambda x: x.get('date', ''), + reverse=True + ) - command_results = CommandResults( + if not asn_reputation: + return CommandResults( + readable_output=f"No reputation data found for ASN {asn}.", + outputs_prefix="SilentPush.ASNReputation", + outputs_key_field="asn", + outputs=[], + raw_response=raw_response + ) + + data_for_table = [] + for entry in asn_reputation: + row = { + 'ASN': entry.get('asn'), + 'Reputation': entry.get('asn_reputation'), + 'ASName': entry.get('asname'), + 'Date': entry.get('date') + } + if explain and entry.get('explanation'): + row['Explanation'] = entry.get('explanation') + data_for_table.append(row) + + headers = ['ASN', 'Reputation', 'ASName', 'Date'] + if explain: + headers.append('Explanation') + + readable_output = tableToMarkdown( + f'ASN Reputation for {asn}', + data_for_table, + headers=headers, + removeNull=True + ) + + return CommandResults( outputs_prefix="SilentPush.ASNReputation", outputs_key_field="asn", - outputs=asn_reputation_data, - raw_response=asn_reputation_data + outputs={ + 'asn': asn, + 'reputation_data': asn_reputation + }, + readable_output=readable_output, + raw_response=raw_response ) - return command_results - except Exception as e: raise DemistoException(f"Error retrieving ASN reputation data: {str(e)}") -def get_asn_takedown_reputation_command(client: Client, args): - - takedown_reputation = client.get_asn_takedown_reputation(args) +def get_asn_takedown_reputation_command(client: Client, args: dict) -> CommandResults: + """ + Command handler for retrieving ASN takedown reputation. + + Args: + client (Client): The API client instance + args (dict): Command arguments + + Returns: + CommandResults: Command results for XSOAR + """ asn = args.get('asn') + if not asn: + raise ValueError('ASN is a required parameter') - readable_output = tableToMarkdown( - f'Takedown Reputation for ASN {asn}', - [takedown_reputation], - headers=['asn', 'asname', 'asn_allocation_date', 'asn_takedown_reputation'] - ) + try: + limit = int(args.get('limit')) if args.get('limit') else None + except ValueError: + raise ValueError('limit must be a valid number') - return CommandResults( - outputs_prefix='SilentPush.TakedownReputation', - outputs_key_field='asn', - outputs=takedown_reputation, - readable_output=readable_output, - raw_response=takedown_reputation - ) + explain = argToBoolean(args.get('explain', False)) + + try: + response = client.get_asn_takedown_reputation(asn=asn, limit=limit, explain=explain) + + if not response: + return CommandResults( + readable_output=f'No takedown reputation data found for ASN {asn}', + outputs_prefix='SilentPush.ASNTakedownReputation', + outputs=None + ) + + reputation_data = { + 'ASN': response.get('asn', asn), + 'AS Name': response.get('asname', 'N/A'), + 'Allocation Date': response.get('asn_allocation_date', 'N/A'), + 'Takedown Reputation': response.get('asn_takedown_reputation', 'N/A'), + 'Allocation Age': response.get('asn_allocation_age', 'N/A') + } + + headers = ['ASN', 'AS Name', 'Allocation Date', 'Takedown Reputation', 'Allocation Age'] + + readable_output = tableToMarkdown( + f'ASN Takedown Reputation Information for {asn}', + [reputation_data], + headers=headers, + removeNull=True + ) + + return CommandResults( + readable_output=readable_output, + outputs_prefix='SilentPush.ASNTakedownReputation', + outputs_key_field='asn', + outputs=reputation_data, + raw_response=response + ) -def get_ipv4_reputation_command(client: Client, args: dict) -> CommandResults: + except Exception as e: + raise DemistoException(f'Error retrieving ASN takedown reputation: {str(e)}') + + +def get_ipv4_reputation_command(client: Client, args: Dict[str, Any]) -> CommandResults: ipv4 = args.get('ipv4') if not ipv4: - raise ValueError("The 'ipv4' parameter is required.") + raise DemistoException("IPv4 address is required") - if not validate_ip_address(ipv4, allow_ipv6=False): - raise DemistoException(f"Invalid IPv4 address: {ipv4}") + explain = argToBoolean(args.get('explain', "false")) + limit = arg_to_number(args.get('limit')) - explain = argToBoolean(args.get('explain', False)) - limit = arg_to_number(args.get('limit', None)) + raw_response = client.get_ipv4_reputation(ipv4, explain, limit) + + + if not raw_response: - try: - raw_response = client.get_ipv4_reputation(ipv4, explain, limit) - except Exception as e: return CommandResults( - readable_output=f"Error: {str(e)}", - raw_response={}, - outputs_prefix='SilentPush.Error', - outputs_key_field='error' + readable_output=f"No reputation data found for IPv4: {ipv4}", + outputs_prefix='SilentPush.IPv4Reputation', + outputs_key_field='ip', + outputs={'ip': ipv4}, + raw_response=raw_response ) - ip_reputation = raw_response.get('response', {}).get('ip_reputation_history', []) + latest_reputation = raw_response[0] + + + + reputation_data = { + 'IP': latest_reputation.get('ip', ipv4), + 'Date': latest_reputation.get('date'), + 'Reputation Score': latest_reputation.get('ip_reputation') + } - if not ip_reputation: - readable_output = f"No reputation information found for IPv4: {ipv4}" - else: - readable_output = tableToMarkdown(f"IPv4 Reputation for {ipv4}", ip_reputation) + readable_output = tableToMarkdown( + f'IPv4 Reputation Information for {ipv4}', + [reputation_data] + ) return CommandResults( outputs_prefix='SilentPush.IPv4Reputation', outputs_key_field='ip', - outputs={ - 'ip': ipv4, - 'reputation_history': ip_reputation - }, + outputs=reputation_data, readable_output=readable_output, raw_response=raw_response ) + def get_job_status_command(client: Client, args: dict) -> CommandResults: job_id = args.get('job_id') max_wait = arg_to_number(args.get('max_wait')) @@ -1046,43 +1432,49 @@ def get_job_status_command(client: Client, args: dict) -> CommandResults: outputs={'error': str(e)} ) -def get_nameserver_reputation_command(client: Client, args: Dict[str, Any]) -> CommandResults: - nameserver = args.get('nameserver') - if not nameserver: - raise DemistoException("The 'nameserver' parameter is required.") +def get_nameserver_reputation_command(client: Client, args: dict) -> CommandResults: + """ + Command handler for retrieving nameserver reputation. - explain = argToBoolean(args.get('explain', False)) - limit = arg_to_number(args.get('limit')) + Args: + client (Client): The API client instance. + args (dict): Command arguments. + + Returns: + CommandResults: The command results containing nameserver reputation data. + """ + nameserver = args.get("nameserver") + explain = argToBoolean(args.get("explain", "false")) + limit = arg_to_number(args.get("limit")) + + if not nameserver: + raise ValueError("Nameserver is required.") try: - raw_response = client.get_nameserver_reputation(nameserver, explain, limit) - reputation_history = raw_response.get('response', {}).get('ns_server_reputation_history', []) - if not reputation_history: - readable_output = f"No reputation history found for nameserver: {nameserver}" - else: + reputation_data = client.get_nameserver_reputation(nameserver, explain, limit) + + + if reputation_data: readable_output = tableToMarkdown( f"Nameserver Reputation for {nameserver}", - reputation_history, + reputation_data, + headers=["ns_server", "ns_server_reputation", "date"], removeNull=True ) + else: + readable_output = f"No reputation history found for nameserver: {nameserver}" return CommandResults( - outputs_prefix='SilentPush.NameserverReputation', - outputs_key_field='nameserver', - outputs={'nameserver': nameserver, 'reputation_history': reputation_history}, + outputs_prefix="SilentPush.NameserverReputation", + outputs_key_field="ns_server", + outputs={"nameserver": nameserver, "reputation_data": reputation_data}, readable_output=readable_output, - raw_response=raw_response + raw_response=reputation_data ) except Exception as e: - return CommandResults( - readable_output=f"Error retrieving nameserver reputation: {str(e)}", - raw_response={}, - outputs_prefix='SilentPush.Error', - outputs_key_field='error', - outputs={'error': str(e)} - ) + raise DemistoException(f"Error retrieving nameserver reputation: {e}") def get_subnet_reputation_command(client: Client, args: dict) -> CommandResults: subnet = args.get('subnet') @@ -1168,7 +1560,7 @@ def get_asns_for_domain_command(client: Client, args: dict) -> CommandResults: outputs_key_field='error', outputs={'error': str(e)} ) - + def forward_padns_lookup_command(client: Client, args: dict) -> CommandResults: """ Command function to perform forward PADNS lookup. @@ -1226,7 +1618,7 @@ def forward_padns_lookup_command(client: Client, args: dict) -> CommandResults: ) records = raw_response.get('response', {}).get('records', []) - + if not records: readable_output = f"No records found for {qtype} {qname}" else: @@ -1256,7 +1648,6 @@ def forward_padns_lookup_command(client: Client, args: dict) -> CommandResults: outputs_key_field='error', outputs={'error': str(e)} ) - def reverse_padns_lookup_command(client: Client, args: dict) -> CommandResults: """ Command function to perform reverse PADNS lookup. @@ -1274,45 +1665,29 @@ def reverse_padns_lookup_command(client: Client, args: dict) -> CommandResults: if not qtype or not qname: raise DemistoException("Both 'qtype' and 'qname' are required parameters.") - netmask = args.get('netmask') - subdomains = argToBoolean(args.get('subdomains')) if 'subdomains' in args else None - regex = args.get('regex') - first_seen_after = args.get('first_seen_after') - first_seen_before = args.get('first_seen_before') - last_seen_after = args.get('last_seen_after') - last_seen_before = args.get('last_seen_before') - as_of = args.get('as_of') - sort = args.get('sort') - output_format = args.get('output_format') - prefer = args.get('prefer') - with_metadata = argToBoolean(args.get('with_metadata')) if 'with_metadata' in args else None - max_wait = arg_to_number(args.get('max_wait')) - skip = arg_to_number(args.get('skip')) - limit = arg_to_number(args.get('limit')) + + filtered_args = { + key: value + for key, value in args.items() + if key not in ('qtype', 'qname') + } try: + raw_response = client.reverse_padns_lookup( qtype=qtype, qname=qname, - netmask=netmask, - subdomains=subdomains, - regex=regex, - first_seen_after=first_seen_after, - first_seen_before=first_seen_before, - last_seen_after=last_seen_after, - last_seen_before=last_seen_before, - as_of=as_of, - sort=sort, - output_format=output_format, - prefer=prefer, - with_metadata=with_metadata, - max_wait=max_wait, - skip=skip, - limit=limit + **filtered_args ) + + if raw_response.get('error'): + raise DemistoException( + f"API Error: {raw_response.get('error')}" + ) + + records = raw_response.get('response', {}).get('records', []) - if not records: readable_output = f"No records found for {qtype} {qname}" else: @@ -1325,11 +1700,7 @@ def reverse_padns_lookup_command(client: Client, args: dict) -> CommandResults: return CommandResults( outputs_prefix='SilentPush.ReversePADNSLookup', outputs_key_field='qname', - outputs={ - 'qtype': qtype, - 'qname': qname, - 'records': records - }, + outputs={'qtype': qtype, 'qname': qname, 'records': records}, readable_output=readable_output, raw_response=raw_response ) @@ -1342,7 +1713,10 @@ def reverse_padns_lookup_command(client: Client, args: dict) -> CommandResults: outputs_key_field='error', outputs={'error': str(e)} ) - + + + + def density_lookup_command(client: Client, args: dict) -> CommandResults: """ Command function to perform density lookup. @@ -1370,7 +1744,7 @@ def density_lookup_command(client: Client, args: dict) -> CommandResults: ) records = raw_response.get('response', {}).get('records', []) - + if not records: readable_output = f"No density records found for {qtype} {query}" else: @@ -1401,6 +1775,227 @@ def density_lookup_command(client: Client, args: dict) -> CommandResults: outputs={'error': str(e)} ) +def search_scan_data_command(client: Client, args: dict) -> CommandResults: + """ + Search scan data command handler. + + Args: + client (Client): SilentPush API client + args (dict): Command arguments: + - query (str): Required. SPQL syntax query + + Returns: + CommandResults: Command results with formatted output + """ + query = args.get('query') + if not query: + raise ValueError('Query parameter is required') + + try: + raw_response = client.search_scan_data(query=query) + + scan_data = raw_response.get('response', {}).get('scandata_raw', []) + + if not scan_data: + return CommandResults( + readable_output="No scan data records found", + outputs_prefix='SilentPush.ScanData', + outputs=None + ) + readable_output = tableToMarkdown( + "Raw Scan Data Results", + scan_data, + removeNull=True + ) + + return CommandResults( + outputs_prefix='SilentPush.ScanData', + outputs_key_field='domain', + outputs={ + 'records': scan_data, + 'query': query + }, + readable_output=readable_output, + raw_response=raw_response + ) + + except Exception as e: + raise DemistoException(f"Error in search scan data command: {str(e)}") + + + +def live_url_scan_command(client: Client, args: dict) -> CommandResults: + """ + Command handler for live URL scan command. + + Args: + client (Client): The SilentPush API client + args (dict): Command arguments + + Returns: + CommandResults: Results of the URL scan + """ + url = args.get('url') + if not url: + raise DemistoException("URL is a required parameter") + + platform = args.get('platform') + os = args.get('os') + browser = args.get('browser') + region = args.get('region') + valid_platforms = ['Desktop', 'Mobile', 'Crawler'] + if platform and platform not in valid_platforms: + raise DemistoException(f"Invalid platform. Must be one of: {', '.join(valid_platforms)}") + + valid_os = ['Windows', 'Linux', 'MacOS', 'iOS', 'Android'] + if os and os not in valid_os: + raise DemistoException(f"Invalid OS. Must be one of: {', '.join(valid_os)}") + + valid_browsers = ['Firefox', 'Chrome', 'Edge', 'Safari'] + if browser and browser not in valid_browsers: + raise DemistoException(f"Invalid browser. Must be one of: {', '.join(valid_browsers)}") + + valid_regions = ['US', 'EU', 'AS', 'TOR'] + if region and region not in valid_regions: + raise DemistoException(f"Invalid region. Must be one of: {', '.join(valid_regions)}") + + try: + raw_response = client.live_url_scan(url, platform, os, browser, region) + scan_results = raw_response.get('response', {}).get('scan', {}) + + if not isinstance(scan_results, dict): + readable_output = f"Unexpected response format for URL scan. Response: {scan_results}" + elif not scan_results: + readable_output = f"No scan results found for URL: {url}" + else: + headers = list(scan_results.keys()) + readable_output = tableToMarkdown( + f"URL Scan Results for {url}", + [scan_results], + headers=headers, + removeNull=True + ) + + return CommandResults( + outputs_prefix='SilentPush.URLScan', + outputs_key_field='url', + outputs={ + 'url': url, + 'scan_results': scan_results + }, + readable_output=readable_output, + raw_response=raw_response + ) + + except Exception as e: + return CommandResults( + readable_output=f"Error performing URL scan: {str(e)}", + raw_response={}, + outputs_prefix='SilentPush.Error', + outputs_key_field='error', + outputs={'error': str(e)} + ) + + + +def get_future_attack_indicators_command(client: Client, args: dict) -> CommandResults: + """ + Command handler for retrieving indicators of future attack feed. + + Args: + client (Client): SilentPush API client instance + args (dict): Command arguments + + Returns: + CommandResults: Results for XSOAR + + Raises: + ValueError: If required parameters are missing + """ + feed_uuid = args.get('feed_uuid') + if not feed_uuid: + raise ValueError("feed_uuid is a required parameter") + + page_no = arg_to_number(args.get('page_no', 1)) + page_size = arg_to_number(args.get('page_size', 10000)) + + try: + raw_response = client.get_future_attack_indicators( + feed_uuid=feed_uuid, + page_no=page_no, + page_size=page_size + ) + headers = list(raw_response[0].keys()) + readable_output = tableToMarkdown( + f"# Future Attack Indicators\nFeed UUID: {feed_uuid}\n", + raw_response, + headers=headers, + removeNull=True + ) + return CommandResults( + outputs_prefix='SilentPush.FutureAttackIndicators', + outputs_key_field='feed_uuid', + outputs={ + 'feed_uuid': feed_uuid, + 'page_no': page_no, + 'page_size': page_size, + 'indicators': raw_response + }, + readable_output=readable_output, + raw_response=raw_response + ) + + except Exception as e: + return CommandResults( + readable_output=f"Error retrieving future attack indicators: {str(e)}", + raw_response={}, + outputs_prefix='SilentPush.Error', + outputs_key_field='error', + outputs={'error': str(e)} + ) + + +def screenshot_url_command(client: Client, args: Dict[str, Any]) -> CommandResults: + """ + Command handler for taking URL screenshots + + Args: + client (Client): SilentPush API client instance + args (Dict[str, Any]): Command arguments + + Returns: + CommandResults: Results including screenshot data and vault info + """ + url = args.get("url") + if not url: + raise ValueError("URL is required") + + try: + result = client.screenshot_url(url) + readable_output = f"### Screenshot captured for {url}\n" + readable_output += f"- Status: Success\n" + readable_output += f"- Screenshot URL: {result['screenshot_url']}\n" + readable_output += f"- File ID: {result['vault_info']['FileID']}" + readable_output += f"- File Name: {result['filename']}" + + + return CommandResults( + outputs_prefix="SilentPush.Screenshot", + outputs_key_field="url", + outputs={ + "url": url, + "status": "success", + "status_code": result["status_code"], + "screenshot_url": result["screenshot_url"], + "file_id": result["vault_info"]["FileID"], + "file_name": result["filename"] + }, + readable_output=readable_output, + raw_response=result + ) + + except Exception as e: + raise DemistoException(f"Failed to capture screenshot: {str(e)}") def main(): @@ -1437,7 +2032,11 @@ def main(): 'silentpush-get-asns-for-domain': get_asns_for_domain_command, 'silentpush-forward-padns-lookup': forward_padns_lookup_command, 'silentpush-reverse-padns-lookup': reverse_padns_lookup_command, - 'silentpush-density-lookup': density_lookup_command + 'silentpush-density-lookup': density_lookup_command, + 'silentpush-search-scan-data': search_scan_data_command, + 'silentpush-live-url-scan': live_url_scan_command, + 'silentpush-get-future-attack-indicators': get_future_attack_indicators_command, + 'silentpush-screenshot-url': screenshot_url_command } if command in command_handlers: