Source code for earthdaily.platform

from pystac_client import Client
from pystac_client.stac_api_io import StacApiIO

from earthdaily._api_requester import APIRequester
from earthdaily._eds_config import AssetAccessMode
from earthdaily.platform._bulk_delete import BulkDeleteService
from earthdaily.platform._bulk_insert import BulkInsertService
from earthdaily.platform._bulk_search import BulkSearchService
from earthdaily.platform._stac_item import StacItemService


[docs] class PlatformService: """ Represents the Platform Service for interacting with specific platform-related endpoints. Attributes: ----------- api_requester : APIRequester An instance of APIRequester used to send HTTP requests to the EDS API. """ def __init__(self, api_requester: APIRequester, asset_access_mode: AssetAccessMode): """ Initialize the PlatformService. Parameters: ----------- api_requester : APIRequester An instance of APIRequester used to send HTTP requests to the EDS API. asset_access_mode : AssetAccessMode The mode of access for assets. Defaults to AssetAccessMode.PRESIGNED_URLS. """ self.api_requester = api_requester self.bulk_search = BulkSearchService(api_requester) self.bulk_insert = BulkInsertService(api_requester) self.bulk_delete = BulkDeleteService(api_requester) self.stac_item = StacItemService(api_requester) headers = { "Content-Type": "application/json", "Accept": "application/json", **api_requester.headers, } if asset_access_mode == AssetAccessMode.PROXY_URLS: headers["X-Proxy-Asset-Urls"] = "True" elif asset_access_mode == AssetAccessMode.PRESIGNED_URLS: headers["X-Signed-Asset-Urls"] = "True" if api_requester.auth: headers["Authorization"] = f"Bearer {api_requester.auth.get_token()}" self.pystac_client = Client.open( f"{api_requester.base_url}/platform/v1/stac", stac_io=StacApiIO(max_retries=3), headers=headers, )