Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions custom-recipes/api-connect/recipe.json
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,81 @@
"type": "STRING",
"visibilityCondition": "model.use_mtls==true"
},
{
"name": "force_csv_parameters",
"label": "Force CSV parameters",
"description": "",
"type": "BOOLEAN",
"defaultValue": false
},
{
"name": "csv_delimiter",
"label": "Delimiter",
"description": "",
"type": "STRING",
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_doublequote",
"label": "Double quote",
"description": "",
"type": "SELECT",
"selectChoices":[
{"value": null, "label": "Auto detect"},
{"value": "double_quote", "label": "Double quote"},
{"value": "not_double_quote", "label": "No double quote"}
],
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_escapechar",
"label": "Escape char",
"description": "",
"type": "STRING",
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_lineterminator",
"label": "Line terminator",
"description": "",
"type": "STRING",
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_quotechar",
"label": "Quote char",
"description": "",
"type": "STRING",
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_quoting",
"label": "Quote",
"description": "",
"type": "SELECT",
"selectChoices":[
{"value": null, "label": "Auto detect"},
{"value": 0, "label": "Minimal"},
{"value": 1, "label": "All"},
{"value": 2, "label": "Non numeric"},
{"value": 3, "label": "None"},
{"value": 4, "label": "Strings"},
{"value": 5, "label": "Not null"}
],
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_skipinitialspace",
"label": "Skip initial space",
"description": "",
"type": "SELECT",
"selectChoices":[
{"value": null, "label": "Auto detect"},
{"value": "skip", "label": "Skip"},
{"value": "not_skip", "label": "Do not skip"}
],
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "redirect_auth_header",
"label": "Redirect authorization header",
Expand Down
75 changes: 75 additions & 0 deletions python-connectors/api-connect_dataset/connector.json
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,81 @@
"type": "STRING",
"visibilityCondition": "model.use_mtls==true"
},
{
"name": "force_csv_parameters",
"label": " ",
"description": "Force CSV parameters",
"type": "BOOLEAN",
"defaultValue": false
},
{
"name": "csv_delimiter",
"label": " ",
"description": "Delimiter",
"type": "STRING",
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_doublequote",
"label": " ",
"description": "Double quote",
"type": "SELECT",
"selectChoices":[
{"value": null, "label": "Auto detect"},
{"value": "double_quote", "label": "Double quote"},
{"value": "not_double_quote", "label": "No double quote"}
],
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_escapechar",
"label": " ",
"description": "Escape char",
"type": "STRING",
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_lineterminator",
"label": " ",
"description": "Line terminator",
"type": "STRING",
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_quotechar",
"label": " ",
"description": "Quote char",
"type": "STRING",
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_quoting",
"label": " ",
"description": "Quote",
"type": "SELECT",
"selectChoices":[
{"value": null, "label": "Auto detect"},
{"value": 0, "label": "Minimal"},
{"value": 1, "label": "All"},
{"value": 2, "label": "Non numeric"},
{"value": 3, "label": "None"},
{"value": 4, "label": "Strings"},
{"value": 5, "label": "Not null"}
],
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "csv_skipinitialspace",
"label": " ",
"description": "Skip initial space",
"type": "SELECT",
"selectChoices":[
{"value": null, "label": "Auto detect"},
{"value": "skip", "label": "Skip"},
{"value": "not_skip", "label": "Do not skip"}
],
"visibilityCondition": "model.force_csv_parameters==true"
},
{
"name": "redirect_auth_header",
"label": " ",
Expand Down
3 changes: 2 additions & 1 deletion python-connectors/api-connect_dataset/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, config, plugin_config):
self.raw_output = endpoint_parameters.get("raw_output", None)
self.maximum_number_rows = config.get("maximum_number_rows", -1)
self.display_metadata = config.get("display_metadata", False)
self.csv_configuration = config

