Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,121 changes: 2,121 additions & 0 deletions scratch.ipynb

Large diffs are not rendered by default.

86 changes: 86 additions & 0 deletions src/onc/modules/_Messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import logging
import re
import requests
import time

# Log-message templates shared by the client modules. Each "{}" placeholder
# is filled in with str.format() at the point where the message is logged.

# Placeholder: the URL of the GET request.
REQ_MSG = "Requested: {}"

# Placeholder: the elapsed time reported by requests, in seconds.
RESPONSE_TIME_MSG = "Response received in {} seconds."

# Placeholders: a brief description of the outcome, then the status code.
RESPONSE_MSG = "HTTP Response: {} ({})"

# Fixed notice (no placeholders) for results that span several requests.
MULTIPAGE_MSG = (
    "The requested data quantity is greater than the supplied row limit "
    "and will be downloaded over multiple requests."
)


def setup_logger(logger_name: str = 'onc-client',
level: int | str = 'DEBUG') -> logging.Logger:
"""
Set up a logger object for displaying verbose messages to console.

:param logger_name: The unique logger name to use. Can be shared between modules
:param level: The logging level to use. Default is 2, which corresponds to DEBUG.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc string should be updated to reflect logging numeric values.
https://docs.python.org/3/library/logging.html#logging-levels

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind. I see that this is outdated.

:return: The configured logging.Logger object.
"""

logger = logging.getLogger(logger_name)
logger.propagate = False
if not logger.handlers:
logger.setLevel(logging.DEBUG)
console = logging.StreamHandler()
console.setLevel(level)

# Set the logging format.
dtfmt = '%Y-%m-%dT%H:%M:%S'
strfmt = f'%(asctime)s.%(msecs)03dZ | %(name)-12s | %(levelname)-8s | %(message)s'
#strfmt = f'%(asctime)s.%(msecs)03dZ | %(levelname)-8s | %(message)s' # Use this if you don't want to include logger name.
fmt = logging.Formatter(strfmt, datefmt=dtfmt)
fmt.converter = time.gmtime

console.setFormatter(fmt)
logger.addHandler(console)
return logger


def scrub_token(input: str) -> str:
    """
    Replace a token in a query URL or other string with the string 'REDACTED'
    so that users don't accidentally commit their tokens to public repositories
    if ONC Info/Warnings are too verbose.

    Every ``token=<36 character token>`` query parameter is redacted, whether
    it follows '&', follows '?', or starts the string. A string without such
    a parameter is returned unchanged.

    :param input: An Oceans 3.0 API URL or string with a token query parameter.
    :return: The input with each token value replaced by 'REDACTED'.
    """
    # \b keeps names that merely end in "token" (e.g. "mytoken=") untouched
    # while accepting any separator in front of the parameter. Substituting
    # instead of indexing the first match means "no token" is not an error.
    token_regex = r'(\btoken=)[a-f0-9-]{36}'
    return re.sub(token_regex, r'\1REDACTED', input)


def build_error_message(response: "requests.Response", redact_token: bool) -> str:
    """
    Build an error message from a requests.Response object.

    The message is assembled from the JSON payload: the top-level 'message'
    (if present) followed by one line per entry of 'errors' (if present).
    When the body is not JSON, or is JSON but not an object, the raw response
    text is used instead so that building the message never fails on an
    unexpected error page.

    :param response: A requests.Response object.
    :param redact_token: If true, redact tokens before returning an error message.
    :return: An error message. Empty if the payload carries no message or errors.
    """
    try:
        payload = response.json()
    except ValueError:
        # requests raises a ValueError subclass when the body is not valid
        # JSON (e.g. an HTML error page from a proxy or gateway).
        payload = None

    if isinstance(payload, dict):
        parts = []
        if 'message' in payload:
            parts.append(payload['message'])
        if 'errors' in payload:
            error_lines = [
                (f"(API Error Code {error['errorCode']}) "
                 f"{error['errorMessage']} for query parameter(s) "
                 f"'{error['parameter']}'.")
                for error in payload['errors']
            ]
            parts.append('\n'.join(error_lines))
        msg = '\n'.join(part for part in parts if part is not None)
    else:
        msg = (response.text or '').strip()

    if redact_token is True and 'token=' in msg:
        msg = scrub_token(msg)
    return msg

83 changes: 44 additions & 39 deletions src/onc/modules/_MultiPage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,23 @@

from ._util import _formatDuration

from onc.modules._Messages import (setup_logger, MULTIPAGE_MSG,
build_error_message,
scrub_token,
REQ_MSG,
RESPONSE_TIME_MSG,
RESPONSE_MSG)



# Handles data multi-page downloads (scalardata, rawdata, archivefiles)
class _MultiPage:
def __init__(self, parent: object):
def __init__(self, parent: object, verbosity: bool, raise_http_errors: bool):
self.parent = weakref.ref(parent)
self.result = None
self.raise_http_errors = raise_http_errors
self.__log = setup_logger('onc-multi', verbosity)


