diff --git a/custom-recipes/pi-system-retrieve-list/recipe.py b/custom-recipes/pi-system-retrieve-list/recipe.py
index 88b4fbb..b0f3c1c 100644
--- a/custom-recipes/pi-system-retrieve-list/recipe.py
+++ b/custom-recipes/pi-system-retrieve-list/recipe.py
@@ -7,7 +7,8 @@
     get_credentials, get_interpolated_parameters, normalize_af_path,
     get_combined_description, get_base_for_data_type, check_debug_mode,
     PerformanceTimer, get_max_count, check_must_convert_object_to_string,
-    convert_schema_objects_to_string, get_summary_parameters, get_advanced_parameters
+    convert_schema_objects_to_string, get_summary_parameters, get_advanced_parameters,
+    get_batch_parameters
 )
 from osisoft_client import OSIsoftClient
 from osisoft_constants import OSIsoftConstants
@@ -63,6 +64,8 @@ def get_step_value(item):
 record_boundary_type = config.get("record_boundary_type") if data_type == "RecordedData" else None
 summary_type, summary_duration = get_summary_parameters(config)
 do_duplicate_input_row = config.get("do_duplicate_input_row", False)
+max_request_size, estimated_density, maximum_points_returned = get_batch_parameters(config)
+max_time_to_retrieve_per_batch = maximum_points_returned / estimated_density  # points / (points per hour) -> hours
 
 network_timer = PerformanceTimer()
 processing_timer = PerformanceTimer()
@@ -150,7 +153,9 @@ def get_step_value(item):
         object_id=object_id,
         summary_type=summary_type,
         summary_duration=summary_duration,
-        endpoint_type="AF"
+        endpoint_type="AF",
+        estimated_density=estimated_density,
+        maximum_points_returned=maximum_points_returned
     )
     batch_buffer_size = 0
     buffer = []
diff --git a/python-lib/osisoft_client.py b/python-lib/osisoft_client.py
index 1fbf55c..8b10b9d 100644
--- a/python-lib/osisoft_client.py
+++ b/python-lib/osisoft_client.py
@@ -10,7 +10,8 @@
 from osisoft_plugin_common import (
     assert_server_url_ok, build_requests_params,
     is_filtered_out, is_server_throttling, escape, epoch_to_iso,
-    iso_to_epoch, RecordsLimit, is_iso8601, get_next_page_url, change_key_in_dict
+    iso_to_epoch, RecordsLimit, is_iso8601, get_next_page_url, change_key_in_dict,
+    BatchTimeCounter
 )
 from osisoft_pagination import OffsetPagination
 from safe_logger import SafeLogger
@@ -243,7 +244,10 @@ def get_rows_from_webid(self, webid, data_type, **kwargs):
     def get_rows_from_webids(self, input_rows, data_type, **kwargs):
         endpoint_type = kwargs.get("endpoint_type", "event_frames")
         batch_size = kwargs.get("batch_size", 500)
-
+        estimated_density = kwargs.get("estimated_density", 500)
+        maximum_points_returned = kwargs.get("maximum_points_returned", 500)
+        max_time_to_retrieve_per_batch = maximum_points_returned / estimated_density
+        batch_time = BatchTimeCounter(max_time_to_retrieve_per_batch)
         batch_requests_parameters = []
         number_processed_webids = 0
         number_of_webids_to_process = len(input_rows)
@@ -259,14 +263,18 @@ def get_rows_from_webids(self, input_rows, data_type, **kwargs):
             else:
                 webid = input_row
             url = self.endpoint.get_data_from_webid_url(endpoint_type, data_type, webid)
+            start_date = kwargs.get("start_date")
+            end_date = kwargs.get("end_date")
+            interval = kwargs.get("interval")
             requests_kwargs = self.generic_get_kwargs(**kwargs)
+            batch_time.add(start_date, end_date, interval)
             requests_kwargs['url'] = build_query_string(url, requests_kwargs.get("params"))
             web_ids.append(webid)
             event_start_times.append(event_start_time)
             event_end_times.append(event_end_time)
             batch_requests_parameters.append(requests_kwargs)
             number_processed_webids += 1
-            if (len(batch_requests_parameters) >= batch_size) or (number_processed_webids == number_of_webids_to_process):
+            if (len(batch_requests_parameters) >= batch_size) or (number_processed_webids == number_of_webids_to_process) or batch_time.is_batch_full():
                 json_responses = self._batch_requests(batch_requests_parameters)
                 batch_requests_parameters = []
                 response_index = 0
diff --git a/python-lib/osisoft_plugin_common.py b/python-lib/osisoft_plugin_common.py
index a4aff59..c8e0815 100644
--- a/python-lib/osisoft_plugin_common.py
+++ b/python-lib/osisoft_plugin_common.py
@@ -442,9 +442,9 @@ def epoch_to_iso(epoch):
 
 
 def iso_to_epoch(iso_timestamp):
-    logger.info("Converting iso timestamp '{}' to epoch".format(iso_timestamp))
+    # logger.info("Converting iso timestamp '{}' to epoch".format(iso_timestamp))
     if is_epoch(iso_timestamp):
-        logger.info("Timestamp is already epoch")
+        # logger.info("Timestamp is already epoch")
         return iso_timestamp
     epoch_timestamp = None
     try:
@@ -453,7 +453,7 @@ def iso_to_epoch(iso_timestamp):
     except Exception:
         logger.error("Error when converting iso timestamp '{}' to epoch".format(iso_timestamp))
         return None
-    logger.info("Timestamp is now '{}'".format(epoch_timestamp))
+    # logger.info("Timestamp is now '{}'".format(epoch_timestamp))
     return epoch_timestamp
 
 
@@ -619,21 +619,18 @@ def get_worst_performers(self):
 
 class BatchTimeCounter(object):
     def __init__(self, max_time_to_retrieve_per_batch):
+        logger.info("BatchTimeCounter:max_time_to_retrieve_per_batch={}s".format(max_time_to_retrieve_per_batch * 60 * 60))
         self.max_time_to_retrieve_per_batch = max_time_to_retrieve_per_batch * 60 * 60
-        self.total_batch_time = 0
-        # 2 points /h each line
-        # max 1 000 000 lines back -> 500k hours max
+        self.total_batched_time = 0
 
     def is_batch_full(self):
-        # return False
        if self.max_time_to_retrieve_per_batch < 0:
             return False
-        if self.total_batch_time > self.max_time_to_retrieve_per_batch:
-            logger.warning("batch contains {}s of request, needs to flush now".format(self.total_batch_time))
-            self.total_batch_time = 0
+        if self.total_batched_time > self.max_time_to_retrieve_per_batch:
+            logger.warning("batch contains {}s of requested time, flushing now".format(self.total_batched_time))
+            self.total_batched_time = 0
             return True
-        logger.info("Batch below time threshold")
         return False
 
     def add(self, start_time, end_time, interval):
-        self.total_batch_time += compute_time_spent(start_time, end_time, interval)
+        self.total_batched_time += compute_time_spent(start_time, end_time, interval)
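
Note on the flush threshold introduced above: estimated_density is the expected number of points per hour and maximum_points_returned caps how many points one batch may return, so max_time_to_retrieve_per_batch = maximum_points_returned / estimated_density is the number of hours of data a single batch may cover before BatchTimeCounter.is_batch_full() forces a flush (the third condition in the updated if). Below is a minimal, self-contained sketch of that accounting; compute_time_spent is defined elsewhere in osisoft_plugin_common.py and is not part of this diff, so the stand-in here is hypothetical (it expects epoch-second bounds and ignores interval).

def compute_time_spent(start_time, end_time, interval):
    # Hypothetical stand-in: seconds of data covered by one request.
    return end_time - start_time


class BatchTimeCounter(object):
    def __init__(self, max_time_to_retrieve_per_batch):
        # The threshold arrives in hours and is stored in seconds;
        # a negative value disables time-based flushing.
        self.max_time_to_retrieve_per_batch = max_time_to_retrieve_per_batch * 60 * 60
        self.total_batched_time = 0

    def add(self, start_time, end_time, interval):
        # Accumulate the seconds of data queried by the request just batched.
        self.total_batched_time += compute_time_spent(start_time, end_time, interval)

    def is_batch_full(self):
        if self.max_time_to_retrieve_per_batch < 0:
            return False
        if self.total_batched_time > self.max_time_to_retrieve_per_batch:
            # Over the threshold: signal a flush and reset for the next batch.
            self.total_batched_time = 0
            return True
        return False


# Worked example with the defaults from get_rows_from_webids
# (estimated_density=500 points/hour, maximum_points_returned=500),
# so a batch may cover 500 / 500 = 1 hour, i.e. 3600 seconds:
counter = BatchTimeCounter(500 / 500)
counter.add(0, 3600, None)               # exactly 3600 s: not over yet
assert counter.is_batch_full() is False
counter.add(3600, 7200, None)            # 7200 s total: over the threshold
assert counter.is_batch_full() is True   # flush; the counter resets to 0
assert counter.is_batch_full() is False  # a fresh batch starts empty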