Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions src/open_data_pvnet/scripts/fetch_eia_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import os
import logging
import requests
import pandas as pd
from typing import Optional, List, Dict, Any

logger = logging.getLogger(__name__)

class EIAData:
    """
    Class to handle interactions with the EIA API v2.

    The API key is taken from the constructor argument or, failing that,
    from the ``EIA_API_KEY`` environment variable. Construction never
    fails on a missing key; ``get_data`` raises instead.
    """

    # Upper bound, in seconds, on a single HTTP call so that a stalled
    # connection cannot block the caller indefinitely.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, api_key: Optional[str] = None):
        """
        Store the API key and the base URL.

        Args:
            api_key: Explicit key. When falsy, ``EIA_API_KEY`` from the
                environment is used; a warning is logged if neither is set.
        """
        self.api_key = api_key or os.getenv("EIA_API_KEY")
        if not self.api_key:
            logger.warning("EIA_API_KEY environment variable is not set. You must provide an API key to fetch data.")
        self.base_url = "https://api.eia.gov/v2"

    def get_data(
        self,
        route: str,
        frequency: str,
        start_date: str,
        end_date: str,
        data_cols: Optional[List[str]] = None,
        facets: Optional[Dict[str, Any]] = None,
        offset: int = 0,
        length: int = 5000,
        paginate: bool = False,
    ) -> Optional[pd.DataFrame]:
        """
        Fetch data from the EIA API.

        Args:
            route: API route (e.g. 'electricity/rto/daily-fuel-type-data').
                Leading/trailing slashes are ignored.
            frequency: Data frequency (e.g. 'daily', 'hourly')
            start_date: Start date string
            end_date: End date string
            data_cols: List of data columns to retrieve. ``None`` (the
                default) requests the single column ``"value"``.
            facets: Dictionary of facets to filter by. A list value is sent
                as ``facets[key][0]``, ``facets[key][1]``, ...; any other
                value is sent as ``facets[key][]``.
            offset: Row offset of the first record to request.
            length: Number of results to request per call.
            paginate: When True, keep requesting further pages (advancing
                ``offset`` by the number of rows received) until a page
                comes back empty or with fewer than ``length`` rows, and
                return all rows together. When False (default) exactly one
                request is made.
                NOTE(review): if the server caps a page below ``length``
                the short-page stop condition ends early - confirm the
                API's per-request limit before using a larger ``length``.

        Returns:
            pd.DataFrame: Data returned from the API, or None if any request
            failed, the payload had an unexpected shape, or no rows came back.

        Raises:
            ValueError: If no API key is configured.
        """
        if not self.api_key:
            raise ValueError("API Key is missing")

        url = f"{self.base_url}/{route.strip('/')}/data"
        params = self._build_params(
            self.api_key, frequency, start_date, end_date,
            data_cols, facets, offset, length,
        )

        rows: List[Any] = []
        while True:
            page = self._fetch_page(url, params)
            if page is None:
                # A failed page would leave a silently incomplete result,
                # so the whole call reports failure.
                return None
            rows.extend(page)
            # "not page" also protects against looping forever when
            # length <= 0 makes the short-page comparison never true.
            if not paginate or not page or len(page) < length:
                break
            params["offset"] += len(page)

        if not rows:
            logger.warning("No data returned from API.")
            return None
        return pd.DataFrame(rows)

    @staticmethod
    def _build_params(
        api_key: str,
        frequency: str,
        start_date: str,
        end_date: str,
        data_cols: Optional[List[str]],
        facets: Optional[Dict[str, Any]],
        offset: int,
        length: int,
    ) -> Dict[str, Any]:
        """Assemble the flat query-parameter dict for one data request."""
        params: Dict[str, Any] = {
            "api_key": api_key,
            "frequency": frequency,
            "start": start_date,
            "end": end_date,
            "offset": offset,
            "length": length,
        }

        columns = ["value"] if data_cols is None else data_cols
        params.update({f"data[{i}]": col for i, col in enumerate(columns)})

        for key, value in (facets or {}).items():
            if isinstance(value, list):
                for i, item in enumerate(value):
                    params[f"facets[{key}][{i}]"] = item
            else:
                params[f"facets[{key}][]"] = value
        return params

    def _fetch_page(self, url: str, params: Dict[str, Any]) -> Optional[List[Any]]:
        """Perform one GET and return its list of rows, or None on any failure."""
        # Bound before the try so the handler can tell "no response at all"
        # (connection error, timeout) from "got a response that was bad".
        response = None
        try:
            logger.info("Fetching data from %s...", url)
            response = requests.get(
                url, params=params, timeout=self.REQUEST_TIMEOUT_SECONDS
            )
            response.raise_for_status()
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # decode error is not a RequestException. The exception text can
            # embed the full request URL, which carries the key.
            logger.error("Request failed: %s", self._redact(e))
            if response is not None:
                logger.error("Response: %s", self._redact(response.text))
            return None

        body = payload.get("response") if isinstance(payload, dict) else None
        if not isinstance(body, dict) or not isinstance(body.get("data"), list):
            shape = payload.keys() if isinstance(payload, dict) else type(payload).__name__
            logger.error("Unexpected API response format: %s", shape)
            return None
        return body["data"]

    def _redact(self, text: Any) -> str:
        """Return ``text`` as a string with the API key masked out."""
        return str(text).replace(str(self.api_key), "***")