def getAllPages(self, service: str, url: str, filters: dict):
"""
Expand All @@ -30,48 +41,42 @@ def getAllPages(self, service: str, url: str, filters: dict):
# download first page
start = time()
response, responseTime = self._doPageRequest(url, filters, service, extension)
rNext = response["next"]

if rNext is not None:
print(
"Data quantity is greater than the row limit and",
"will be downloaded in multiple pages.",
)

pageCount = 1
pageEstimate = self._estimatePages(response, service)
if pageEstimate > 0:
# Exclude the first page when calculating the time estimation
timeEstimate = _formatDuration((pageEstimate - 1) * responseTime)
print(
f"Downloading time for the first page: {humanize.naturaldelta(responseTime)}" # noqa: E501
)
print(f"Estimated approx. {pageEstimate} pages in total.")
print(
f"Estimated approx. {timeEstimate} to complete for the rest of the pages." # noqa: E501
)

# keep downloading pages until next is None
print("")
while rNext is not None:
pageCount += 1
rowCount = self._rowCount(response, service)
if isinstance(response,dict):
rNext = response["next"]

print(f" ({rowCount} samples) Downloading page {pageCount}...")
nextResponse, nextTime = self._doPageRequest(
url, rNext["parameters"], service, extension
)
rNext = nextResponse["next"]
if rNext is not None:
self.__log.info("The requested data quantity is greater than the supplied "
"row limit and will be downloaded over multiple requests.")

pageCount = 1
pageEstimate = self._estimatePages(response, service)
if pageEstimate > 0:
# Exclude the first page when calculating the time estimation
timeEstimate = _formatDuration((pageEstimate - 1) * responseTime)
self.__log.debug(f'Download time for page {pageCount}: {round(responseTime,2)} seconds')
self.__log.info(f'Est. number of pages remaining for download: {pageEstimate-1}')
self.__log.info(f'Est. number of seconds to download remaining data: {timeEstimate}')

# keep downloading pages until next is None
while rNext is not None:
pageCount += 1
rowCount = self._rowCount(response, service)

self.__log.debug(f"Submitting request for page {pageCount} ({rowCount} samples)...")

nextResponse, nextTime = self._doPageRequest(
url, rNext["parameters"], service, extension
)
rNext = nextResponse["next"]

# concatenate new data obtained
self._catenateData(response, nextResponse, service)

# concatenate new data obtained
self._catenateData(response, nextResponse, service)
totalTime = _formatDuration(time() - start)

totalTime = _formatDuration(time() - start)
print(
f" ({self._rowCount(response, service):d} samples)"
f" Completed in {totalTime}."
)
response["next"] = None
self.__log.info(f"Downloaded {self._rowCount(response, service):d} total samples in {totalTime}.")
response["next"] = None

return response

Expand Down
5 changes: 3 additions & 2 deletions src/onc/modules/_OncArchive.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ class _OncArchive(_OncService):
Methods that wrap the API archivefiles service
"""

def __init__(self, parent: object):
super().__init__(parent)
def __init__(self, parent: object, verbosity: str, redact_token: str, raise_http_errors: bool):
super().__init__(parent, verbosity, redact_token, raise_http_errors)


def getArchivefileByLocation(self, filters: dict, allPages: bool):
"""
Expand Down
4 changes: 2 additions & 2 deletions src/onc/modules/_OncDelivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class _OncDelivery(_OncService):
Methods that wrap the API data product delivery services
"""

def __init__(self, parent: object):
super().__init__(parent)
def __init__(self, parent: object, verbosity: str, redact_token: str, raise_http_errors: bool):
super().__init__(parent, verbosity, redact_token, raise_http_errors)

# Default seconds to wait between consecutive download tries of a file
# (when no estimate processing time is available)
Expand Down
5 changes: 3 additions & 2 deletions src/onc/modules/_OncDiscovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ class _OncDiscovery(_OncService):
locations, deployments, devices, deviceCategories, properties, dataProducts
"""

def __init__(self, parent: object):
super().__init__(parent)
def __init__(self, parent: object, verbosity: str, redact_token: str, raise_http_errors: bool):
super().__init__(parent, verbosity, redact_token, raise_http_errors)


def _discoveryRequest(self, filters: dict, service: str):
url = self._serviceUrl(service)
Expand Down
6 changes: 3 additions & 3 deletions src/onc/modules/_OncRealTime.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ class _OncRealTime(_OncService):
Near real-time services methods
"""

def __init__(self, config: dict):
super().__init__(config)
def __init__(self, config: dict, verbosity: str, redact_token: str, raise_http_errors: bool):
super().__init__(config, verbosity, redact_token, raise_http_errors)

