diff --git a/Packs/SilentPush/Integrations/SilentPush/SilentPush.py b/Packs/SilentPush/Integrations/SilentPush/SilentPush.py index fc56db7e38c2..82fae6d7b0fb 100644 --- a/Packs/SilentPush/Integrations/SilentPush/SilentPush.py +++ b/Packs/SilentPush/Integrations/SilentPush/SilentPush.py @@ -1,37 +1,152 @@ -import demistomock as demisto # noqa: F401 -from CommonServerPython import * # noqa: F401 -"""Base Integration for Cortex XSOAR (aka Demisto) - -This is an integration to interact with the SilentPush API and provide functionality within XSOAR. - -Developer Documentation: https://xsoar.pan.dev/docs/welcome -Code Conventions: https://xsoar.pan.dev/docs/integrations/code-conventions -Linting: https://xsoar.pan.dev/docs/integrations/linting -""" - -from CommonServerUserPython import * # noqa +import demistomock as demisto +from CommonServerPython import * +from CommonServerUserPython import * +import enum +import json import urllib3 -from typing import Any +import dateparser +import traceback +from typing import Any, Dict, List, Optional, Union # Disable insecure warnings urllib3.disable_warnings() - -def mock_debug(message): - """Print debug messages to the XSOAR logs""" - print(f"DEBUG: {message}") - - -demisto.debug = mock_debug - ''' CONSTANTS ''' -DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' # ISO8601 format with UTC, default in XSOAR +DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' + +# API ENDPOINTS +JOB_STATUS = "explore/job" +NAMESERVER_REPUTATION = "explore/nsreputation/nameserver" +SUBNET_REPUTATION = "explore/ipreputation/history/subnet" +ASNS_DOMAIN = "explore/padns/lookup/domain/asns" + +''' COMMANDS INPUTS ''' + +JOB_STATUS_INPUTS = [ + InputArgument(name='job_id', # option 1 + description='ID of the job returned by Silent Push actions.', + required=True), + InputArgument(name='max_wait', + description='Number of seconds to wait for results (0-25 seconds).'), + InputArgument(name='result_type', + description='Type of result to include in the response.') + ] +NAMESERVER_REPUTATION_INPUTS = [ + InputArgument(name='nameserver', + description='Nameserver name for which information needs to be retrieved', + required=True), + InputArgument(name='explain', + description='Show the information used to calculate the reputation score'), + InputArgument(name='limit', + description='The maximum number of reputation history to retrieve') + ] +SUBNET_REPUTATION_INPUTS = [ + InputArgument( + name='subnet', + description='IPv4 subnet for which reputation information needs to be retrieved.', + required=True + ), + InputArgument( + name='explain', + description='Show the detailed information used to calculate the reputation score.' + ), + InputArgument( + name='limit', + description='Maximum number of reputation history entries to retrieve.' + ) + ] +ASNS_DOMAIN_INPUTS = [ + InputArgument(name='domain', # option 1 + description='Domain name to search ASNs for. Retrieves ASNs associated with A records for the specified domain and its subdomains in the last 30 days.', + required=True) + ] + + + +''' COMMANDS OUTPUTS ''' + +JOB_STATUS_OUTPUTS = [ + OutputArgument(name='get', output_type=str, description='URL to retrieve the job status.'), + OutputArgument(name='job_id', output_type=str, description='Unique identifier for the job.'), + OutputArgument(name='status', output_type=str, description='Current status of the job.') + ] + +NAMESERVER_REPUTATION_OUTPUTS = [ + OutputArgument(name='date', output_type=int, description='Date of the reputation history entry (in YYYYMMDD format).'), + OutputArgument(name='ns_server', output_type=str, description='Name of the nameserver associated with the reputation history entry.'), + OutputArgument(name='ns_server_reputation', output_type=int, description='Reputation score of the nameserver on the specified date.'), + OutputArgument(name='ns_server_reputation_explain', output_type=dict, description='Explanation of the reputation score, including domain density and listed domains.'), + OutputArgument(name='ns_server_domain_density', output_type=int, description='Number of domains associated with the nameserver.'), + OutputArgument(name='ns_server_domains_listed', output_type=int, description='Number of domains listed in reputation databases.') + ] +SUBNET_REPUTATION_OUTPUTS = [ + OutputArgument(name='date', output_type=int, description='The date of the subnet reputation record.'), + OutputArgument(name='subnet', output_type=str, description='The subnet associated with the reputation record.'), + OutputArgument(name='subnet_reputation', output_type=int, description='The reputation score of the subnet.'), + OutputArgument(name='ips_in_subnet', output_type=int, description='Total number of IPs in the subnet.'), + OutputArgument(name='ips_num_active', output_type=int, description='Number of active IPs in the subnet.'), + OutputArgument(name='ips_num_listed', output_type=int, description='Number of listed IPs in the subnet.') + ] +ASNS_DOMAIN_OUTPUTS = [ + OutputArgument(name='domain', output_type=str, description='The domain name for which ASNs are retrieved.'), + OutputArgument(name='domain_asns', output_type=dict, description='Dictionary of Autonomous System Numbers (ASNs) associated with the domain.') + ] + + + +metadata_collector = YMLMetadataCollector( + integration_name="SilentPush", + description=( + "The Silent Push Platform uses first-party data and a proprietary scanning engine to enrich global DNS data " + "with risk and reputation scoring, giving security teams the ability to join the dots across the entire IPv4 and IPv6 range, " + "and identify adversary infrastructure before an attack is launched. The content pack integrates with the Silent Push system " + "to gain insights into domain/IP information, reputations, enrichment, and infratag-related details. It also provides " + "functionality to live-scan URLs and take screenshots of them. Additionally, it allows fetching future attack feeds " + "from the Silent Push system." + ), + display="SilentPush", + category="Data Enrichment & Threat Intelligence", + docker_image="demisto/python3:3.11.10.116949", + is_fetch=False, + long_running=False, + long_running_port=False, + is_runonce=False, + integration_subtype="python3", + integration_type="python", + fromversion="5.0.0", + conf=[ + ConfKey( + name="url", + display="Base URL", + required=True, + default_value="https://api.silentpush.com" + ), + ConfKey( + name="credentials", + display="API Key", + required=False, + key_type=ParameterTypes.TEXT_AREA_ENCRYPTED, + ), + ConfKey( + name="insecure", + display="Trust any certificate (not secure)", + required=False, + key_type=ParameterTypes.BOOLEAN + ), + ConfKey( + name="proxy", + display="Use system proxy settings", + required=False, + key_type=ParameterTypes.BOOLEAN + ) + ] +) + ''' CLIENT CLASS ''' - class Client(BaseClient): """Client class to interact with the SilentPush API @@ -44,7 +159,7 @@ class Client(BaseClient): def __init__(self, base_url: str, api_key: str, verify: bool = True, proxy: bool = False): """ Initializes the client with the necessary parameters. - + Args: base_url (str): The base URL for the SilentPush API. api_key (str): The API key for authentication. @@ -59,32 +174,25 @@ def __init__(self, base_url: str, api_key: str, verify: bool = True, proxy: bool 'X-API-Key': api_key, 'Content-Type': 'application/json' } - demisto.debug(f'Initialized client with base URL: {self.base_url}') def _http_request(self, method: str, url_suffix: str, params: dict = None, data: dict = None) -> Any: """ - Handles the HTTP requests to the SilentPush API. - - This function builds the request URL, adds the necessary headers, and sends a request - to the API. It returns the response in JSON format. - + Perform an HTTP request to the SilentPush API. + Args: - method (str): The HTTP method (GET, POST, etc.). - url_suffix (str): The specific endpoint to be appended to the base URL. - params (dict, optional): The URL parameters to be sent with the request. - data (dict, optional): The data to be sent with the request. - + method (str): The HTTP method to use (e.g., 'GET', 'POST'). + url_suffix (str): The endpoint suffix to append to the base URL. + params (dict, optional): Query parameters to include in the request. Defaults to None. + data (dict, optional): JSON data to send in the request body. Defaults to None. + Returns: - Any: The JSON response from the API. - + Any: The JSON response from the API or text response if not JSON. + Raises: - DemistoException: If there is an error in the API response. + DemistoException: If there's an error during the API call. """ - full_url = f'{self.base_url}{url_suffix}' - masked_headers = {k: v if k != 'X-API-Key' else '****' for k, v in self._headers.items()} - demisto.debug(f'Headers: {masked_headers}') - demisto.debug(f'Params: {params}') - demisto.debug(f'Data: {data}') + base_url = demisto.params().get('url', 'https://api.silentpush.com') if url_suffix.startswith("/api/v2/") else self.base_url + full_url = f'{base_url}{url_suffix}' try: response = requests.request( @@ -95,85 +203,352 @@ def _http_request(self, method: str, url_suffix: str, params: dict = None, data: params=params, json=data ) - demisto.debug(f'Response status code: {response.status_code}') - demisto.debug(f'Response body: {response.text}') - - if response.status_code not in {200, 201}: - raise DemistoException(f'Error in API call [{response.status_code}] - {response.text}') - return response.json() + if response.headers.get('Content-Type', '').startswith('application/json'): + return response.json() + else: + return response.text except Exception as e: - demisto.error(f'Error in API call: {str(e)}') - raise + raise DemistoException(f'Error in API call: {str(e)}') - def list_domain_information(self, domain: str) -> dict: + + def get_job_status(self, job_id: str, max_wait: Optional[int] = None, result_type: Optional[str] = None) -> Dict[str, Any]: """ - Fetches domain information such as WHOIS data, domain age, and risk scores. + Retrieve the status of a specific job. + + Args: + job_id (str): The unique identifier of the job to check. + max_wait (int, optional): Maximum wait time in seconds. Must be between 0 and 25. Defaults to None. + result_type (str, optional): Type of result to retrieve. Defaults to None. + + Returns: + Dict[str, Any]: Job status information. + + Raises: + ValueError: If max_wait is invalid or result_type is not in allowed values. + """ + url_suffix = f"{JOB_STATUS}/{job_id}" + params = {} + + if max_wait is not None: + if not (0 <= max_wait <= 25): + raise ValueError("max_wait must be an integer between 0 and 25") + params['max_wait'] = max_wait + + valid_result_types = {'Status', 'Include Metadata', 'Exclude Metadata'} + if result_type and result_type not in valid_result_types: + raise ValueError(f"result_type must be one of {valid_result_types}") + if result_type: + params['result_type'] = result_type + + return self._http_request(method="GET", url_suffix=url_suffix, params=params) + + def get_nameserver_reputation(self, nameserver: str, explain: bool = False, limit: int = None): + """ + Retrieve historical reputation data for the specified nameserver. + Args: - domain (str): The domain to fetch information for. - + nameserver (str): The nameserver for which the reputation data is to be fetched. + explain (bool): Whether to include detailed calculation explanations. + limit (int): Maximum number of reputation entries to return. + Returns: - dict: A dictionary containing domain information fetched from the API. + dict: Reputation history for the given nameserver. """ - demisto.debug(f'Fetching domain information for domain: {domain}') - url_suffix = f'explore/domain/domaininfo/{domain}' - return self._http_request('GET', url_suffix) + url_suffix = f"{NAMESERVER_REPUTATION}/{nameserver}" -def test_module(client: Client) -> str: - """ - Tests connectivity to the SilentPush API and checks the authentication status. - - This function will validate the API key and ensure that the client can successfully connect - to the API. It is called when running the 'Test' button in XSOAR. - - Args: - client (Client): The client instance to use for the connection test. - - Returns: - str: 'ok' if the connection is successful, otherwise returns an error message. + params = filter_none_values({'explain': explain, 'limit': limit}) + + response = self._http_request(method="GET", url_suffix=url_suffix, params=params) + + # Return the reputation history, or an empty list if not found + return response.get('response', {}).get('ns_server_reputation', []) + + def get_subnet_reputation(self, subnet: str, explain: bool = False, limit: Optional[int] = None) -> Dict[str, Any]: + """ + Retrieve reputation history for a specific subnet. + + Args: + subnet (str): The subnet to query. + explain (bool, optional): Whether to include detailed explanations. Defaults to False. + limit (int, optional): Maximum number of results to return. Defaults to None. + + Returns: + Dict[str, Any]: Subnet reputation history information. + """ + url_suffix = f"/{subnet}" + + params = { + "explain": str(explain).lower() if explain else None, + "limit": limit + } + + params = filter_none_values(params) + + return self._http_request(method="GET", url_suffix=url_suffix, params=params) + + def get_asns_for_domain(self, domain: str) -> Dict[str, Any]: + """ + Retrieve Autonomous System Numbers (ASNs) associated with the specified domain. + + Args: + domain (str): The domain to retrieve ASNs for. + + Returns: + Dict[str, Any]: A dictionary containing the ASN information for the domain. + """ + url_suffix = f"{ASNS_DOMAIN}/{domain}" + + # Send the request and return the response directly + return self._http_request(method="GET", url_suffix=url_suffix) + + +''' HELPER FUNCTIONS ''' +def filter_none_values(params: Dict[str, Any]) -> Dict[str, Any]: + """Removes None values from a dictionary.""" + return {k: v for k, v in params.items() if v is not None} + + +''' COMMAND FUNCTIONS ''' + + +def test_module(client: Client, first_fetch_time: int) -> str: + """Tests API connectivity and authentication' + + Returning 'ok' indicates that the integration works like it is supposed to. + Connection to the service is successful. + Raises exceptions if something goes wrong. + + :type client: ``Client`` + :param Client: SilentPush client to use + + :type name: ``str`` + :param name: name to append to the 'Hello' string + + :return: 'ok' if test passed, anything else will fail the test. + :rtype: ``str`` """ - demisto.debug('Running test module...') + + # INTEGRATION DEVELOPER TIP + # Client class should raise the exceptions, but if the test fails + # the exception text is printed to the Cortex XSOAR UI. + # If you have some specific errors you want to capture (i.e., auth failure) + # you should catch the exception here and return a string with a more + # readable output (for example return 'Authentication Error, API Key + # invalid'). + # Cortex XSOAR will print everything you return that is different than 'ok' as + # an error. try: - client.list_domain_information('silentpush.com') - demisto.debug('Test module completed successfully') + resp = client.get_job_status("job_id", "max_wait", "result_type") + if resp.get("status_code") != 200: + return f"Connection failed :- {resp.get('errors')}" return 'ok' except DemistoException as e: - demisto.debug(f'Test module failed: {str(e)}') if 'Forbidden' in str(e) or 'Authorization' in str(e): return 'Authorization Error: make sure API Key is correctly set' raise e - - -''' COMMAND FUNCTIONS ''' -def list_domain_information_command(client: Client, args: dict) -> CommandResults: +@metadata_collector.command( + command_name="silentpush-get-job-status", + inputs_list=JOB_STATUS_INPUTS, + outputs_prefix="SilentPush.JobStatus", + outputs_list=JOB_STATUS_OUTPUTS, + description="This command retrieve status of running job or results from completed job.", +) +def get_job_status_command(client: Client, args: dict) -> CommandResults: """ - Command handler for fetching domain information. - - This function processes the command for 'silentpush-list-domain-information', retrieves the - domain information using the client, and formats it for XSOAR output. - + Retrieves the status of a job based on the provided job ID and other optional parameters. + Args: - client (Client): The client instance to fetch the data. - args (dict): The arguments passed to the command, including the domain. - + client (Client): The client instance that interacts with the service to fetch job status. + args (dict): A dictionary of arguments, which should include: + - 'job_id' (str): The unique identifier of the job for which status is being retrieved. + - 'max_wait' (Optional[int]): The maximum wait time in seconds (default is None). + - 'result_type' (Optional[str]): Type of result to retrieve. Valid options are 'Status', + 'Include Metadata', or 'Exclude Metadata' (default is None). + Returns: - CommandResults: The command results containing readable output and the raw response. + CommandResults: The command results containing: + - 'outputs_prefix' (str): The prefix for the output context. + - 'outputs_key_field' (str): The field used as the key in the outputs. + - 'outputs' (dict): A dictionary with job ID and job status information. + - 'readable_output' (str): A formatted string that represents the job status in a human-readable format. + - 'raw_response' (dict): The raw response received from the service. + + Raises: + DemistoException: If the 'job_id' parameter is missing or if no job status is found for the given job ID. """ - domain = args.get('domain', 'silentpush.com') - demisto.debug(f'Processing domain: {domain}') + job_id = args.get('job_id') + max_wait = arg_to_number(args.get('max_wait')) + result_type = args.get('result_type') - raw_response = client.list_domain_information(domain) - demisto.debug(f'Response from API: {raw_response}') + if not job_id: + raise DemistoException("job_id is a required parameter") - readable_output = tableToMarkdown('Domain Information', raw_response) + raw_response = client.get_job_status(job_id, max_wait, result_type) + job_status = raw_response.get('response', {}) + if not job_status: + raise DemistoException(f"No job status found for Job ID: {job_id}") + + readable_output = tableToMarkdown( + f"Job Status for Job ID: {job_id}", + [job_status], + headers=list(job_status.keys()), + removeNull=True + ) return CommandResults( - outputs_prefix='SilentPush.Domain', + outputs_prefix='SilentPush.JobStatus', + outputs_key_field='job_id', + outputs={'job_id': job_id, **job_status}, + readable_output=readable_output, + raw_response=raw_response + ) + + +@metadata_collector.command( + command_name="silentpush-get-nameserver-reputation", + inputs_list=NAMESERVER_REPUTATION_INPUTS, + outputs_prefix="SilentPush.SubnetReputation", + outputs_list=NAMESERVER_REPUTATION_OUTPUTS, + description="This command retrieve historical reputation data for a specified nameserver, including reputation scores and optional detailed calculation information.", +) +def get_nameserver_reputation_command(client: Client, args: dict) -> CommandResults: + """ + Command handler for retrieving nameserver reputation. + + Args: + client (Client): The API client instance. + args (dict): Command arguments. + + Returns: + CommandResults: The command results containing nameserver reputation data. + """ + nameserver = args.get("nameserver") + explain = argToBoolean(args.get("explain", "false")) + limit = arg_to_number(args.get("limit")) + + if not nameserver: + raise ValueError("Nameserver is required.") + + # Fetch reputation data + reputation_data = client.get_nameserver_reputation(nameserver, explain, limit) + + # Prepare the readable output + if reputation_data: + readable_output = tableToMarkdown( + f"Nameserver Reputation for {nameserver}", + reputation_data, + headers=list(reputation_data[0].keys()), + removeNull=True + ) + else: + readable_output = f"No reputation history found for nameserver: {nameserver}" + + # Return command results + return CommandResults( + outputs_prefix="SilentPush.NameserverReputation", + outputs_key_field="ns_server", + outputs={"nameserver": nameserver, "reputation_data": reputation_data}, + readable_output=readable_output, + raw_response=reputation_data + ) + +@metadata_collector.command( + command_name="silentpush-get-subnet-reputation", + inputs_list=SUBNET_REPUTATION_INPUTS, + outputs_prefix="SilentPush.NameserverReputation", + outputs_list=SUBNET_REPUTATION_OUTPUTS, + description="This command retrieves the reputation history for a specific subnet." +) +def get_subnet_reputation_command(client: Client, args: dict) -> CommandResults: + """ + Retrieves the reputation history of a given subnet. + + Args: + client (Client): The API client instance. + args (dict): Command arguments containing: + - subnet (str): The subnet to query. + - explain (bool, optional): Whether to include an explanation. + - limit (int, optional): Limit the number of reputation records. + + Returns: + CommandResults: The command result containing the subnet reputation data. + """ + subnet = args.get('subnet') + if not subnet: + raise DemistoException("Subnet is a required parameter.") + + explain = argToBoolean(args.get('explain', False)) + limit = arg_to_number(args.get('limit')) + + raw_response = client.get_subnet_reputation(subnet, explain, limit) + subnet_reputation = raw_response.get('response', {}).get('subnet_reputation_history', []) + + readable_output = ( + f"No reputation history found for subnet: {subnet}" + if not subnet_reputation + else tableToMarkdown(f"Subnet Reputation for {subnet}", subnet_reputation, removeNull=True) + ) + + return CommandResults( + outputs_prefix='SilentPush.SubnetReputation', + outputs_key_field='subnet', + outputs={'subnet': subnet, 'reputation_history': subnet_reputation}, + readable_output=readable_output, + raw_response=raw_response + ) + + +@metadata_collector.command( + command_name="silentpush-get-asns-for-domain", + inputs_list=ASNS_DOMAIN_INPUTS, + outputs_prefix="SilentPush.DomainASNs", + outputs_list=ASNS_DOMAIN_OUTPUTS, + description="This command retrieves Autonomous System Numbers (ASNs) associated with a domain." +) +def get_asns_for_domain_command(client: Client, args: dict) -> CommandResults: + """ + Retrieves Autonomous System Numbers (ASNs) for the specified domain. + + Args: + client (Client): The client object used to interact with the service. + args (dict): Arguments passed to the command, including the domain. + + Returns: + CommandResults: The results containing ASNs for the domain or an error message. + """ + domain = args.get('domain') + + if not domain: + raise DemistoException("Domain is a required parameter.") + + raw_response = client.get_asns_for_domain(domain) + records = raw_response.get('response', {}).get('records', []) + + if not records or 'domain_asns' not in records[0]: + readable_output = f"No ASNs found for domain: {domain}" + asns = [] + else: + domain_asns = records[0]['domain_asns'] + asns = [{'ASN': asn, 'Description': description} + for asn, description in domain_asns.items()] + + readable_output = tableToMarkdown( + f"ASNs for Domain: {domain}", + asns, + headers=['ASN', 'Description'] + ) + + return CommandResults( + outputs_prefix='SilentPush.DomainASNs', outputs_key_field='domain', - outputs=raw_response, + outputs={ + 'domain': domain, + 'asns': asns + }, readable_output=readable_output, raw_response=raw_response ) @@ -182,17 +557,13 @@ def list_domain_information_command(client: Client, args: dict) -> CommandResult ''' MAIN FUNCTION ''' -def main(): - """ - Main function to initialize the client and process the commands. - - This function parses the parameters, sets up the client, and routes the command to - the appropriate function. - - It handles the setup of authentication, base URL, SSL verification, and proxy configuration. - Also, it routes the `test-module` and `silentpush-list-domain-information` commands to the - corresponding functions. +def main() -> None: + """main function, parses params and runs command functions + + :return: + :rtype: """ + try: params = demisto.params() api_key = params.get('credentials', {}).get('password') @@ -200,9 +571,6 @@ def main(): verify_ssl = not params.get('insecure', False) proxy = params.get('proxy', False) - demisto.debug(f'Base URL: {base_url}') - demisto.debug('Initializing client...') - client = Client( base_url=base_url, api_key=api_key, @@ -210,26 +578,28 @@ def main(): proxy=proxy ) - command = demisto.command() - demisto.debug(f'Command being called is {command}') - - if command == 'test-module': - result = test_module(client) + if demisto.command() == 'test-module': + result = test_module(client, demisto.args()) return_results(result) - - elif command == 'silentpush-list-domain-information': - return_results(list_domain_information_command(client, demisto.args())) - - else: - raise DemistoException(f'Unsupported command: {command}') + + elif demisto.command() == 'silentpush-get-job-status': + return_results(get_job_status_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-get-nameserver-reputation': + return_results(get_nameserver_reputation_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-get-subnet-reputation': + return_results(get_subnet_reputation_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-get-asns-for-domain': + return_results(get_asns_for_domain_command(client, demisto.args())) except Exception as e: - demisto.error(f'Failed to execute {demisto.command()} command. Error: {str(e)}') - return_error(f'Failed to execute {demisto.command()} command. Error: {str(e)}') + demisto.error(traceback.format_exc()) # print the traceback + return_error(f'Failed to execute {demisto.command()} command.\nError:\n{str(e)}') ''' ENTRY POINT ''' - if __name__ in ('__main__', '__builtin__', 'builtins'): main() diff --git a/Packs/SilentPush/Integrations/SilentPush/SilentPush.yml b/Packs/SilentPush/Integrations/SilentPush/SilentPush.yml deleted file mode 100644 index 098cfeef1186..000000000000 --- a/Packs/SilentPush/Integrations/SilentPush/SilentPush.yml +++ /dev/null @@ -1,109 +0,0 @@ -commonfields: - id: SilentPush - version: -1 - name: SilentPush - type: python - subType: python3 - description: | - This integration allows fetching domain information from the SilentPush API. It includes commands to get domain-related information such as WHOIS data, domain age, and risk scores. - tags: [] - enabled: true - manufacturer: SilentPush - comment: '' - minVersion: -1 - dependencies: - - CommonServerPython - - CommonServerUserPython - -scripts: - - path: SilentPush.py - comment: | - Integration for SilentPush that enables fetching domain information, including WHOIS data, domain age, and risk scores. - -commands: - - name: test-module - description: | - Tests the connectivity to the SilentPush API and checks the authentication status. - isArray: false - argContext: - - id: base_url - type: string - description: The base URL for the SilentPush API. - - id: api_key - type: string - description: The API key used to authenticate requests. - - id: verify_ssl - type: boolean - description: Flag to determine whether SSL verification is enabled. - examples: | - !test-module - - - name: silentpush-list-domain-information - description: | - Fetches domain information, such as WHOIS data, domain age, and risk scores. - isArray: false - argContext: - - id: domain - type: string - description: The domain name to fetch information for. - examples: | - !silentpush-list-domain-information domain=example.com - -args: - - id: domain - isArray: false - description: | - The domain to fetch information for. - type: string - -outputs: - - id: SilentPush.Domain - type: complex - description: | - The domain information fetched from SilentPush API, including WHOIS data, domain age, and risk scores. - contents: - - name: domain - type: string - - name: whois_data - type: string - - name: domain_age - type: integer - - name: risk_score - type: float - -tests: - - name: Test SilentPush Integration - description: Test the integration with the SilentPush API. - steps: - - script: test-module - name: Test SilentPush API Connectivity - args: - base_url: https://api.silentpush.com - api_key: 'your_api_key' - -# Optional: Adding the configuration section for any configuration-related parameters -configurations: - - default: true - isArray: false - description: The configuration parameters required for connecting to SilentPush API. - context: - - id: base_url - type: string - description: The base URL for the SilentPush API. - - id: api_key - type: string - description: The API key used to authenticate requests. - - id: verify_ssl - type: boolean - description: Flag to determine whether SSL verification is enabled. - - id: proxy - type: boolean - description: Flag to determine whether to use a proxy. - -errorHandling: - - errorCode: 403 - description: | - If an authorization error is encountered, it could indicate an incorrect or expired API key. - - errorCode: 400 - description: | - Bad Request error, likely due to incorrect input format or invalid parameters in the request. diff --git a/Packs/SilentPush/Integrations/SilentPush/SilentPush_test.py b/Packs/SilentPush/Integrations/SilentPush/SilentPush_test.py index d957c5edf2a0..c42f4e3a8b39 100644 --- a/Packs/SilentPush/Integrations/SilentPush/SilentPush_test.py +++ b/Packs/SilentPush/Integrations/SilentPush/SilentPush_test.py @@ -10,11 +10,13 @@ you are implementing with your integration """ +from demisto_sdk.commands.common.handlers import JSON_Handler + import json def util_load_json(path): - with open(path, encoding='utf-8') as f: + with open(path, encoding="utf-8") as f: return json.loads(f.read()) @@ -29,13 +31,11 @@ def test_baseintegration_dummy(): """ from BaseIntegration import Client, baseintegration_dummy_command - client = Client(base_url='some_mock_url', verify=False) - args = { - 'dummy': 'this is a dummy response' - } + client = Client(base_url="some_mock_url", verify=False) + args = {"dummy": "this is a dummy response", "dummy2": "a dummy value"} response = baseintegration_dummy_command(client, args) - mock_response = util_load_json('test_data/baseintegration-dummy.json') + assert response.outputs == args + - assert response.outputs == mock_response # TODO: ADD HERE unit tests for every command diff --git a/Packs/SilentPush/Integrations/SilentPush/test_data/baseintegration-dummy.json b/Packs/SilentPush/Integrations/SilentPush/test_data/baseintegration-dummy.json deleted file mode 100644 index 37fa47b18cd0..000000000000 --- a/Packs/SilentPush/Integrations/SilentPush/test_data/baseintegration-dummy.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "dummy": "this is a dummy response" -} \ No newline at end of file diff --git a/Packs/SilentPush/pack_metadata.json b/Packs/SilentPush/pack_metadata.json index b5cd72cb3e81..123de5898f65 100644 --- a/Packs/SilentPush/pack_metadata.json +++ b/Packs/SilentPush/pack_metadata.json @@ -1,18 +1,15 @@ { "name": "Silent Push", - "description": "The Silent Push platform exposes Indicators of Future Attack (IOFA) by applying unique behavioral fingerprints to attacker activity. By searching our dataset, security teams can identify new impending attacks, rather than relying on known IOCs.", + "description": "SilentPush integration for domain and IP intelligence\u001b[C", "support": "partner", "currentVersion": "1.0.0", - "author": "Silent Push", + "author": "Yash", "url": "", "email": "", "categories": [ "Data Enrichment & Threat Intelligence" ], - "tags": [ - "IoC", - "IoFA" - ], + "tags": [], "useCases": [], "keywords": [], "marketplaces": [