def get_read_schema(self):
# In this example, we don't specify a schema here, so DSS will infer the schema
Expand Down Expand Up @@ -60,7 +61,7 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
record_count += 1
yield self.format_output(data, metadata)
else:
csv_data = decode_csv_data(data)
csv_data = decode_csv_data(data, self.csv_configuration)
if csv_data:
record_count += len(csv_data)
for row in csv_data:
Expand Down
2 changes: 1 addition & 1 deletion python-lib/dku_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ class DKUConstants(object):
API_RESPONSE_KEY = "api_response"
FORBIDDEN_KEYS = ["token", "password", "api_key_value", "secure_token"]
FORM_DATA_BODY_FORMAT = "FORM_DATA"
PLUGIN_VERSION = "1.2.7-beta.2"
PLUGIN_VERSION = "1.2.7-beta.3"
RAW_BODY_FORMAT = "RAW"
REPONSE_ERROR_KEY = "dku_error"
69 changes: 59 additions & 10 deletions python-lib/dku_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def get_endpoint_parameters(configuration):
"next_page_url_key", "is_next_page_url_relative", "next_page_url_base",
"top_key", "skip_key", "maximum_number_rows",
"use_mtls", "mtls_certificate_path", "mtls_key_path",
"force_csv_parameters", "csv_delimiter"
]
parameters = {
endpoint_parameter: configuration.get(endpoint_parameter) for endpoint_parameter in endpoint_parameters if configuration.get(endpoint_parameter) is not None
Expand Down Expand Up @@ -167,7 +168,7 @@ def xml_to_json(content):
return json_response


def decode_csv_data(data):
def decode_csv_data(data, csv_configuation):
import csv
import io
json_data = None
Expand All @@ -189,19 +190,67 @@ def decode_csv_data(data):
)
except Exception as error:
logger.error("Could not sniff csv dialect. Error={}".format(error))
dialect = "excel"
try:
reader = csv.DictReader(
io.StringIO(data),
dialect=dialect
)
json_data = list(reader)
except Exception as error:
logger.error("Could not extract csv data. Error={}. Trying method 2.".format(error))
# dialect = "excel"
dialect = csv.Dialect()
dialect.delimiter = ','
dialect.quotechar = '"'
dialect.doublequote = True
dialect.skipinitialspace = False
dialect.lineterminator = '\r\n'
dialect.quoting = 0
dialect = update_csv_dialect(csv_configuation, dialect)
if not csv_configuation.get("force_csv_parameters", False):
# For back compatibility reason, if csv params are not forced,
# we try the old method first.
try:
reader = csv.DictReader(
io.StringIO(data),
dialect=dialect
)
json_data = list(reader)
except Exception as error:
logger.error("Could not extract csv data. Error={}. Trying method 2.".format(error))
json_data = decode_csv_data_m2(data, dialect)
else:
logger.error("CSV parameters are forced, trying method 2")
json_data = decode_csv_data_m2(data, dialect)
return json_data


def update_csv_dialect(config, input_dialect):
if config.get("force_csv_parameters", False):
logger.info("Updating csv parameters with ")
csv_delimiter = config.get("csv_delimiter")
if csv_delimiter:
input_dialect.delimiter = csv_delimiter
logger.info("delimiter={}".format(csv_delimiter))
csv_doublequote = config.get("csv_doublequote", None)
if csv_doublequote:
input_dialect.doublequote = csv_doublequote == "double_quote"
logger.info("doublequote={}".format(input_dialect.doublequote))
csv_escapechar = config.get("csv_escapechar", "")
if csv_escapechar:
input_dialect.escapechar = csv_escapechar
logger.info("escapechar={}".format(csv_escapechar))
csv_lineterminator = config.get("csv_lineterminator", "")
if csv_lineterminator:
input_dialect.lineterminator = csv_lineterminator
logger.info("lineterminator={}".format(csv_lineterminator))
csv_quotechar = config.get("csv_quotechar", "")
if csv_quotechar:
input_dialect.quotechar = csv_quotechar
logger.info("quotechar={}".format(csv_quotechar))
csv_quoting = config.get("csv_quoting", None)
if csv_quoting is not None:
input_dialect.quoting = csv_quoting
logger.info("quoting={}".format(csv_quoting))
csv_skipinitialspace = config.get("csv_skipinitialspace", None)
if csv_skipinitialspace:
input_dialect.skipinitialspace = csv_skipinitialspace == "skip"
logger.info("skipinitialspace={}".format(input_dialect.skipinitialspace))
return input_dialect


def decode_csv_data_m2(data, dialect):
import csv
json_data = None
Expand Down
5 changes: 3 additions & 2 deletions python-lib/rest_api_recipe_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, custom_key_values, credential_parameters, secure_credentials,
self.is_row_limit = (self.maximum_number_rows > 0)
self.behaviour_when_error = behaviour_when_error or "add-error-column"
self.can_raise = self.behaviour_when_error == "raise"
self.csv_configuration = endpoint_parameters

@staticmethod
def get_column_to_parameter_dict(parameter_columns, parameter_renamings):
Expand Down Expand Up @@ -126,7 +127,7 @@ def retrieve_next_page(self, is_raw_output):
base_row.update(self.initial_parameter_columns)
page_rows.append(base_row)
else:
decoded_csv_data = decode_csv_data(json_response)
decoded_csv_data = decode_csv_data(json_response, self.csv_configuration)
is_api_returning_dict = False
if not decoded_csv_data and json_response:
logger.warning("Data is not in CSV format. Dumping it in text mode.")
Expand All @@ -151,7 +152,7 @@ def format_page_rows(self, data_rows, is_raw_output, metadata=None):
page_rows = []
metadata = metadata or {}
if type(data_rows) in [str, bytes]:
data_rows = decode_csv_data(data_rows)
data_rows = decode_csv_data(data_rows, self.csv_configuration)
if type(data_rows) in [list]:
for data_row in data_rows:
base_row = copy.deepcopy(self.initial_parameter_columns)
Expand Down