def getScalardataByLocation(self, filters: dict, allPages: bool):
"""
Expand Down Expand Up @@ -90,7 +90,7 @@ def _getDirectAllPages(self, filters: dict, service: str, allPages: bool) -> Any
filters["sensorCategoryCodes"] = ",".join(filters["sensorCategoryCodes"])

if allPages:
mp = _MultiPage(self)
mp = _MultiPage(self, self.verbosity, self.raise_http_errors)
result = mp.getAllPages(service, url, filters)
else:
result = self._doRequest(url, filters)
Expand Down
107 changes: 63 additions & 44 deletions src/onc/modules/_OncService.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
import requests

from ._util import _createErrorMessage, _formatDuration
from onc.modules._Messages import (setup_logger,
build_error_message,
scrub_token,
REQ_MSG,
RESPONSE_TIME_MSG,
RESPONSE_MSG)

logging.basicConfig(format="%(levelname)s: %(message)s")

Expand All @@ -16,8 +22,16 @@ class _OncService:
Provides common configuration and functionality to Onc service classes (children)
"""

def __init__(self, parent: object):
def __init__(self, parent: object,
verbosity: str,
redact_token: bool,
raise_http_errors: bool):
self.parent = weakref.ref(parent)
self.redact_token = redact_token
self.raise_http_errors = raise_http_errors
self.verbosity = verbosity

self.__log = setup_logger('onc-service', level = verbosity)

def _doRequest(self, url: str, filters: dict | None = None, getTime: bool = False):
"""
Expand All @@ -44,48 +58,60 @@ def _doRequest(self, url: str, filters: dict | None = None, getTime: bool = Fals
filters["token"] = self._config("token")
timeout = self._config("timeout")

txtParams = parse.unquote(parse.urlencode(filters))
self._log(f"Requesting URL:\n{url}?{txtParams}")

start = time()
response = requests.get(url, filters, timeout=timeout)
responseTime = time() - start

if response.ok:
jsonResult = response.json()
if self.redact_token is True:
try:
response_url = scrub_token(response.url)
except:
response_url = response.url
else:
status = response.status_code
if status in [400, 401]:
msg = _createErrorMessage(response)
raise requests.HTTPError(msg)
else:
response.raise_for_status()
self._log(f"Web Service response time: {_formatDuration(responseTime)}")

# Log warning messages only when showWarning is True
# and jsonResult["messages"] is not an empty list
if (
self._config("showWarning")
and "messages" in jsonResult
and jsonResult["messages"]
):
long_message = "\n".join(
[f"* {message}" for message in jsonResult["messages"]]
)
response_url = response.url

filters_without_token = filters.copy()
del filters_without_token["token"]
filters_str = pprint.pformat(filters_without_token)
# Log the url the user submitted.
self.__log.info(REQ_MSG.format(response_url))

logging.warning(
f"When calling {url} with filters\n{filters_str},\n"
f"there are several warning messages:\n{long_message}\n"
)
# Display the time it took for ONC to respond in seconds.
# The requests.Response.elapsed value is a datetime.timedelta object.
responseTime = round(response.elapsed.total_seconds(),3) # To milliseconds.
self.__log.debug(RESPONSE_TIME_MSG.format(responseTime))

json_response = response.json()

if getTime:
return jsonResult, responseTime
if response.status_code == requests.codes.ok:
self.__log.info(RESPONSE_MSG.format("OK", response.status_code))
if getTime is True:
return json_response, responseTime
else:
return json_response
else:
return jsonResult
if response.status_code == requests.codes.not_found:
self.__log.error(RESPONSE_MSG.format("Not Found",
response.status_code))
elif response.status_code == requests.codes.bad:
self.__log.error(RESPONSE_MSG.format("Bad Request",
response.status_code))
elif response.status_code == requests.codes.unauthorized:
self.__log.error(RESPONSE_MSG.format("Unauthorized Request",
response.status_code))
elif response.status_code == requests.codes.internal_server_error:
self.__log.error(RESPONSE_MSG.format("Internal Server Error",
response.status_code))
else:
self.__log.error(RESPONSE_MSG.format('Error',response.status_code))

self.__log.error(build_error_message(response,
self.redact_token))

if self.raise_http_errors is True:
response.raise_for_status()

else:
if getTime is True:
return response, responseTime
else:
return response


def _serviceUrl(self, service: str):
"""
Expand Down Expand Up @@ -115,14 +141,6 @@ def _serviceUrl(self, service: str):

return ""

def _log(self, message: str):
"""
Prints message to console only when self.showInfo is true
@param message: String
"""
if self._config("showInfo"):
print(message)

def _config(self, key: str):
"""
Returns a property from the parent (ONC class)
Expand All @@ -145,3 +163,4 @@ def _delegateByFilters(self, byDevice, byLocation, **kwargs):
"'locationCode' and 'deviceCategoryCode', "
"or a 'deviceCode' present."
)

Loading