import json
import logging
import operator
import os
import time
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
import geopandas as gpd
import numpy as np
import requests
import xarray as xr
from odc import stac
from pystac.item_collection import ItemCollection
from pystac_client import Client
from pystac_client.stac_api_io import StacApiIO
from urllib3 import Retry
from . import _scales_collections, cube_utils, mask
from .cube_utils import _datacubes, asset_mapper, datacube, metacube
from .parallel_search import NoItemsFoundError, parallel_search
__all__ = ["datacube", "metacube", "xr", "stac"]
logging.getLogger("earthdaily-earthdatastore")
@dataclass
class EarthDataStoreConfig:
auth_url: Optional[str] = None
client_secret: Optional[str] = None
client_id: Optional[str] = None
eds_url: str = "https://api.earthdaily.com/platform/v1/stac"
access_token: Optional[str] = None
def apply_single_condition(
    item_value, condition_op: str, condition_value: Any | list[Any]
) -> bool:
"""
Apply a single comparison condition to an item's property value.
Parameters
----------
    item_value : Any
The value of the property in the item.
condition_op : str
The comparison operator (e.g., 'lt', 'gt', 'eq').
    condition_value : Any | list[Any]
The value or list of values to compare against.
Returns
-------
bool
True if the condition is met, False otherwise.
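
    Examples
    --------
    >>> apply_single_condition(5, "lt", [10, 3])
    True
    >>> apply_single_condition(5, "eq", 4)
    False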
"""
# Ensure condition_value is always a list
values = condition_value if isinstance(condition_value, list) else [condition_value]
    # Look up the comparison function in the operator module
    op_func = getattr(operator, condition_op, None)
if not op_func:
raise ValueError(f"Unsupported operator: {condition_op}")
# Check if any value meets the condition
return any(op_func(item_value, val) for val in values)
def validate_property_condition(
    item: Any, property_name: str, conditions: dict[str, Any]
) -> bool:
"""
Validate if an item meets all conditions for a specific property.
Parameters
----------
    item : Any
The STAC item to check.
property_name : str
The name of the property to validate.
    conditions : dict[str, Any]
Dictionary of conditions to apply to the property.
Returns
-------
bool
True if all conditions are met, False otherwise.
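
    Examples
    --------
    Using a stand-in object with a ``properties`` dict in place of a STAC item:

    >>> from types import SimpleNamespace
    >>> item = SimpleNamespace(properties={"eo:cloud_cover": 5.0})
    >>> validate_property_condition(item, "eo:cloud_cover", {"lt": 10, "gt": 0})
    True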
"""
# Check if the property exists in the item
if property_name not in item.properties:
return False
# Check each condition for the property
return all(
apply_single_condition(
item.properties.get(property_name), condition_op, condition_value
)
for condition_op, condition_value in conditions.items()
)
def filter_items(items: list[Any], query: dict[str, dict[str, Any]]) -> list[Any]:
"""
Filter items based on a complex query dictionary.
Parameters
----------
    items : list[Any]
List of STAC items to filter.
    query : dict[str, dict[str, Any]]
Query filter with operations to apply to item properties.
Returns
-------
    list[Any]
Filtered list of items matching the query.
Examples
--------
>>> query = {
... 'eo:cloud_cover': {'lt': [10], 'gt': [0]},
... 'datetime': {'eq': '2023-01-01'}
... }
>>> filtered_items = filter_items(catalog_items, query)
"""
return [
item
for item in items
if all(
validate_property_condition(item, property_name, conditions)
for property_name, conditions in query.items()
)
]
def post_query_items(
    items: list[Any], query: dict[str, dict[str, Any]]
) -> ItemCollection:
"""
Apply a query filter to items fetched from a STAC catalog and return an ItemCollection.
Parameters
----------
    items : list[Any]
List of STAC items to filter.
    query : dict[str, dict[str, Any]]
Query filter with operations to apply to item properties.
Returns
-------
ItemCollection
Filtered collection of items matching the query.
Examples
--------
>>> query = {
... 'eo:cloud_cover': {'lt': [10], 'gt': [0]},
... 'datetime': {'eq': '2023-01-01'}
... }
>>> filtered_items = post_query_items(catalog_items, query)
"""
    filtered_items = filter_items(items, query)
    return ItemCollection(filtered_items)
def _select_last_common_occurrences(first, second):
"""
    For each date in the second dataset, select the last N occurrences of that
    date from the first dataset, where N is the count of that date in the
    second dataset.

    Parameters
    ----------
    first : xarray.Dataset
        Source dataset.
    second : xarray.Dataset
        Dataset containing the dates to match and their counts.

    Returns
    -------
    xarray.Dataset
        Subset of the first dataset with the selected time indices.
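
    Examples
    --------
    >>> t = np.array(["2023-01-01", "2023-01-01", "2023-01-02"], dtype="datetime64[ns]")
    >>> first = xr.Dataset(coords={"time": t})
    >>> second = xr.Dataset(coords={"time": t[1:]})
    >>> _select_last_common_occurrences(first, second).time.size
    2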
"""
# Convert times to datetime64[ns] if they aren't already
first_times = first.time.astype("datetime64[ns]")
second_times = second.time.astype("datetime64[ns]")
# Get unique dates and their counts from second dataset
unique_dates, counts = np.unique(second_times.values, return_counts=True)
# Initialize list to store selected indices
selected_indices = []
# For each unique date in second
for date, count in zip(unique_dates, counts):
# Find all indices where this date appears in first
date_indices = np.where(first_times == date)[0]
# Take the last 'count' number of indices
selected_indices.extend(date_indices[-count:])
    # Sort the selected indices in descending order
selected_indices = sorted(selected_indices, reverse=True)
# Select these indices from the first dataset
return first.isel(time=selected_indices)
def _cloud_path_to_http(cloud_path):
"""Convert a cloud path to HTTP URL.
Parameters
----------
cloud_path : str
Cloud path
Returns
-------
url : str
HTTP URL
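
    Examples
    --------
    >>> _cloud_path_to_http("s3://my-bucket/path/to/key.tif")
    'https://my-bucket.s3.amazonaws.com/path/to/key.tif'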
"""
endpoints = dict(s3="s3.amazonaws.com")
cloud_provider = cloud_path.split("://")[0]
container = cloud_path.split("/")[2]
key = "/".join(cloud_path.split("/")[3:])
endpoint = endpoints.get(cloud_provider, None)
if endpoint:
url = f"https://{container}.{endpoint}/{key}"
else:
url = cloud_path
return url
def enhance_assets(
items: ItemCollection,
alternate: str = "download",
use_http_url: bool = False,
add_default_scale_factor: bool = False,
) -> ItemCollection:
"""
Enhance STAC item assets with additional metadata and URL transformations.
Parameters
----------
items : ItemCollection
Collection of STAC items to enhance
    alternate : str, optional
        Alternate asset href key to use, by default "download".
    use_http_url : bool, optional
        Convert cloud paths (e.g. "s3://...") to HTTP URLs, by default False.
    add_default_scale_factor : bool, optional
        Add default scale, offset and nodata to raster bands, by default False.
Returns
-------
ItemCollection
Enhanced collection of STAC items
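
    Examples
    --------
    A minimal sketch, assuming ``items`` is an ItemCollection from a previous
    search; it rewrites asset hrefs to their "download" alternate and adds the
    default scale factors:

    >>> items = enhance_assets(
    ...     items, alternate="download", add_default_scale_factor=True
    ... )  # doctest: +SKIP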
"""
if any((alternate, use_http_url, add_default_scale_factor)):
for idx, item in enumerate(items):
keys = list(item.assets.keys())
for asset in keys:
# use the alternate href if it exists
if alternate:
href = (
item.assets[asset]
.extra_fields.get("alternate", {})
.get(alternate, {})
.get("href")
)
if href:
items[idx].assets[asset].href = href
# use HTTP URL instead of cloud path
if use_http_url:
href = item.assets[asset].to_dict().get("href", {})
if href:
items[idx].assets[asset].href = _cloud_path_to_http(href)
                if add_default_scale_factor:
                    scale_factor_collection = (
                        _scales_collections.scale_factor_collections.get(
                            item.collection_id, [{}]
                        )
                    )
                    for scales_collection in scale_factor_collection:
                        if asset in scales_collection.get("assets", []):
                            extra_fields = items[idx].assets[asset].extra_fields
                            # Make sure the raster:bands field exists
                            bands = extra_fields.setdefault("raster:bands", [{}])
                            # Only fill in the defaults when no scale is set yet
                            if not bands[0].get("scale"):
                                bands[0]["scale"] = scales_collection["scale"]
                                bands[0]["offset"] = scales_collection["offset"]
                                bands[0]["nodata"] = scales_collection["nodata"]
return items
class StacCollectionExplorer:
"""
A class to explore a STAC collection.
Parameters
----------
client : Client
A PySTAC client for interacting with the Earth Data Store STAC API.
collection : str
The name of the collection to explore.
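
    Examples
    --------
    Assuming ``client`` is an authenticated pystac Client:

    >>> explorer = StacCollectionExplorer(client, "sentinel-2-l2a")
    >>> explorer.assets()  # doctest: +SKIP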
"""
def __init__(self, client, collection):
self.client = client
self.collection = collection
self.client_collection = self.client.get_collection(self.collection)
self.item = self.__first_item()
self.properties = self.client_collection.to_dict()
def __first_item(self):
"""Get the first item of the STAC collection as an overview of the items content.
Returns
-------
item : Item
The first item of the collection.
"""
        self.item = next(
            self.client.get_collection(self.collection).get_items(), None
        )
        return self.item
@property
def item_properties(self):
return {k: self.item.properties[k] for k in sorted(self.item.properties.keys())}
def assets(self, asset_name=None):
if asset_name:
return self.asset_metadata(asset_name)
return list(sorted(self.item.assets.keys()))
def assets_metadata(self, asset_name=None):
if asset_name:
return self.asset_metadata(asset_name)
return {k: self.asset_metadata(k) for k in self.assets()}
def asset_metadata(self, asset_name):
return self.item.assets[asset_name].to_dict()
def __repr__(self):
return f'Exploring collection "{self.collection}"'
class Auth:
    def __init__(
        self,
        config: Optional[str | dict] = None,
        presign_urls: bool = True,
        asset_proxy_enabled: bool = False,
    ):
"""
A client for interacting with the Earth Data Store API.
By default, Earth Data Store will look for environment variables called
EDS_AUTH_URL, EDS_SECRET and EDS_CLIENT_ID.
Parameters
----------
        config : str | dict, optional
            The path to the JSON file containing the Earth Data Store credentials,
            or a dict with those credentials.
        presign_urls : bool, optional
            Use presigned URLs, by default True.
        asset_proxy_enabled : bool, optional
            Use asset proxy URLs, by default False.
Returns
-------
None.
Example
--------
>>> eds = earthdaily.earthdatastore()
>>> collection = "venus-l2a"
>>> theia_location = "MEAD"
>>> max_cloud_cover = 20
>>> query = { "theia:location": {"eq": theia_location}, "eo:cloud_cover": {"lt": max_cloud_cover} }
>>> items = eds.search(collections=collection, query=query)
>>> print(len(items))
132
"""
if not isinstance(config, dict):
warnings.warn(
"Using directly the Auth class to load credentials is deprecated. "
"Please use earthdaily.EarthDataStore() instead",
FutureWarning,
)
self._client = None
self.__auth_config = config
self.__presign_urls = presign_urls
self.__asset_proxy_enabled = asset_proxy_enabled
self._first_items_ = {}
self._staccollectionexplorer = {}
self.__time_eds_log = time.time()
self._client = self.client
@classmethod
def from_credentials(
cls,
json_path: Optional[Path] = None,
toml_path: Optional[Path] = None,
profile: Optional[str] = None,
presign_urls: bool = True,
asset_proxy_enabled: bool = False,
) -> "Auth":
"""
Secondary Constructor.
        Try to read Earth Data Store credentials from multiple sources, in the following order:
        - from input credentials stored in a given JSON file
        - from input credentials stored in a given TOML file
        - from environment variables
        - from the $HOME/.earthdaily/credentials TOML file and a given profile
        - from the $HOME/.earthdaily/credentials TOML file and the "default" profile

        Parameters
        ----------
        json_path : Path, optional
            The path to a JSON file containing the Earth Data Store credentials.
        toml_path : Path, optional
            The path to a TOML file containing the Earth Data Store credentials.
            Uses "$HOME/.earthdaily/credentials" by default.
        profile : str, optional
            Name of the profile to use in the TOML file.
            Uses "default" by default.
        presign_urls : bool, optional
            Use presigned URLs, by default True.
        asset_proxy_enabled : bool, optional
            Use asset proxy URLs, by default False.
Returns
-------
Auth
A :class:`Auth` instance
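
        Examples
        --------
        Reading from the default TOML credentials file and profile:

        >>> eds = Auth.from_credentials(
        ...     toml_path=Path.home() / ".earthdaily/credentials", profile="default"
        ... )  # doctest: +SKIP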
"""
config = cls.read_credentials(
json_path=json_path,
toml_path=toml_path,
profile=profile,
)
for item, value in config.items():
if not value:
raise ValueError(f"Missing value for {item}")
return cls(
config=config,
presign_urls=presign_urls,
asset_proxy_enabled=asset_proxy_enabled,
)
@classmethod
def read_credentials(
cls,
json_path: Optional[Path] = None,
toml_path: Optional[Path] = None,
profile: Optional[str] = None,
    ) -> dict:
"""
        Try to read Earth Data Store credentials from multiple sources, in the following order:
        - from input credentials stored in a given JSON file
        - from input credentials stored in a given TOML file
        - from environment variables
        - from the $HOME/.earthdaily/credentials TOML file and a given profile
        - from the $HOME/.earthdaily/credentials TOML file and the "default" profile

        Parameters
        ----------
        json_path : Path, optional
            The path to a JSON file containing the Earth Data Store credentials.
        toml_path : Path, optional
            The path to a TOML file containing the Earth Data Store credentials.
            Uses "$HOME/.earthdaily/credentials" by default.
        profile : str, optional
            Name of the profile to use in the TOML file.
            Uses "default" by default.
Returns
-------
dict
Dictionary containing credentials
"""
if json_path is not None:
config = cls.read_credentials_from_json(json_path=json_path)
elif toml_path is not None:
config = cls.read_credentials_from_toml(
toml_path=toml_path, profile=profile
)
elif (
os.getenv("EDS_AUTH_URL")
and os.getenv("EDS_SECRET")
and os.getenv("EDS_CLIENT_ID")
):
config = cls.read_credentials_from_environment()
else:
config = cls.read_credentials_from_ini()
return config
@classmethod
def read_credentials_from_ini(cls, profile: str = "default") -> dict:
"""
        Read Earth Data Store credentials from the $HOME/.earthdaily/credentials INI file.

        Parameters
        ----------
        profile : str, optional
            Name of the profile to read in the INI file, by default "default".
Returns
-------
dict
Dictionary containing credentials
"""
from configparser import ConfigParser
ini_path = Path.home() / ".earthdaily/credentials"
ini_config = ConfigParser()
ini_config.read(ini_path)
ini_config = ini_config[profile]
config = {key.upper(): value for key, value in ini_config.items()}
return config
@classmethod
def read_credentials_from_json(cls, json_path: Path) -> dict:
"""
Read Earth Data Store credentials from a JSON file.
Parameters
----------
json_path : Path
The path to the JSON file containing the Earth Data Store credentials.
Returns
-------
dict
Dictionary containing credentials
"""
if isinstance(json_path, dict):
return json_path
with json_path.open() as file_object:
config = json.load(file_object)
return config
@classmethod
def read_credentials_from_environment(cls) -> dict:
"""
Read Earth Data Store credentials from environment variables.
Returns
-------
dict
Dictionary containing credentials
"""
config = {
"EDS_AUTH_URL": os.getenv("EDS_AUTH_URL"),
"EDS_SECRET": os.getenv("EDS_SECRET"),
"EDS_CLIENT_ID": os.getenv("EDS_CLIENT_ID"),
}
# Optional
if "EDS_API_URL" in os.environ:
config["EDS_API_URL"] = os.getenv("EDS_API_URL")
return config
@classmethod
def read_credentials_from_toml(
        cls, toml_path: Optional[Path] = None, profile: Optional[str] = None
) -> dict:
"""
        Read Earth Data Store credentials from a TOML file.
Parameters
----------
toml_path : Path, optional
The path to the TOML file containing the Earth Data Store credentials.
profile : profile, optional
Name of the profile to use in the TOML file
Returns
-------
dict
Dictionary containing credentials
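
        Examples
        --------
        The credentials file is expected to contain one table per profile, e.g.::

            [default]
            EDS_AUTH_URL = "https://..."
            EDS_SECRET = "..."
            EDS_CLIENT_ID = "..."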
"""
import toml
if not toml_path.exists():
raise FileNotFoundError(
f"Credentials file {toml_path} not found. Make sure the path is valid"
)
with toml_path.open() as f:
config = toml.load(f)
if profile not in config:
raise ValueError(f"Credentials profile {profile} not found in {toml_path}")
config = config[profile]
return config
def get_access_token(self, config: Optional[EarthDataStoreConfig] = None):
"""
Retrieve an access token for interacting with the EarthDataStore API.
By default, the method will look for environment variables:
EDS_AUTH_URL, EDS_SECRET, and EDS_CLIENT_ID. Alternatively, a configuration
object or dictionary can be passed to override these values.
Parameters
----------
config : EarthDataStoreConfig, optional
A configuration object containing the Earth Data Store credentials.
Returns
-------
str
The access token for authenticating with the Earth Data Store API.
"""
if not config:
config = self._config_parser(self.__auth_config)
response = requests.post(
config.auth_url,
data={"grant_type": "client_credentials"},
allow_redirects=False,
auth=(config.client_id, config.client_secret),
)
response.raise_for_status()
return response.json()["access_token"]
def _config_parser(self, config=None):
"""
Parse and construct the EarthDataStoreConfig object from various input formats.
The method supports configuration as a dictionary, JSON file path, tuple,
or environment variables.
Parameters
----------
config : dict or str or tuple, optional
Configuration source. It can be:
- A dictionary containing the required API credentials.
- A string path to a JSON file containing these credentials.
- A tuple of (access_token, eds_url).
- None, in which case environment variables will be used.
Returns
-------
EarthDataStoreConfig
A configuration object containing the required API credentials.
Raises
------
AttributeError
If required credentials are missing in the provided input or environment variables.
"""
if isinstance(config, tuple): # token
access_token, eds_url = config
logging.log(level=logging.INFO, msg="Using token to reauth")
return EarthDataStoreConfig(eds_url=eds_url, access_token=access_token)
else:
if isinstance(config, dict):
config = config.get
            elif isinstance(config, str) and config.endswith(".json"):
                with open(config, "rb") as config_file:
                    config = json.load(config_file).get
if config is None:
config = os.getenv
auth_url = config("EDS_AUTH_URL")
client_secret = config("EDS_SECRET")
client_id = config("EDS_CLIENT_ID")
eds_url = config(
"EDS_API_URL", "https://api.earthdaily.com/platform/v1/stac"
)
if auth_url is None or client_secret is None or client_id is None:
                raise AttributeError(
                    "Missing credentials. Please set the EDS_AUTH_URL, EDS_SECRET "
                    "and EDS_CLIENT_ID environment variables, or provide a config."
                )
return EarthDataStoreConfig(
auth_url=auth_url,
client_secret=client_secret,
client_id=client_id,
eds_url=eds_url,
)
def _get_client(self, config=None, presign_urls=True, asset_proxy_enabled=False):
"""Get client for interacting with the EarthDataStore API.
By default, Earth Data Store will look for environment variables called
EDS_AUTH_URL, EDS_SECRET and EDS_CLIENT_ID.
Parameters
----------
config : str | dict, optional
A JSON string or a dictionary with the credentials for the Earth Data Store.
presign_urls : bool, optional
Use presigned URLs, by default True
asset_proxy_enabled : bool, optional
Use asset proxy URLs, by default False
Returns
-------
client : Client
A PySTAC client for interacting with the Earth Data Store STAC API.
"""
config = self._config_parser(config)
if config.access_token:
access_token = config.access_token
else:
access_token = self.get_access_token(config)
headers = {"Authorization": f"bearer {access_token}"}
if asset_proxy_enabled:
headers["X-Proxy-Asset-Urls"] = "True"
elif presign_urls:
headers["X-Signed-Asset-Urls"] = "True"
retry = Retry(
total=5,
backoff_factor=1,
status_forcelist=[502, 503, 504],
allowed_methods=None,
)
stac_api_io = StacApiIO(max_retries=retry)
return Client.open(config.eds_url, headers=headers, stac_io=stac_api_io)
@property
def client(self) -> Client:
"""
        Create an instance of a pystac client from EarthDataStore.
Returns
-------
A :class:`Client` instance for this Catalog.
"""
        token_expired = (time.time() - self.__time_eds_log) > 3600
        if token_expired or self._client is None:
            if token_expired:
                logging.log(level=logging.INFO, msg="Reauth to EarthDataStore")
self._client = self._get_client(
self.__auth_config, self.__presign_urls, self.__asset_proxy_enabled
)
self.__time_eds_log = time.time()
return self._client
def explore(self, collection: str = None):
"""
        Explore a collection, its properties and assets. If no collection is
        specified, returns the list of collections.
Parameters
----------
collection : str, optional.
Collection name. The default is None.
Returns
-------
        list | StacCollectionExplorer
            The sorted list of collection ids, or a StacCollectionExplorer
            for the given collection.
Example
--------
>>> eds = earthdaily.earthdatastore()
>>> collection = "venus-l2a"
>>> eds.explore(collection).item_properties
{'constellation': 'VENUS',
'created': '2023-06-14T00:14:10.167450Z',
'datetime': '2023-06-07T11:23:18.000000Z',
'description': '',
'eda:geometry_tags': ['RESOLVED_CLOCKWISE_POLYGON'],
'eda:loose_validation_status': 'VALID',
'eda:num_cols': 9106,
'eda:num_rows': 11001,
'eda:original_geometry': {'type': 'Polygon',
'coordinates': [[[-16.684545516968, 16.109294891357],
[-16.344039916992, 16.111709594727],
[-16.341398239136, 15.714001655579],
[-16.68123626709, 15.711649894714],
[-16.684545516968, 16.109294891357]]]},
'eda:product_type': 'REFLECTANCE',
'eda:sensor_type': 'OPTICAL',
'eda:source_created': '2023-06-13T18:47:27.000000Z',
'eda:source_updated': '2023-06-13T20:22:35.000000Z',
'eda:status': 'PUBLISHED',
'eda:tracking_id': 'MutZbYe54RY7eP3iuAbtKb',
'eda:unusable_cover': 0.0,
'eda:water_cover': 0.0,
'end_datetime': '2023-06-07T11:23:18.000000Z',
'eo:bands': [{'name': 'B1',
'common_name': 'coastal',
'description': 'B1',
'center_wavelength': 0.424},
{'name': 'B2',
'common_name': 'coastal',
'description': 'B2',
'center_wavelength': 0.447},
{'name': 'B3',
'common_name': 'blue',
'description': 'B3',
'center_wavelength': 0.492},
{'name': 'B4',
'common_name': 'green',
'description': 'B4',
'center_wavelength': 0.555},
{'name': 'B5',
'common_name': 'yellow',
'description': 'B5',
'center_wavelength': 0.62},
{'name': 'B6',
'common_name': 'yellow',
'description': 'B6',
'center_wavelength': 0.62},
{'name': 'B7',
'common_name': 'red',
'description': 'B7',
'center_wavelength': 0.666},
{'name': 'B8',
'common_name': 'rededge',
'description': 'B8',
'center_wavelength': 0.702},
{'name': 'B9',
'common_name': 'rededge',
'description': 'B9',
'center_wavelength': 0.741},
{'name': 'B10',
'common_name': 'rededge',
'description': 'B10',
'center_wavelength': 0.782},
{'name': 'B11',
'common_name': 'nir08',
'description': 'B11',
'center_wavelength': 0.861},
{'name': 'B12',
'common_name': 'nir09',
'description': 'B12',
'center_wavelength': 0.909}],
'eo:cloud_cover': 0.0,
'gsd': 4.0,
'instruments': ['VENUS'],
'license': 'CC-BY-NC-4.0',
'mission': 'venus',
'platform': 'VENUS',
'processing:level': 'L2A',
'proj:epsg': 32628,
'providers': [{'name': 'Theia',
'roles': ['licensor', 'producer', 'processor']},
{'url': 'https://earthdaily.com',
'name': 'EarthDaily Analytics',
'roles': ['processor', 'host']}],
'sat:absolute_orbit': 31453,
'start_datetime': '2023-06-07T11:23:18.000000Z',
'theia:location': 'STLOUIS',
'theia:product_id': 'VENUS-XS_20230607-112318-000_L2A_STLOUIS_C_V3-1',
'theia:product_version': '3.1',
'theia:publication_date': '2023-06-13T18:08:10.205000Z',
'theia:sensor_mode': 'XS',
'theia:source_uuid': 'a29bfc89-8372-5e91-841c-b11cdb40bb14',
'title': 'VENUS-XS_20230607-112318-000_L2A_STLOUIS_D',
'updated': '2023-06-14T00:42:17.898993Z',
'view:azimuth': 33.293623499999995,
'view:incidence_angle': 14.6423245,
'view:sun_azimuth': 69.8849963957,
'view:sun_elevation': 65.0159541684}
"""
if collection:
            if collection not in self._staccollectionexplorer:
self._staccollectionexplorer[collection] = StacCollectionExplorer(
self.client, collection
)
return self._staccollectionexplorer.get(collection)
return sorted(c.id for c in self.client.get_all_collections())
def _update_search_kwargs_for_ag_cloud_mask(
self,
search_kwargs,
collections,
key="eda:ag_cloud_mask_available",
target_param="query",
):
"""Update the STAC search kwargs to only get items that have an available agricultural cloud mask.
        Parameters
        ----------
        search_kwargs : dict
            The search kwargs to be updated.
        collections : str | list
            The collection(s) to search.

        Returns
        -------
        dict
            The updated search kwargs.
        """
search_kwargs = search_kwargs.copy()
# to get only items that have a ag_cloud_mask
ag_query = {key: {"eq": True}}
# to check if field is queryable
# =============================================================================
# queryables = self.client._stac_io.request(
# self.client.get_root_link().href
# + f"/queryables?collections={collections[0] if isinstance(collections,list) else collections}"
# )
# queryables = json.loads(queryables)
# queryables = queryables["properties"]
# if "eda:ag_cloud_mask_available" not in queryables.keys():
# target_param = "post_query"
# else:
# target_param = "query"
# =============================================================================
query = search_kwargs.get("target_param", {})
query.update(ag_query)
search_kwargs[target_param] = query
return search_kwargs
@_datacubes
def datacube(
self,
collections: str | list,
datetime=None,
assets: None | list | dict = None,
intersects: (gpd.GeoDataFrame | str | dict) = None,
bbox=None,
mask_with: (None | str | list) = None,
mask_statistics: bool | int = False,
clear_cover: (int | float) = None,
prefer_alternate: (str | bool) = "download",
search_kwargs: dict = {},
add_default_scale_factor: bool = True,
common_band_names=True,
cross_calibration_collection: (None | str) = None,
properties: (bool | str | list) = False,
groupby_date: str = "mean",
cloud_search_kwargs={},
**kwargs,
) -> xr.Dataset:
"""
Create a datacube.
Parameters
----------
collections : str | list
If several collections, the first collection will be the reference collection (for spatial resolution).
datetime: Either a single datetime or datetime range used to filter results.
You may express a single datetime using a :class:`datetime.datetime`
instance, a `RFC 3339-compliant <https://tools.ietf.org/html/rfc3339>`__
timestamp, or a simple date string (see below). Instances of
:class:`datetime.datetime` may be either
timezone aware or unaware. Timezone aware instances will be converted to
a UTC timestamp before being passed
to the endpoint. Timezone unaware instances are assumed to represent UTC
timestamps. You may represent a
datetime range using a ``"/"`` separated string as described in the
spec, or a list, tuple, or iterator
of 2 timestamps or datetime instances. For open-ended ranges, use either
``".."`` (``'2020-01-01:00:00:00Z/..'``,
``['2020-01-01:00:00:00Z', '..']``) or a value of ``None``
(``['2020-01-01:00:00:00Z', None]``).
If using a simple date string, the datetime can be specified in
``YYYY-mm-dd`` format, optionally truncating
to ``YYYY-mm`` or just ``YYYY``. Simple date strings will be expanded to
include the entire time period, for example:
- ``2017`` expands to ``2017-01-01T00:00:00Z/2017-12-31T23:59:59Z``
- ``2017-06`` expands to ``2017-06-01T00:00:00Z/2017-06-30T23:59:59Z``
- ``2017-06-10`` expands to
``2017-06-10T00:00:00Z/2017-06-10T23:59:59Z``
If used in a range, the end of the range expands to the end of that
day/month/year, for example:
- ``2017/2018`` expands to
``2017-01-01T00:00:00Z/2018-12-31T23:59:59Z``
- ``2017-06/2017-07`` expands to
``2017-06-01T00:00:00Z/2017-07-31T23:59:59Z``
- ``2017-06-10/2017-06-11`` expands to
``2017-06-10T00:00:00Z/2017-06-11T23:59:59Z``
        assets : None | list | dict, optional
            Assets to request: a list of asset names, or a dict mapping asset
            names to output band names. The default is None (all assets).
        intersects : gpd.GeoDataFrame | str (wkt) | dict (geojson), optional
            Geometry the items must intersect. The default is None.
        bbox : list, optional
            Bounding box the items must intersect, as [xmin, ymin, xmax, ymax].
            The default is None.
        mask_with : None | str | list, optional
            "native" mask, or "ag_cloud_mask", or ["ag_cloud_mask", "native"],
            and so if ag_cloud_mask is not available, will switch to native.
            The default is None.
        mask_statistics : bool | int, optional
            Compute clear-pixel statistics per date. The default is False
            (forced to True when `clear_cover` is set).
        clear_cover : int | float, optional
            Percent of clear data above a field (from 0 to 100).
            The default is None.
        prefer_alternate : str | bool, optional
            Uses the alternate/download href instead of the default href.
            The default is "download".
        search_kwargs : dict, optional
            Extra keyword arguments passed to the STAC search. The default is {}.
        add_default_scale_factor : bool, optional
            Add the default scale, offset and nodata of the collection to the
            raster bands. The default is True.
        common_band_names : bool, optional
            Use common band names (e.g. "red", "nir") instead of the native
            asset names. The default is True.
        cross_calibration_collection : None | str, optional
            Cross-calibrate the reference collection to this collection.
            The default is None.
        properties : bool | str | list, optional
            Retrieve properties per item (requires `groupby_date=None`).
            The default is False.
        groupby_date : str, optional
            Method used to reduce several items sharing the same date
            (e.g. "mean", "max"). The default is "mean".
        cloud_search_kwargs : dict, optional
            Extra keyword arguments used when searching for cloud mask items.
        **kwargs
            Keyword arguments passed to the datacube creation.

        Raises
        ------
        NotImplementedError
            If properties per item are requested together with `groupby_date`,
            or if the requested mask is not available.
        Warning
            If no cross calibration coefficient is available for the specified
            collections.

        Returns
        -------
        xr_datacube : xr.Dataset
            The datacube.
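
        Example
        -------
        A minimal call, assuming ``geometry`` is a GeoDataFrame of the area of
        interest:

        >>> cube = eds.datacube(
        ...     "sentinel-2-l2a",
        ...     assets=["blue", "green", "red"],
        ...     intersects=geometry,
        ...     datetime="2017-06",
        ...     mask_with="native",
        ...     clear_cover=50,
        ... )  # doctest: +SKIP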
"""
# Properties (per items) are not compatible with groupby_date.
if properties not in (None, False) and groupby_date is not None:
raise NotImplementedError(
"You must set `groupby_date=None` to have properties per item."
)
# convert collections to list
collections = [collections] if isinstance(collections, str) else collections
        # if intersects is a geometry, create a GeoDataFrame
if intersects is not None:
intersects = cube_utils.GeometryManager(intersects).to_geopandas()
self.intersects = intersects
# if mask_with, need to add assets or to get mask item id
if mask_with:
if mask_with not in mask._available_masks:
                raise NotImplementedError(
                    f"Specified mask '{mask_with}' is not available. Available mask providers are: {mask._available_masks}"
                )
elif mask_with in ["ag_cloud_mask", "agriculture-cloud-mask"]:
search_kwargs = self._update_search_kwargs_for_ag_cloud_mask(
search_kwargs, collections[0], key="eda:ag_cloud_mask_available"
)
mask_with = "ag_cloud_mask"
elif mask_with in [
"cloud_mask",
"cloudmask",
"cloud_mask_ag_version",
"cloudmask_ag_version",
]:
search_kwargs = self._update_search_kwargs_for_ag_cloud_mask(
search_kwargs,
collections[0],
key="eda:cloud_mask_available",
)
mask_with = "cloud_mask"
else:
mask_with = mask._native_mask_def_mapping.get(collections[0], None)
sensor_mask = mask._native_mask_asset_mapping.get(collections[0], None)
if isinstance(assets, list) and sensor_mask not in assets:
assets.append(sensor_mask)
elif isinstance(assets, dict):
assets[sensor_mask] = sensor_mask
bbox_query = None
if bbox is None and intersects is not None:
bbox_query = list(cube_utils.GeometryManager(intersects).to_bbox())
elif bbox is not None and intersects is None:
bbox_query = bbox
# query the items
items = self.search(
collections=collections,
bbox=bbox_query,
datetime=datetime,
assets=assets,
prefer_alternate=prefer_alternate,
add_default_scale_factor=add_default_scale_factor,
**search_kwargs,
)
xcal_items = None
if (
isinstance(cross_calibration_collection, str)
and cross_calibration_collection != collections[0]
):
try:
xcal_items = self.search(
collections="eda-cross-calibration",
intersects=intersects,
query={
"eda_cross_cal:source_collection": {"eq": collections[0]},
"eda_cross_cal:destination_collection": {
"eq": cross_calibration_collection
},
},
)
except Warning:
raise Warning(
"No cross calibration coefficient available for the specified collections."
)
# Create datacube from items
xr_datacube = datacube(
items,
intersects=intersects,
bbox=bbox,
assets=assets,
common_band_names=common_band_names,
cross_calibration_items=xcal_items,
properties=properties,
groupby_date=None,
**kwargs,
)
if intersects is not None:
xr_datacube = xr_datacube.ed.clip(intersects)
# Create mask datacube and apply it to xr_datacube
if mask_with:
if "geobox" not in kwargs:
kwargs["geobox"] = xr_datacube.odc.geobox
kwargs.pop("crs", "")
kwargs.pop("resolution", "")
kwargs["dtype"] = "int8"
if clear_cover and mask_statistics is False:
mask_statistics = True
mask_kwargs = dict(mask_statistics=False)
if mask_with == "ag_cloud_mask" or mask_with == "cloud_mask":
mask_asset_mapping = {
"ag_cloud_mask": {"agriculture-cloud-mask": "ag_cloud_mask"},
"cloud_mask": {"cloud-mask": "cloud_mask"},
}
acm_items = self.find_cloud_mask_items(
items, cloudmask=mask_with, **cloud_search_kwargs
)
acm_datacube = datacube(
acm_items,
intersects=intersects,
bbox=bbox,
groupby_date=None,
assets=mask_asset_mapping[mask_with],
**kwargs,
)
xr_datacube["time"] = xr_datacube.time.astype("M8[ns]")
if xr_datacube.time.size != acm_datacube.time.size:
xr_datacube = _select_last_common_occurrences(
xr_datacube, acm_datacube
)
acm_datacube["time"] = xr_datacube["time"].time
acm_datacube = cube_utils._match_xy_dims(acm_datacube, xr_datacube)
xr_datacube = xr.merge((xr_datacube, acm_datacube), compat="override")
# mask_kwargs.update(acm_datacube=acm_datacube)
else:
mask_assets = {
mask._native_mask_asset_mapping[
collections[0]
]: mask._native_mask_def_mapping[collections[0]]
}
clouds_datacube = datacube(
items,
groupby_date=None,
bbox=list(cube_utils.GeometryManager(intersects).to_bbox()),
assets=mask_assets,
resampling=0,
**kwargs,
)
clouds_datacube = cube_utils._match_xy_dims(
clouds_datacube, xr_datacube
)
if intersects is not None:
clouds_datacube = clouds_datacube.ed.clip(intersects)
xr_datacube = xr.merge(
(xr_datacube, clouds_datacube), compat="override"
)
            mask_handler = mask.Mask(xr_datacube, intersects=intersects, bbox=bbox)
            xr_datacube = getattr(mask_handler, mask_with)(**mask_kwargs)
# keep only one value per pixel per day
if groupby_date:
xr_datacube = xr_datacube.groupby("time.date", restore_coord_dims=True)
xr_datacube = getattr(xr_datacube, groupby_date)().rename(dict(date="time"))
xr_datacube["time"] = xr_datacube.time.astype("M8[ns]")
        # To filter by cloud_cover / clear_cover, we need to compute clear pixels at field level
if clear_cover or mask_statistics:
xy = xr_datacube[mask_with].isel(time=0).size
null_pixels = xr_datacube[mask_with].isnull().sum(dim=("x", "y"))
n_pixels_as_labels = xy - null_pixels
xr_datacube = xr_datacube.assign_coords(
{"clear_pixels": ("time", n_pixels_as_labels.data)}
)
xr_datacube = xr_datacube.assign_coords(
{
"clear_percent": (
"time",
np.multiply(
np.divide(
xr_datacube["clear_pixels"].data,
xr_datacube.attrs["usable_pixels"],
),
100,
).astype(np.int8),
)
}
)
xr_datacube["clear_pixels"] = xr_datacube["clear_pixels"].load()
xr_datacube["clear_percent"] = xr_datacube["clear_percent"].load()
if mask_with:
                xr_datacube = xr_datacube.drop_vars(mask_with)
if clear_cover:
xr_datacube = mask.filter_clear_cover(xr_datacube, clear_cover)
return xr_datacube
def _update_search_for_assets(self, assets):
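        """Build a ``fields`` filter restricting the STAC search to a fixed set of
        top-level item fields plus the requested assets, so that metadata for
        unused assets is not fetched."""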
fields = {
"include": [
"id",
"type",
"collection",
"stac_version",
"stac_extensions",
"collection",
"geometry",
"bbox",
"properties",
]
}
fields["include"].extend([f"assets.{asset}" for asset in assets])
return fields
@parallel_search
def search(
self,
collections: str | list,
intersects: gpd.GeoDataFrame = None,
bbox=None,
post_query=None,
prefer_alternate=None,
add_default_scale_factor=False,
assets=None,
raise_no_items=True,
batch_days="auto",
n_jobs=-1,
**kwargs,
):
"""
        A wrapper around the pystac client search method that adds features to enhance the experience.
Parameters
----------
collections : str | list
Collection(s) to search. It is recommended to only search one collection at a time.
intersects : gpd.GeoDataFrame, optional
If provided, the results will contain only intersecting items. The default is None.
        bbox : list, optional
            If provided, the results will contain only intersecting items,
            as [xmin, ymin, xmax, ymax]. The default is None.
        post_query : dict, optional
            STAC-like filters applied on retrieved items. The default is None.
        prefer_alternate : str, optional
            Prefer alternate asset hrefs when available (e.g. "download").
            The default is None.
        add_default_scale_factor : bool, optional
            Add the default scale, offset and nodata of the collection to the
            raster bands. The default is False.
        assets : list, optional
            Restrict the returned fields to the given assets. The default is None.
        raise_no_items : bool, optional
            Raise a NoItemsFoundError when the search returns no items.
            The default is True.
        batch_days : int | str, optional
            Split the datetime range into batches of days for the parallel
            search, by default "auto".
        n_jobs : int, optional
            Number of parallel jobs for the search, by default -1.
        **kwargs
            Keyword arguments passed to the pystac client search method.
Returns
-------
items_collection : ItemCollection
The filtered STAC items.
Example
-------
>>> items = eds.search(collections='sentinel-2-l2a',bbox=[1,43,1,43],datetime='2017')
>>> len(items)
27
>>> print(items[0].id)
S2A_31TCH_20170126_0_L2A
>>> print(items[0].assets.keys())
dict_keys(['aot', 'nir', 'red', 'scl', 'wvp', 'blue', 'green', 'nir08', 'nir09',
'swir16', 'swir22', 'visual', 'aot-jp2', 'coastal', 'nir-jp2',
'red-jp2', 'scl-jp2', 'wvp-jp2', 'blue-jp2', 'rededge1', 'rededge2',
'rededge3', 'green-jp2', 'nir08-jp2', 'nir09-jp2', 'thumbnail',
'swir16-jp2', 'swir22-jp2', 'visual-jp2', 'coastal-jp2',
'rededge1-jp2', 'rededge2-jp2', 'rededge3-jp2', 'granule_metadata',
'tileinfo_metadata'])
>>> print(items[0].properties)
{
"created": "2020-09-01T04:59:33.629000Z",
"updated": "2022-11-08T13:08:57.661605Z",
"platform": "sentinel-2a",
"grid:code": "MGRS-31TCH",
"proj:epsg": 32631,
"instruments": ["msi"],
"s2:sequence": "0",
"constellation": "sentinel-2",
"mgrs:utm_zone": 31,
"s2:granule_id": "S2A_OPER_MSI_L2A_TL_SHIT_20190506T054613_A008342_T31TCH_N00.01",
"eo:cloud_cover": 26.518754,
"s2:datatake_id": "GS2A_20170126T105321_008342_N00.01",
"s2:product_uri": "S2A_MSIL2A_20170126T105321_N0001_R051_T31TCH_20190506T054611.SAFE",
"s2:datastrip_id": "S2A_OPER_MSI_L2A_DS_SHIT_20190506T054613_S20170126T105612_N00.01",
"s2:product_type": "S2MSI2A",
"mgrs:grid_square": "CH",
"s2:datatake_type": "INS-NOBS",
"view:sun_azimuth": 161.807489888479,
"eda:geometry_tags": ["RESOLVED_CLOCKWISE_POLYGON"],
"mgrs:latitude_band": "T",
"s2:generation_time": "2019-05-06T05:46:11.879Z",
"view:sun_elevation": 26.561907592092602,
"earthsearch:s3_path": "s3://sentinel-cogs/sentinel-s2-l2a-cogs/31/T/CH/2017/1/S2A_31TCH_20170126_0_L2A",
"processing:software": {"sentinel2-to-stac": "0.1.0"},
"s2:water_percentage": 0.697285,
"eda:original_geometry": {
"type": "Polygon",
"coordinates": [
[
[0.5332306381710475, 43.32623760511659],
[1.887065663431107, 43.347431265475954],
[1.9046784554725638, 42.35884880571144],
[0.5722310999779479, 42.3383710796791],
[0.5332306381710475, 43.32623760511659],
]
],
},
"earthsearch:payload_id": "roda-sentinel2/workflow-sentinel2-to-stac/80f56ba6349cf8e21c1424491f1589c2",
"s2:processing_baseline": "00.01",
"s2:snow_ice_percentage": 23.041981,
"s2:vegetation_percentage": 15.52531,
"s2:thin_cirrus_percentage": 0.563798,
"s2:cloud_shadow_percentage": 4.039595,
"s2:nodata_pixel_percentage": 0.000723,
"s2:unclassified_percentage": 9.891956,
"s2:dark_features_percentage": 15.112966,
"s2:not_vegetated_percentage": 5.172154,
"earthsearch:boa_offset_applied": False,
"s2:degraded_msi_data_percentage": 0.0,
"s2:high_proba_clouds_percentage": 18.044451,
"s2:reflectance_conversion_factor": 1.03230935243016,
"s2:medium_proba_clouds_percentage": 7.910506,
"s2:saturated_defective_pixel_percentage": 0.0,
"eda:tracking_id": "eZbRVxsbEGdWLKXDK2i9Ve",
"eda:status": "PUBLISHED",
"datetime": "2017-01-26T10:56:12.238000Z",
"eda:loose_validation_status": "VALID",
"eda:ag_cloud_mask_available": False,
}
"""
# Find available assets for a collection
# And query only these assets to avoid requesting unused data
if isinstance(collections, str):
collections = [collections]
if assets is not None:
assets = list(
asset_mapper.AssetMapper()
.map_collection_assets(collections[0], assets)
.keys()
)
kwargs["fields"] = self._update_search_for_assets(assets)
if bbox is None and intersects is not None:
intersects = cube_utils.GeometryManager(intersects).to_intersects(
crs="4326"
)
if bbox is not None and intersects is not None:
bbox = None
items_collection = self.client.search(
collections=collections,
bbox=bbox,
intersects=intersects,
sortby="properties.datetime",
**kwargs,
)
# Downloading the items
items_collection = items_collection.item_collection()
# prefer_alternate means to prefer alternate url (to replace default href)
if any((prefer_alternate, add_default_scale_factor)):
items_collection = enhance_assets(
items_collection.clone(),
alternate=prefer_alternate,
add_default_scale_factor=add_default_scale_factor,
)
if post_query:
items_collection = post_query_items(items_collection, post_query)
if len(items_collection) == 0 and raise_no_items:
raise NoItemsFoundError("No items found.")
return items_collection
def find_cloud_mask_items(
self, items_collection, cloudmask="ag_cloud_mask", **kwargs
):
"""
Search the catalog for the ag_cloud_mask items matching the given items_collection.
The ag_cloud_mask items are searched in the `ag_cloud_mask_collection_id` collection using the
`ag_cloud_mask_item_id` properties of the items.
        Parameters
        ----------
        items_collection : pystac.ItemCollection
            The items to find the corresponding cloud mask items for.
        cloudmask : str, optional
            The cloud mask type, "ag_cloud_mask" or "cloud_mask",
            by default "ag_cloud_mask".
Returns
-------
pystac.ItemCollection
The filtered item collection.
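
        Examples
        --------
        Assuming ``items`` comes from a previous :meth:`search` call on a
        collection providing cloud masks:

        >>> mask_items = eds.find_cloud_mask_items(
        ...     items, cloudmask="cloud_mask"
        ... )  # doctest: +SKIP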
"""
def ag_cloud_mask_from_items(items):
products = {}
for item in items:
if not item.properties.get(f"eda:{cloudmask}_available"):
continue
collection = item.properties[f"eda:{cloudmask}_collection_id"]
if products.get(collection, None) is None:
products[collection] = []
products[collection].append(
item.properties.get(f"eda:{cloudmask}_item_id")
)
return products
items_id = ag_cloud_mask_from_items(items_collection)
if len(items_id) == 0:
            raise ValueError(f"Sorry, no {cloudmask} items are available.")
collections = list(items_id.keys())
ids_ = [x for n in (items_id.values()) for x in n]
items_list = []
step = 100
kwargs.setdefault("prefer_alternate", "download")
for items_start_idx in range(0, len(ids_), step):
items = self.search(
collections=collections,
ids=ids_[items_start_idx : items_start_idx + step],
limit=step,
**kwargs,
)
items_list.extend(list(items))
return ItemCollection(items_list)