diff --git a/openeihttp/__init__.py b/openeihttp/__init__.py index 85b1a38..4f41043 100644 --- a/openeihttp/__init__.py +++ b/openeihttp/__init__.py @@ -1,497 +1,19 @@ """Provide a package for python-openei.""" -from __future__ import annotations - -import datetime -import json -import logging -import time -from typing import Any - -import aiohttp # type: ignore -from aiohttp.client_exceptions import ContentTypeError, ServerTimeoutError - -from .cache import OpenEICache -from .const import BASE_URL - -_LOGGER = logging.getLogger(__name__) -DEFAULT_HEADERS = { - "Content-Type": "application/json", -} -ERROR_TIMEOUT = "Timeout while updating" - - -class UrlNotFound(Exception): - """Exception for NotFound.""" - - -class NotAuthorized(Exception): - """Exception for invalid API key.""" - - -class APIError(Exception): - """Exception for API errors.""" - - -class RateLimit(Exception): - """Exception for API errors.""" - - -class InvalidCall(Exception): - """Exception for invalid library calls.""" - - -class Rates: - """Represent OpenEI Rates.""" - - def __init__( - self, - api: str, - lat: float = 9000, - lon: float = 9000, - plan: str = "", - radius: float = 0.0, - address: str = "", - reading: float = 0.0, - cache_file: str = "", - ) -> None: - """Initialize.""" - self._api = api - self._lat = lat - self._lon = lon - self._plan = plan - self._radius = radius - self._reading = reading - self._address = address - self._data = None - self._redact = [ - self._api, - self._address, - ] - self._cache_file = cache_file - self._timestamp = datetime.datetime(1990, 1, 1, 0, 0, 0) - - async def process_request(self, params: dict, timeout: int = 90) -> dict[str, Any]: - """Process API requests.""" - async with aiohttp.ClientSession(headers=DEFAULT_HEADERS) as session: - _LOGGER.debug("URL: %s", BASE_URL) - try: - async with session.get( - BASE_URL, - params=params, - timeout=aiohttp.ClientTimeout(total=timeout), - ) as response: - message: Any = {} - try: - message = await response.text() - except UnicodeDecodeError: - _LOGGER.debug("Decoding error.") - data = await response.read() - message = data.decode(errors="replace") - - try: - message = json.loads(message) - except ValueError: - _LOGGER.warning("Non-JSON response: %s", message) - message = {"error": message} - - if response.status == 404: - raise UrlNotFound - if response.status == 401: - raise NotAuthorized - if response.status != 200: - _LOGGER.error( - "An error reteiving data from the server, code: %s\nmessage: %s", - response.status, - message, - ) - message = {"error": message} - return message - - except (TimeoutError, ServerTimeoutError): - _LOGGER.error("%s: %s", ERROR_TIMEOUT, BASE_URL) - message = {"error": ERROR_TIMEOUT} - except ContentTypeError as err: - _LOGGER.error("%s", err) - message = {"error": err} - - await session.close() - return message - - async def lookup_plans(self) -> dict[str, Any]: - """Return the rate plan names per utility in the area.""" - if self._address == "" and (self._lat == 9000 and self._lon == 9000): - _LOGGER.error("Missing location data for a plan lookup.") - raise InvalidCall - - thetime = time.time() - - params = { - "version": "latest", - "format": "json", - "api_key": self._api, - "orderby": "startdate", - "sector": "Residential", - "effective_on_date": thetime, - } - - if self._radius != 0.0: - params["radius"] = self._radius - - if self._address == "": - params["lat"] = self._lat - params["lon"] = self._lon - else: - params["address"] = self._address - - rate_names: dict[str, Any] = {} - - result = await self.process_request(params, timeout=90) - - if "error" in result: - err = result["error"] - message = err["message"] if isinstance(err, dict) and "message" in err else str(err) - _LOGGER.error("Error: %s", message) - raise APIError - - if "items" in result: - for item in result["items"]: - utility: str = item["utility"] - if utility not in rate_names: - rate_names[utility] = [] - info = {"name": item["name"], "label": item["label"]} - rate_names[utility].append(info) - - notlisted = "Not Listed" - rate_names[notlisted] = [{"name": notlisted, "label": notlisted}] - return rate_names - - async def update(self) -> None: - """Update data only if we need to.""" - if self._data is None: - _LOGGER.debug("No data populated, refreshing data.") - cache = OpenEICache(self._cache_file) if self._cache_file else OpenEICache() - # Load cached file if one exists - if await cache.cache_exists(): - _LOGGER.debug("Cache file exists, reading...") - self._data = await cache.read_cache() - else: - _LOGGER.debug("Cache file missing, pulling API data...") - await self.update_data() - self._timestamp = datetime.datetime.now() - else: - elapsedtime = datetime.datetime.now() - self._timestamp - past = datetime.timedelta(hours=24) - if elapsedtime >= past: - _LOGGER.debug("Data stale, refreshing from API.") - await self.update_data() - self._timestamp = datetime.datetime.now() - - async def update_data(self) -> None: - """Update the data.""" - params = { - "version": "latest", - "format": "json", - "detail": "full", - "api_key": self._api, - "getpage": self._plan, - } - - result = await self.process_request(params, timeout=90) - - if "error" in result: - err = result["error"] - message = err["message"] if isinstance(err, dict) and "message" in err else str(err) - _LOGGER.error("Error: %s", message) - if "You have exceeded your rate limit." in message: - raise RateLimit - raise APIError - - if "items" in result: - data = result["items"][0] - self._data = data - # Insert cache writing call here - cache = OpenEICache(self._cache_file) if self._cache_file else OpenEICache() - json_data = json.dumps(data).encode("utf-8") - await cache.write_cache(json_data) - _LOGGER.debug("Data updated, results: %s", self._data) - - async def clear_cache(self) -> None: - """Clear cache file.""" - cache = OpenEICache(self._cache_file) if self._cache_file else OpenEICache() - await cache.clear_cache() - - @property - def current_energy_rate_structure(self) -> int | None: - """Return the current rate structure.""" - return self.rate_structure(datetime.datetime.today(), "energy") - - def rate_structure(self, date, rate_type) -> int | None: - """Return the rate structure for a specific date.""" - assert self._data is not None - if f"{rate_type}ratestructure" in self._data: - month = date.month - 1 - hour = date.hour - weekend = date.weekday() > 4 - - table = f"{rate_type}weekendschedule" if weekend else f"{rate_type}weekdayschedule" - lookup_table = self._data[table] - rate_structure = lookup_table[month][hour] - - return rate_structure - return None - - @property - def next_energy_rate_structure(self) -> int | None: - """Return the next rate structure.""" - return self.next_rate_schedule(datetime.datetime.today(), "energy")[1] - - @property - def next_energy_rate_structure_time(self) -> datetime.datetime | None: - """Return the time at which the next rate structure will take effect.""" - return self.next_rate_schedule(datetime.datetime.today(), "energy")[0] - - def next_rate_schedule( - self, start: datetime.datetime, rate_type: str - ) -> tuple[datetime.datetime | None, int | None]: - """Return the next datetime at which the rate structure changes.""" - assert self._data is not None - if f"{rate_type}ratestructure" not in self._data: - return None, None - - current_structure = self.rate_structure(start, rate_type) - current_time = start - - for month_idx in range(start.month - 1, 12 + start.month - 1): - current_time = current_time.replace( - year=start.year + (month_idx // 12), - month=(month_idx % 12) + 1, - minute=0, - second=0, - microsecond=0, - ) - day_of_week = current_time.weekday() - - schedules = ( - ["weekendschedule", "weekdayschedule"] - if day_of_week > 4 - else ["weekdayschedule", "weekendschedule"] - ) - - if current_time.hour > 0: - schedules.append(schedules[0]) - - for schedule in schedules: - table = f"{rate_type}{schedule}" - day_of_week = current_time.weekday() - - for hour in range(current_time.hour, 24 + current_time.hour): - hour = hour % 24 - rate_structure = self._data[table][current_time.month - 1][hour] - - if rate_structure == current_structure: - continue - - if hour >= current_time.hour: - return current_time.replace(hour=hour), rate_structure - - # Handle next day transition - if ( - day_of_week not in [4, 6] - and (current_time + datetime.timedelta(days=1)).month == current_time.month - ): - return ( - current_time.replace(day=current_time.day + 1, hour=hour), - rate_structure, - ) - - days_to_move = 5 - day_of_week if day_of_week <= 4 else 7 - day_of_week - - if ( - current_time + datetime.timedelta(days=days_to_move) - ).month != current_time.month: - break - - current_time = current_time.replace(hour=0, day=current_time.day + days_to_move) - - current_time = current_time.replace(day=1) - - return None, current_structure - - @property - def current_rate(self) -> float | None: - """Return the current rate.""" - return self.rate(datetime.datetime.today()) - - def rate(self, date) -> float | None: - """Return the rate for a specific date.""" - assert self._data is not None - rate_structure = self.rate_structure(date, "energy") - if rate_structure is not None: - if self._reading: - value = float(self._reading) - rate_data = self._data["energyratestructure"][rate_structure] - for rate in rate_data: - if "max" in rate and value < rate["max"]: - return rate["rate"] - continue - return rate_data[-1]["rate"] - rate = self._data["energyratestructure"][rate_structure][0]["rate"] - return rate - return None - - @property - def current_adjustment(self) -> float | None: - """Return the current rate.""" - return self.adjustment(datetime.datetime.today()) - - def adjustment(self, date) -> float | None: - """Return the rate for a specific date.""" - assert self._data is not None - rate_structure = self.rate_structure(date, "energy") - if rate_structure is not None: - adj = None - if self._reading: - rate_data = self._data["energyratestructure"][rate_structure] - if "adj" in rate_data[-1]: - return rate_data[-1]["adj"] - adj_data = self._data["energyratestructure"][rate_structure][0] - if "adj" in adj_data: - adj = adj_data["adj"] - return adj - return None - - @property - def monthly_tier_rate(self) -> float | None: - """Return tier rate. - - Requires the monthy accumulative meter reading. - """ - return self.tier_rate_for_month(datetime.datetime.today()) - - def tier_rate_for_month(self, date) -> float | None: - """Return tier rate for a specific month. - - Requires the monthy accumulative meter reading. - """ - assert self._data is not None - rate_structure = self.rate_structure(date, "energy") - if rate_structure is not None: - if self._reading: - value = float(self._reading) - rate_data = self._data["energyratestructure"][rate_structure] - for rate in rate_data: - if "max" in rate and value < (rate["max"] * 29): - return rate["rate"] - continue - return rate_data[-1]["rate"] - return None - return None - - @property - def all_rates(self) -> tuple | None: - """Return the current rate.""" - assert self._data is not None - if "energyratestructure" in self._data: - rates = [] - adjs = [] - rate_data = self._data["energyratestructure"] - for rate in rate_data: - rates.append(rate[0]["rate"]) - if "adj" in rate[0]: - adjs.append(rate[0]["adj"]) - - return rates, adjs - return None - - @property - def current_demand_rate(self) -> float | None: - """Return the current rate.""" - return self.demand_rate(datetime.datetime.today()) - - def demand_rate(self, date) -> float | None: - """Return the rate for a specific date.""" - assert self._data is not None - rate_structure = self.rate_structure(date, "demand") - if rate_structure is not None: - rate = self._data["demandratestructure"][rate_structure][0]["rate"] - return rate - return None - - @property - def current_demand_adjustment(self) -> float | None: - """Return the current rate.""" - return self.demand_adjustment(datetime.datetime.today()) - - def demand_adjustment(self, date) -> float | None: - """Return the rate for a specific date.""" - assert self._data is not None - rate_structure = self.rate_structure(date, "demand") - if rate_structure is not None: - adj = None - - adj_data = self._data["demandratestructure"][rate_structure][0] - if "adj" in adj_data: - adj = adj_data["adj"] - return adj - return None - - @property - def demand_unit(self) -> str | None: - """Return the demand rate unit.""" - assert self._data is not None - if "demandrateunit" in self._data: - return self._data["demandrateunit"] - return None - - @property - def rate_name(self) -> str: - """Return the rate name.""" - assert self._data is not None - return self._data["name"] - - @property - def approval(self) -> bool: - """Return the rate name.""" - assert self._data is not None - return self._data["approved"] - - @property - def distributed_generation(self) -> str | None: - """Return the distributed generation name.""" - assert self._data is not None - if "dgrules" in self._data: - return self._data["dgrules"] - return None - - @property - def mincharge(self) -> tuple | None: - """Return the mincharge.""" - assert self._data is not None - if "mincharge" in self._data: - return (self._data["mincharge"], self._data["minchargeunits"]) - return None - - @property - def fixedchargefirstmeter(self) -> tuple | None: - """Return the fixedchargefirstmeter.""" - assert self._data is not None - if "fixedchargefirstmeter" in self._data: - return (self._data["fixedchargefirstmeter"], self._data["fixedchargeunits"]) - return None - - @property - def current_sell_rate(self) -> float | None: - """Return the current sell rate.""" - return self.sell_rate(datetime.datetime.today()) - - def sell_rate(self, date) -> float | None: - """Return the rate for a specific date.""" - assert self._data is not None - rate_structure = self.rate_structure(date, "energy") - if rate_structure is not None: - try: - return self._data["energyratestructure"][rate_structure][0]["sell"] - except (KeyError, IndexError): - return None - return None +from .client import Rates +from .exceptions import ( + APIError, + InvalidCall, + NotAuthorized, + RateLimit, + UrlNotFound, +) + +__all__ = [ + "Rates", + "APIError", + "InvalidCall", + "NotAuthorized", + "RateLimit", + "UrlNotFound", +] diff --git a/openeihttp/cache.py b/openeihttp/cache.py index eba2ce2..9c99545 100644 --- a/openeihttp/cache.py +++ b/openeihttp/cache.py @@ -2,11 +2,14 @@ import json import logging -from os.path import dirname, exists, join, split +from os.path import dirname, join, split from typing import Any import aiofiles import aiofiles.os +import aiofiles.ospath + +from .const import MIN_CACHE_SIZE _LOGGER = logging.getLogger(__name__) @@ -21,11 +24,11 @@ def __init__(self, cache_file: str = "") -> None: self._cache_file = cache_file self._directory, self._filename = split(cache_file) - async def write_cache(self, data: Any) -> None: + async def write_cache(self, data: bytes) -> None: """Write cache file.""" - if self._directory != "" and not exists(self._directory): - _LOGGER.debug("Directory missing creating: %s", self._directory) - await aiofiles.os.makedirs(self._directory) + if self._directory != "": + _LOGGER.debug("Ensuring directory exists: %s", self._directory) + await aiofiles.os.makedirs(self._directory, exist_ok=True) async with aiofiles.open(self._cache_file, mode="wb") as file: _LOGGER.debug("Writing file: %s", self._cache_file) await file.write(data) @@ -33,7 +36,7 @@ async def write_cache(self, data: Any) -> None: async def read_cache(self) -> Any: """Read cache file.""" _LOGGER.debug("Attempting to read file: %s", self._cache_file) - if exists(self._cache_file): + if await aiofiles.ospath.exists(self._cache_file): async with aiofiles.open(self._cache_file) as file: _LOGGER.debug("Reading file: %s", self._cache_file) value = await file.read() @@ -53,7 +56,7 @@ async def cache_exists(self) -> bool: if check: size = await aiofiles.os.path.getsize(self._cache_file) _LOGGER.debug("Checking cache file size: %s", size) - return size > 194 + return size >= MIN_CACHE_SIZE return False async def clear_cache(self) -> None: diff --git a/openeihttp/client.py b/openeihttp/client.py new file mode 100644 index 0000000..0bb81a5 --- /dev/null +++ b/openeihttp/client.py @@ -0,0 +1,485 @@ +"""Client for python-openei.""" + +from __future__ import annotations + +import datetime +import json +import logging +import time +from typing import Any + +import aiohttp +from aiohttp.client_exceptions import ContentTypeError, ServerTimeoutError + +from .cache import OpenEICache +from .const import BASE_URL, DEFAULT_HEADERS, ERROR_TIMEOUT +from .exceptions import APIError, InvalidCall, NotAuthorized, RateLimit, UrlNotFound + +_LOGGER = logging.getLogger(__name__) + + +class Rates: + """Represent OpenEI Rates.""" + + def __init__( + self, + api: str, + lat: float | None = None, + lon: float | None = None, + plan: str = "", + radius: float = 0.0, + address: str = "", + reading: float = 0.0, + cache_file: str = "", + session: aiohttp.ClientSession | None = None, + ) -> None: + """Initialize.""" + self._api = api + self._lat = lat + self._lon = lon + self._plan = plan + self._radius = radius + self._reading = reading + self._address = address + self._data: dict[str, Any] | None = None + self._redact = [ + self._api, + self._address, + ] + self._cache_file = cache_file + self._timestamp = datetime.datetime(1990, 1, 1, 0, 0, 0) + self._session = session + + async def process_request(self, params: dict[str, Any], timeout: int = 90) -> dict[str, Any]: + """Process API requests.""" + if self._session is not None: + return await self._execute_request(self._session, params, timeout) + + async with aiohttp.ClientSession(headers=DEFAULT_HEADERS) as session: + return await self._execute_request(session, params, timeout) + + async def _execute_request( + self, session: aiohttp.ClientSession, params: dict[str, Any], timeout: int + ) -> dict[str, Any]: + """Execute the request with the given session.""" + _LOGGER.debug("URL: %s", BASE_URL) + try: + async with session.get( + BASE_URL, + params=params, + timeout=aiohttp.ClientTimeout(total=timeout), + ) as response: + message: Any = {} + try: + message = await response.text() + except UnicodeDecodeError: + _LOGGER.debug("Decoding error.") + data = await response.read() + message = data.decode(errors="replace") + + try: + message = json.loads(message) + except ValueError: + _LOGGER.warning("Non-JSON response: %s", message) + message = {"error": message} + + if response.status == 404: + raise UrlNotFound + if response.status == 401: + raise NotAuthorized + if response.status != 200: + _LOGGER.error( + "An error retrieving data from the server, code: %s\nmessage: %s", + response.status, + message, + ) + message = {"error": message} + return message + + except (TimeoutError, ServerTimeoutError): + _LOGGER.error("%s: %s", ERROR_TIMEOUT, BASE_URL) + return {"error": ERROR_TIMEOUT} + except ContentTypeError as err: + _LOGGER.error("%s", err) + return {"error": err} + + async def lookup_plans(self) -> dict[str, Any]: + """Return the rate plan names per utility in the area.""" + if self._address == "" and (self._lat is None or self._lon is None): + _LOGGER.error("Missing location data for a plan lookup.") + raise InvalidCall + + thetime = time.time() + + params: dict[str, Any] = { + "version": "latest", + "format": "json", + "api_key": self._api, + "orderby": "startdate", + "sector": "Residential", + "effective_on_date": thetime, + } + + if self._radius != 0.0: + params["radius"] = self._radius + + if self._address == "": + params["lat"] = self._lat + params["lon"] = self._lon + else: + params["address"] = self._address + + rate_names: dict[str, Any] = {} + + result = await self.process_request(params, timeout=90) + + if "error" in result: + err = result["error"] + message = err["message"] if isinstance(err, dict) and "message" in err else str(err) + _LOGGER.error("Error: %s", message) + raise APIError + + if "items" in result: + for item in result["items"]: + utility: str = item["utility"] + if utility not in rate_names: + rate_names[utility] = [] + info = {"name": item["name"], "label": item["label"]} + rate_names[utility].append(info) + + notlisted = "Not Listed" + rate_names[notlisted] = [{"name": notlisted, "label": notlisted}] + return rate_names + + async def update(self) -> None: + """Update data only if we need to.""" + if self._data is None: + _LOGGER.debug("No data populated, refreshing data.") + cache = OpenEICache(self._cache_file) if self._cache_file else OpenEICache() + # Load cached file if one exists + if await cache.cache_exists(): + _LOGGER.debug("Cache file exists, reading...") + self._data = await cache.read_cache() + else: + _LOGGER.debug("Cache file missing, pulling API data...") + await self.update_data() + self._timestamp = datetime.datetime.now() + else: + elapsedtime = datetime.datetime.now() - self._timestamp + past = datetime.timedelta(hours=24) + if elapsedtime >= past: + _LOGGER.debug("Data stale, refreshing from API.") + await self.update_data() + self._timestamp = datetime.datetime.now() + + async def update_data(self) -> None: + """Update the data.""" + params = { + "version": "latest", + "format": "json", + "detail": "full", + "api_key": self._api, + "getpage": self._plan, + } + + result = await self.process_request(params, timeout=90) + + if "error" in result: + err = result["error"] + message = err["message"] if isinstance(err, dict) and "message" in err else str(err) + _LOGGER.error("Error: %s", message) + if "You have exceeded your rate limit." in message: + raise RateLimit + raise APIError + + if "items" in result: + data = result["items"][0] + self._data = data + # Insert cache writing call here + cache = OpenEICache(self._cache_file) if self._cache_file else OpenEICache() + json_data = json.dumps(data).encode("utf-8") + await cache.write_cache(json_data) + _LOGGER.debug("Data updated, results: %s", self._data) + + async def clear_cache(self) -> None: + """Clear cache file.""" + cache = OpenEICache(self._cache_file) if self._cache_file else OpenEICache() + await cache.clear_cache() + + @property + def current_energy_rate_structure(self) -> int | None: + """Return the current rate structure.""" + return self.rate_structure(datetime.datetime.today(), "energy") + + def rate_structure(self, date: datetime.datetime, rate_type: str) -> int | None: + """Return the rate structure for a specific date.""" + assert self._data is not None + if f"{rate_type}ratestructure" in self._data: + month = date.month - 1 + hour = date.hour + weekend = date.weekday() > 4 + + table = f"{rate_type}weekendschedule" if weekend else f"{rate_type}weekdayschedule" + lookup_table = self._data[table] + rate_structure = lookup_table[month][hour] + + return rate_structure + return None + + @property + def next_energy_rate_structure(self) -> int | None: + """Return the next rate structure.""" + return self.next_rate_schedule(datetime.datetime.today(), "energy")[1] + + @property + def next_energy_rate_structure_time(self) -> datetime.datetime | None: + """Return the time at which the next rate structure will take effect.""" + return self.next_rate_schedule(datetime.datetime.today(), "energy")[0] + + def next_rate_schedule( + self, start: datetime.datetime, rate_type: str + ) -> tuple[datetime.datetime | None, int | None]: + """Return the next datetime at which the rate structure changes.""" + assert self._data is not None + if f"{rate_type}ratestructure" not in self._data: + return None, None + + current_structure = self.rate_structure(start, rate_type) + current_time = start + + for month_idx in range(start.month - 1, 12 + start.month - 1): + current_time = current_time.replace( + year=start.year + (month_idx // 12), + month=(month_idx % 12) + 1, + minute=0, + second=0, + microsecond=0, + ) + day_of_week = current_time.weekday() + + schedules = ( + ["weekendschedule", "weekdayschedule"] + if day_of_week > 4 + else ["weekdayschedule", "weekendschedule"] + ) + + if current_time.hour > 0: + schedules.append(schedules[0]) + + for schedule in schedules: + table = f"{rate_type}{schedule}" + day_of_week = current_time.weekday() + + for hour in range(current_time.hour, 24 + current_time.hour): + hour = hour % 24 + rate_structure = self._data[table][current_time.month - 1][hour] + + if rate_structure == current_structure: + continue + + if hour >= current_time.hour: + return current_time.replace(hour=hour), rate_structure + + # Handle next day transition + if ( + day_of_week not in [4, 6] + and (current_time + datetime.timedelta(days=1)).month == current_time.month + ): + return ( + current_time.replace(day=current_time.day + 1, hour=hour), + rate_structure, + ) + + days_to_move = 5 - day_of_week if day_of_week <= 4 else 7 - day_of_week + + if ( + current_time + datetime.timedelta(days=days_to_move) + ).month != current_time.month: + break + + current_time = current_time.replace(hour=0, day=current_time.day + days_to_move) + + current_time = current_time.replace(day=1) + + return None, current_structure + + @property + def current_rate(self) -> float | None: + """Return the current rate.""" + return self.rate(datetime.datetime.today()) + + def rate(self, date: datetime.datetime) -> float | None: + """Return the rate for a specific date.""" + assert self._data is not None + rate_structure = self.rate_structure(date, "energy") + if rate_structure is not None: + if self._reading: + value = float(self._reading) + rate_data = self._data["energyratestructure"][rate_structure] + for rate in rate_data: + if "max" in rate and value < rate["max"]: + return rate["rate"] + continue + return rate_data[-1]["rate"] + rate = self._data["energyratestructure"][rate_structure][0]["rate"] + return rate + return None + + @property + def current_adjustment(self) -> float | None: + """Return the current rate.""" + return self.adjustment(datetime.datetime.today()) + + def adjustment(self, date: datetime.datetime) -> float | None: + """Return the rate for a specific date.""" + assert self._data is not None + rate_structure = self.rate_structure(date, "energy") + if rate_structure is not None: + adj = None + if self._reading: + rate_data = self._data["energyratestructure"][rate_structure] + if "adj" in rate_data[-1]: + return rate_data[-1]["adj"] + adj_data = self._data["energyratestructure"][rate_structure][0] + if "adj" in adj_data: + adj = adj_data["adj"] + return adj + return None + + @property + def monthly_tier_rate(self) -> float | None: + """Return tier rate. + + Requires the monthy accumulative meter reading. + """ + return self.tier_rate_for_month(datetime.datetime.today()) + + def tier_rate_for_month(self, date: datetime.datetime) -> float | None: + """Return tier rate for a specific month. + + Requires the monthy accumulative meter reading. + """ + assert self._data is not None + rate_structure = self.rate_structure(date, "energy") + if rate_structure is not None: + if self._reading: + value = float(self._reading) + rate_data = self._data["energyratestructure"][rate_structure] + for rate in rate_data: + if "max" in rate and value < (rate["max"] * 29): + return rate["rate"] + continue + return rate_data[-1]["rate"] + return None + return None + + @property + def all_rates(self) -> tuple[list[float], list[float]] | None: + """Return the current rate.""" + assert self._data is not None + if "energyratestructure" in self._data: + rates = [] + adjs = [] + rate_data = self._data["energyratestructure"] + for rate in rate_data: + rates.append(rate[0]["rate"]) + if "adj" in rate[0]: + adjs.append(rate[0]["adj"]) + + return rates, adjs + return None + + @property + def current_demand_rate(self) -> float | None: + """Return the current rate.""" + return self.demand_rate(datetime.datetime.today()) + + def demand_rate(self, date: datetime.datetime) -> float | None: + """Return the rate for a specific date.""" + assert self._data is not None + rate_structure = self.rate_structure(date, "demand") + if rate_structure is not None: + rate = self._data["demandratestructure"][rate_structure][0]["rate"] + return rate + return None + + @property + def current_demand_adjustment(self) -> float | None: + """Return the current rate.""" + return self.demand_adjustment(datetime.datetime.today()) + + def demand_adjustment(self, date: datetime.datetime) -> float | None: + """Return the rate for a specific date.""" + assert self._data is not None + rate_structure = self.rate_structure(date, "demand") + if rate_structure is not None: + adj = None + + adj_data = self._data["demandratestructure"][rate_structure][0] + if "adj" in adj_data: + adj = adj_data["adj"] + return adj + return None + + @property + def demand_unit(self) -> str | None: + """Return the demand rate unit.""" + assert self._data is not None + if "demandrateunit" in self._data: + return self._data["demandrateunit"] + return None + + @property + def rate_name(self) -> str: + """Return the rate name.""" + assert self._data is not None + return self._data["name"] + + @property + def approval(self) -> bool: + """Return the rate name.""" + assert self._data is not None + return self._data["approved"] + + @property + def distributed_generation(self) -> str | None: + """Return the distributed generation name.""" + assert self._data is not None + if "dgrules" in self._data: + return self._data["dgrules"] + return None + + @property + def mincharge(self) -> tuple[Any, Any] | None: + """Return the mincharge.""" + assert self._data is not None + if "mincharge" in self._data: + return (self._data["mincharge"], self._data["minchargeunits"]) + return None + + @property + def fixedchargefirstmeter(self) -> tuple[Any, Any] | None: + """Return the fixedchargefirstmeter.""" + assert self._data is not None + if "fixedchargefirstmeter" in self._data: + return ( + self._data["fixedchargefirstmeter"], + self._data["fixedchargeunits"], + ) + return None + + @property + def current_sell_rate(self) -> float | None: + """Return the current sell rate.""" + return self.sell_rate(datetime.datetime.today()) + + def sell_rate(self, date: datetime.datetime) -> float | None: + """Return the rate for a specific date.""" + assert self._data is not None + rate_structure = self.rate_structure(date, "energy") + if rate_structure is not None: + try: + return self._data["energyratestructure"][rate_structure][0]["sell"] + except (KeyError, IndexError): + return None + return None diff --git a/openeihttp/const.py b/openeihttp/const.py index f40528b..ec5657f 100644 --- a/openeihttp/const.py +++ b/openeihttp/const.py @@ -1,3 +1,8 @@ -"""Cosntants for python-openei.""" +"""Constants for python-openei.""" BASE_URL = "https://api.openei.org/utility_rates" +DEFAULT_HEADERS = { + "Content-Type": "application/json", +} +ERROR_TIMEOUT = "Timeout while updating" +MIN_CACHE_SIZE = 194 # Minimum size for a valid JSON cache file from OpenEI diff --git a/openeihttp/exceptions.py b/openeihttp/exceptions.py new file mode 100644 index 0000000..85eb04b --- /dev/null +++ b/openeihttp/exceptions.py @@ -0,0 +1,21 @@ +"""Exceptions for python-openei.""" + + +class UrlNotFound(Exception): + """Exception for NotFound.""" + + +class NotAuthorized(Exception): + """Exception for invalid API key.""" + + +class APIError(Exception): + """Exception for API errors.""" + + +class RateLimit(Exception): + """Exception for API errors.""" + + +class InvalidCall(Exception): + """Exception for invalid library calls.""" diff --git a/tests/test_init.py b/tests/test_init.py index 8a9ff35..a53fd1c 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -1485,3 +1485,42 @@ async def test_get_lookup_data_content_type_error(mock_aioclient, caplog): with pytest.raises(openeihttp.APIError): await test_lookup.lookup_plans() assert "Attempt to decode JSON with unexpected mimetype" in caplog.text + + +async def test_session_reuse(mock_aioclient): + """Test custom session is used and not closed.""" + import aiohttp + + mock_aioclient.get( + re.compile(TEST_PATTERN), + status=200, + body=load_fixture("lookup.json"), + ) + async with aiohttp.ClientSession() as custom_session: + test_lookup = openeihttp.Rates( + api="fakeAPIKey", + lat=1.0, + lon=1.0, + session=custom_session, + ) + status = await test_lookup.lookup_plans() + assert status is not None + assert not custom_session.closed + + +async def test_default_none_coordinates(): + """Test that default coordinates are None.""" + test_lookup = openeihttp.Rates(api="fakeAPIKey") + assert test_lookup._lat is None + assert test_lookup._lon is None + + +async def test_partial_coordinates(): + """Test that partial coordinates raise InvalidCall.""" + test_lookup_lat = openeihttp.Rates(api="fakeAPIKey", lat=1.0) + with pytest.raises(openeihttp.InvalidCall): + await test_lookup_lat.lookup_plans() + + test_lookup_lon = openeihttp.Rates(api="fakeAPIKey", lon=1.0) + with pytest.raises(openeihttp.InvalidCall): + await test_lookup_lon.lookup_plans()