if __name__ == "__main__":
    # Manual smoke run: configure INFO logging, build a client (which logs a
    # warning when no key is available) and tell the user what to do next.
    logging.basicConfig(level=logging.INFO)
    eia = EIAData()
    print(
        "EIAData initialized. Set EIA_API_KEY and call get_data() to test."
    )
77 changes: 77 additions & 0 deletions tests/test_eia.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import pytest
import pandas as pd
from unittest.mock import Mock, patch
from open_data_pvnet.scripts.fetch_eia_data import EIAData

@pytest.fixture
def mock_response():
    """Provide a stand-in successful HTTP response carrying two sample rows."""
    sample_rows = [
        {"period": "2023-01-01T00", "value": 100, "fueltype": "SUN"},
        {"period": "2023-01-01T01", "value": 150, "fueltype": "SUN"},
    ]
    fake_response = Mock()
    # raise_for_status() returning None means "HTTP status was fine".
    fake_response.raise_for_status.return_value = None
    fake_response.json.return_value = {"response": {"data": sample_rows}}
    return fake_response

def test_init_with_key():
    """A key passed to the constructor is stored on the instance as given."""
    expected_key = "test_key"
    client = EIAData(api_key=expected_key)
    assert client.api_key == "test_key"

def test_init_without_key():
    """With no argument and an empty environment, no API key is resolved."""
    # patch.dict restores the real environment when the block exits, and it
    # needs only unittest.mock (already imported in this module) rather than
    # the pytest-mock plugin's `mocker` fixture.
    with patch.dict("os.environ", {}, clear=True):
        eia = EIAData()
    assert eia.api_key is None

def test_get_data_success(mock_response):
    """A well-formed response yields a DataFrame and the expected query parameters."""
    with patch("requests.get", return_value=mock_response) as fake_get:
        client = EIAData(api_key="test_key")
        frame = client.get_data(
            route="test/route",
            frequency="hourly",
            start_date="2023-01-01",
            end_date="2023-01-02",
            data_cols=["value"],
            facets={"fueltype": "SUN"},
        )

    # Shape of the returned frame matches the two mocked rows.
    assert isinstance(frame, pd.DataFrame)
    assert len(frame) == 2
    assert "value" in frame.columns

    # Verify API call parameters
    fake_get.assert_called_once()
    _, call_kwargs = fake_get.call_args
    sent_params = call_kwargs["params"]
    assert sent_params["api_key"] == "test_key"
    assert sent_params["facets[fueltype][]"] == "SUN"
    assert sent_params["data[0]"] == "value"

def test_get_data_missing_key():
    """get_data refuses to run when neither argument nor environment gives a key."""
    # The constructor falls back to EIA_API_KEY, so the environment must be
    # emptied; otherwise a developer machine with the variable set would
    # skip the ValueError and hit the unpatched requests.get.
    with patch.dict("os.environ", {}, clear=True):
        eia = EIAData(api_key=None)
        with pytest.raises(ValueError, match="API Key is missing"):
            eia.get_data("route", "hourly", "start", "end")

def test_get_data_api_error():
    """An HTTP error raised by raise_for_status() makes get_data return None."""
    import requests

    failing_response = Mock()
    failing_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
        "API Error"
    )

    with patch("requests.get", return_value=failing_response):
        client = EIAData(api_key="test_key")
        result = client.get_data("route", "hourly", "start", "end")

    assert result is None

def test_get_data_empty_response():
    """A successful response whose data list is empty makes get_data return None."""
    # Dotted keyword names configure the child mocks in one step.
    empty_response = Mock(
        **{
            "json.return_value": {"response": {"data": []}},
            "raise_for_status.return_value": None,
        }
    )

    with patch("requests.get", return_value=empty_response):
        client = EIAData(api_key="test_key")
        result = client.get_data("route", "hourly", "start", "end")

    assert result is None