Utils

AgriquestBlocks

Bases: Enum

Available AgriQuest Block codes

Source code in geosyspy/utils/constants.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class AgriquestBlocks(Enum):
    """
    Available AgriQuest Block codes
    """
    FIRST_LEVEL = 129
    AMU_AUSTRALIA_LEVEL_1 = 205
    AMU_AUSTRALIA_LEVEL_2 = 206
    AMU_CHINA = 202
    AMU_EUROPE_RUSSIA = 197
    AMU_INDIA = 204
    AMU_MEXICO = 212
    AMU_NORTH_AMERICA = 207
    AMU_SOUTH_AFRICA = 213
    BM_REGIONS = 139
    CAR = 140
    COUNTY = 141
    FRA_CANTONS = 216
    FRA_COMMUNES = 135
    FRA_DEPARTEMENTS = 226
    MESOREGION = 131
    NORTH_AFRICA_AMU = 125
    RAION = 127
    SERBIA = 132
    SOUTH_AMERICA_MUNICIPIOS_2020 = 267
    SOUTH_AMERICA_AMU = 115
    SPAIN_COMARCAS = 136
    US_ASD = 130
    WESTERN_AFRICA_AMU = 122

AgriquestCommodityCode

Bases: Enum

Available AgriQuest Commodity values

Source code in geosyspy/utils/constants.py
54
55
56
57
58
59
class AgriquestCommodityCode(Enum):
    """
    Available AgriQuest Commodity values
    """
    ALL_VEGETATION = 33
    ALL_CROPS = 35

AgriquestFranceBlockCode

Bases: Enum

Available AgriQuest Block codes dedicated to France

Source code in geosyspy/utils/constants.py
62
63
64
65
66
67
68
class AgriquestFranceBlockCode(Enum):
    """
    Available AgriQuest Block codes dedicated to France
    """
    FRA_CANTONS = 216
    FRA_COMMUNES = 135
    FRA_DEPARTEMENTS = 226

AgriquestWeatherType

Bases: Enum

Available AgriQuest Weather types

Source code in geosyspy/utils/constants.py
101
102
103
104
105
106
107
108
109
110
111
112
113
class AgriquestWeatherType(Enum):
    """
    Available AgriQuest Weather types
    """
    CUMULATIVE_PRECIPITATION = "cumulative-precipitation"
    MIN_TEMPERATURE = "min-temperature"
    AVERAGE_TEMPERATURE = "average-temperature"
    MAX_TEMPERATURE = "max-temperature"
    MAX_WIND_SPEED = "max-wind-speed"
    RELATIVE_HUMIDITY = "relative-humidity"
    SNOW_DEPTH = "snow-depth"
    SOIL_MOISTURE = "soil-moisture"
    SOLAR_RADIATION = "solar-radiation"

CropIdSeason

Bases: Enum

Available season values for analytics processor Zarc

Source code in geosyspy/utils/constants.py
124
125
126
127
128
129
class CropIdSeason(Enum):
    """
    Available season values  for analytics processor Zarc
    """
    SEASON_1="SEASON_1"
    SEASON_2="SEASON_2"

Emergence

Bases: Enum

Type of Emergence query used for Emergence analytics processor

Source code in geosyspy/utils/constants.py
45
46
47
48
49
50
51
class Emergence(Enum):
    """
    Type of Emergence query used for Emergence analytics processor
    """
    EMERGENCE_IN_SEASON = "IN_SEASON"
    EMERGENCE_HISTORICAL = "HISTORICAL"
    EMERGENCE_DELAY = "DELAY"

Env

Bases: Enum

Environment to target (PROD, PREPROD)

Source code in geosyspy/utils/constants.py
23
24
25
26
27
28
class Env(Enum):
    """
    Environment to target (PROD, PREPROD)
    """
    PROD = "prod"
    PREPROD = "preprod"

GeosysApiEndpoints

Bases: Enum

Available Geosys APIs Endpoints

Source code in geosyspy/utils/constants.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class GeosysApiEndpoints(Enum):
    """
    Available Geosys APIs Endpoints
    """
    MASTER_DATA_MANAGEMENT_ENDPOINT = "master-data-management/v6"
    VTS_ENDPOINT = "vegetation-time-series/v1/season-fields"
    VTS_BY_PIXEL_ENDPOINT = "vegetation-time-series/v1/season-fields/pixels"
    FLM_CATALOG_IMAGERY = "field-level-maps/v4/season-fields/{}/catalog-imagery"
    FLM_COVERAGE = "field-level-maps/v4/season-fields/{}/coverage"
    WEATHER_ENDPOINT = "Weather/v1/weather"
    ANALYTICS_FABRIC_ENDPOINT = "analytics/metrics"
    ANALYTICS_FABRIC_LATEST_ENDPOINT = "analytics/metrics-latest"
    ANALYTICS_FABRIC_SCHEMA_ENDPOINT = "analytics/schemas"
    AGRIQUEST_ENDPOINT = "agriquest/Geosys.Agriquest.CropMonitoring.WebApi/v0/api"
    # Analytics processor
    PROCESSOR_EVENTS_ENDPOINT = "analytics-pipeline/v1/processors/events"
    LAUNCH_PROCESSOR_ENDPOINT = "analytics-pipeline/v1/processors/{}/launch"

Harvest

Bases: Enum

Type of Harvest query used for Harvest analytics processor

Source code in geosyspy/utils/constants.py
38
39
40
41
42
43
class Harvest(Enum):
    """
    Type of Harvest query used for Harvest analytics processor
    """
    HARVEST_IN_SEASON = "IN_SEASON"
    HARVEST_HISTORICAL = "HISTORICAL"

Region

Bases: Enum

Region to target (NA)

Source code in geosyspy/utils/constants.py
31
32
33
34
35
class Region(Enum):
    """
    Region to target (NA)
    """
    NA = "na"

SatelliteImageryCollection

Bases: Enum

Available imagery collections

Source code in geosyspy/utils/constants.py
 4
 5
 6
 7
 8
 9
10
11
class SatelliteImageryCollection(Enum):
    """
    Available imagery collections
    """
    MODIS = "MODIS"
    SENTINEL_2 = "SENTINEL_2"
    LANDSAT_8 = "LANDSAT_8"
    LANDSAT_9 = "LANDSAT_9"

WeatherTypeCollection

Bases: Enum

Available weather collections

Source code in geosyspy/utils/constants.py
14
15
16
17
18
19
20
class WeatherTypeCollection(Enum):
    """
    Available weather collections
    """
    WEATHER_FORECAST_DAILY = "FORECAST_DAILY"
    WEATHER_FORECAST_HOURLY = "FORECAST_HOURLY"
    WEATHER_HISTORICAL_DAILY = "HISTORICAL_DAILY"

ZarcCycleType

Bases: Enum

Available season values for analytics processor Zarc

Source code in geosyspy/utils/constants.py
132
133
134
135
136
137
138
139
class ZarcCycleType(Enum):
    """
    Available season values  for analytics processor Zarc
    """
    CYCLE_TYPE_1 = "1"
    CYCLE_TYPE_2 = "2"
    CYCLE_TYPE_3 = "3"
    NONE = None

ZarcSoilType

Bases: Enum

Available Soil Type values for analytics processor Zarc

Source code in geosyspy/utils/constants.py
115
116
117
118
119
120
121
122
class ZarcSoilType(Enum):
    """
    Available Soil Type values for analytics processor Zarc
    """
    SOIL_TYPE_1 = "1"
    SOIL_TYPE_2 = "2"
    SOIL_TYPE_3 = "3"
    NONE = None

Helper

Source code in geosyspy/utils/helper.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class Helper:

    @staticmethod
    def get_matched_str_from_pattern(pattern: str,
                                     text: str) -> str:
        """Returns the first occurence of the matched pattern in text.

        Args:
            pattern : A string representing the regex pattern to look for.
            text : The text to look into.

        Returns:
            A string representing the first occurence in text of the pattern.

        """
        p = re.compile(pattern)
        return p.findall(text)[0]

    @staticmethod
    def convert_to_wkt(geometry):
        """ convert a geometry (WKT or geoJson) to WKT
        Args:
            geometry : A string representing the geometry (WKT or geoJson)

        Returns:
            a valid WKT

        """

        try:
            # check if the geometry is a valid WKT
            if Helper.is_valid_wkt(geometry):
                # return the wkt
                return geometry
        except:
            try:
                # check if the geometry is a valid geoJson
                geojson_data = json.loads(geometry)
                geom = shape(geojson_data)
                geometry = geom.wkt

                return geometry

            except ValueError:
                # geometry is not a valid geoJson
                return None

    @staticmethod
    def is_valid_wkt(geometry):
        """ check if the geometry is a valid WKT
        Args:
            geometry : A string representing the geometry

        Returns:
            boolean (True/False)

        """
        try:
            wkt.loads(geometry)
            return True
        except ValueError:
            return False

convert_to_wkt(geometry) staticmethod

convert a geometry (WKT or geoJson) to WKT Args: geometry : A string representing the geometry (WKT or geoJson)

Returns:
  • a valid WKT

Source code in geosyspy/utils/helper.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@staticmethod
def convert_to_wkt(geometry):
    """ convert a geometry (WKT or geoJson) to WKT
    Args:
        geometry : A string representing the geometry (WKT or geoJson)

    Returns:
        a valid WKT

    """

    try:
        # check if the geometry is a valid WKT
        if Helper.is_valid_wkt(geometry):
            # return the wkt
            return geometry
    except:
        try:
            # check if the geometry is a valid geoJson
            geojson_data = json.loads(geometry)
            geom = shape(geojson_data)
            geometry = geom.wkt

            return geometry

        except ValueError:
            # geometry is not a valid geoJson
            return None

get_matched_str_from_pattern(pattern, text) staticmethod

Returns the first occurence of the matched pattern in text.

Parameters:
  • pattern

    A string representing the regex pattern to look for.

  • text

    The text to look into.

Returns:
  • str

    A string representing the first occurence in text of the pattern.

Source code in geosyspy/utils/helper.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@staticmethod
def get_matched_str_from_pattern(pattern: str,
                                 text: str) -> str:
    """Returns the first occurence of the matched pattern in text.

    Args:
        pattern : A string representing the regex pattern to look for.
        text : The text to look into.

    Returns:
        A string representing the first occurence in text of the pattern.

    """
    p = re.compile(pattern)
    return p.findall(text)[0]

is_valid_wkt(geometry) staticmethod

check if the geometry is a valid WKT Args: geometry : A string representing the geometry

Returns:
  • boolean (True/False)

Source code in geosyspy/utils/helper.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@staticmethod
def is_valid_wkt(geometry):
    """ check if the geometry is a valid WKT
    Args:
        geometry : A string representing the geometry

    Returns:
        boolean (True/False)

    """
    try:
        wkt.loads(geometry)
        return True
    except ValueError:
        return False

HttpClient

Source code in geosyspy/utils/http_client.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class HttpClient:

    def __init__(
            self,
            client_id: str,
            client_secret: str,
            username: str,
            password: str,
            enum_env: str,
            enum_region: str,
            bearer_token: str = None
    ):
        self.__client_oauth = oauth2_client.Oauth2Api(
            client_id=client_id,
            client_secret=client_secret,
            password=password,
            username=username,
            enum_env=enum_env,
            enum_region=enum_region,
            bearer_token=bearer_token
        )
        self.access_token = self.__client_oauth.token

        self.__client = OAuth2Session(self.__client_oauth.client_id, token=self.__client_oauth.token)

    @renew_access_token
    def get(self, url_endpoint: str, headers={}):
        """Gets the url_endpopint.

        Args:
            url_endpoint : A string representing the url to get.

        Returns:
            A response object.
        """
        return self.__client.get(url_endpoint, headers=headers)

    @renew_access_token
    def post(self, url_endpoint: str, payload: dict, headers={}):
        """Posts payload to the url_endpoint.

        Args:
            url_endpoint : A string representing the url to post paylaod to.
            payload : A python dict representing the payload.

        Returns:
            A response object.
        """
        return self.__client.post(url_endpoint, json=payload, headers=headers)

    @renew_access_token
    def patch(self, url_endpoint: str, payload: dict):
        """Patchs payload to the url_endpoint.

        Args:
            url_endpoint : A string representing the url to patch paylaod to.
            payload : A python dict representing the payload.

        Returns:
            A response object.
        """
        return self.__client.patch(url_endpoint, json=payload)

    def get_access_token(self):
        return self.access_token

get(url_endpoint, headers={})

Gets the url_endpopint.

Parameters:
  • url_endpoint

    A string representing the url to get.

Returns:
  • A response object.

Source code in geosyspy/utils/http_client.py
50
51
52
53
54
55
56
57
58
59
60
@renew_access_token
def get(self, url_endpoint: str, headers={}):
    """Gets the url_endpopint.

    Args:
        url_endpoint : A string representing the url to get.

    Returns:
        A response object.
    """
    return self.__client.get(url_endpoint, headers=headers)

patch(url_endpoint, payload)

Patchs payload to the url_endpoint.

Parameters:
  • url_endpoint

    A string representing the url to patch paylaod to.

  • payload

    A python dict representing the payload.

Returns:
  • A response object.

Source code in geosyspy/utils/http_client.py
75
76
77
78
79
80
81
82
83
84
85
86
@renew_access_token
def patch(self, url_endpoint: str, payload: dict):
    """Patchs payload to the url_endpoint.

    Args:
        url_endpoint : A string representing the url to patch paylaod to.
        payload : A python dict representing the payload.

    Returns:
        A response object.
    """
    return self.__client.patch(url_endpoint, json=payload)

post(url_endpoint, payload, headers={})

Posts payload to the url_endpoint.

Parameters:
  • url_endpoint

    A string representing the url to post paylaod to.

  • payload

    A python dict representing the payload.

Returns:
  • A response object.

Source code in geosyspy/utils/http_client.py
62
63
64
65
66
67
68
69
70
71
72
73
@renew_access_token
def post(self, url_endpoint: str, payload: dict, headers={}):
    """Posts payload to the url_endpoint.

    Args:
        url_endpoint : A string representing the url to post paylaod to.
        payload : A python dict representing the payload.

    Returns:
        A response object.
    """
    return self.__client.post(url_endpoint, json=payload, headers=headers)

renew_access_token(func)

Decorator used to wrap the Geosys class's http methods.

This decorator wraps the geosys http methods (get,post...) and checks whether the used token is still valid or not. If not, it fetches a new token and uses it to make another request.

Source code in geosyspy/utils/http_client.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def renew_access_token(func):
    """Decorator used to wrap the Geosys class's http methods.

    This decorator wraps the geosys http methods (get,post...) and checks
    whether the used token is still valid or not. If not, it fetches a new token and
    uses it to make another request.

    """

    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except TokenExpiredError:
            self.access_token = self.__client_oauth.get_refresh_token()
            return func(self, *args, **kwargs)

    return wrapper

Oauth2Api

Source code in geosyspy/utils/oauth2_client.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class Oauth2Api:
    def __init__(
            self,
            client_id: str,
            client_secret: str,
            username: str,
            password: str,
            enum_env: str,
            enum_region: str,
            bearer_token: str = None
    ):
        """Initializes a Geosys instance with the required credentials
        to connect to the GEOSYS API.
        """
        self.logger = logging.getLogger(__name__)
        self.client_id = client_id
        self.server_url = geosys_platform_urls.IDENTITY_URLS[enum_region][enum_env]
        self.client_secret = client_secret
        self.token = None
        self.username = username
        self.password = password

        if bearer_token:
            self.token = {"access_token": bearer_token}
        else:
            self.__authenticate()

    def __authenticate(self):
        """Authenticates the http_client to the API.

        This method connects the user to the API which generates a token that
        will be valid for one hour. A refresh token is also generated, which
        makes it possible for the http methods wrappers to get a new token
        once the previous one is no more valid through the renew_access_token
        decorator. This method is only run once when a Geosys object is instantiated.

        """

        try:
            oauth = OAuth2Session(
                client=LegacyApplicationClient(client_id=self.client_id)
            )
            self.token = oauth.fetch_token(
                token_url=self.server_url,
                username=self.username,
                password=self.password,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            self.token["refresh_token"] = oauth.cookies["refresh_token"]
            self.logger.info("Authenticated")
        except Exception as e:
            logging.error(e)

    def get_refresh_token(self):
        """Fetches a new token."""
        client = OAuth2Session(self.client_id, token=self.token)
        return client.refresh_token(
            self.server_url,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

__authenticate()

Authenticates the http_client to the API.

This method connects the user to the API which generates a token that will be valid for one hour. A refresh token is also generated, which makes it possible for the http methods wrappers to get a new token once the previous one is no more valid through the renew_access_token decorator. This method is only run once when a Geosys object is instantiated.

Source code in geosyspy/utils/oauth2_client.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __authenticate(self):
    """Authenticates the http_client to the API.

    This method connects the user to the API which generates a token that
    will be valid for one hour. A refresh token is also generated, which
    makes it possible for the http methods wrappers to get a new token
    once the previous one is no more valid through the renew_access_token
    decorator. This method is only run once when a Geosys object is instantiated.

    """

    try:
        oauth = OAuth2Session(
            client=LegacyApplicationClient(client_id=self.client_id)
        )
        self.token = oauth.fetch_token(
            token_url=self.server_url,
            username=self.username,
            password=self.password,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        self.token["refresh_token"] = oauth.cookies["refresh_token"]
        self.logger.info("Authenticated")
    except Exception as e:
        logging.error(e)

__init__(client_id, client_secret, username, password, enum_env, enum_region, bearer_token=None)

Initializes a Geosys instance with the required credentials to connect to the GEOSYS API.

Source code in geosyspy/utils/oauth2_client.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(
        self,
        client_id: str,
        client_secret: str,
        username: str,
        password: str,
        enum_env: str,
        enum_region: str,
        bearer_token: str = None
):
    """Initializes a Geosys instance with the required credentials
    to connect to the GEOSYS API.
    """
    self.logger = logging.getLogger(__name__)
    self.client_id = client_id
    self.server_url = geosys_platform_urls.IDENTITY_URLS[enum_region][enum_env]
    self.client_secret = client_secret
    self.token = None
    self.username = username
    self.password = password

    if bearer_token:
        self.token = {"access_token": bearer_token}
    else:
        self.__authenticate()

get_refresh_token()

Fetches a new token.

Source code in geosyspy/utils/oauth2_client.py
61
62
63
64
65
66
67
68
def get_refresh_token(self):
    """Fetches a new token."""
    client = OAuth2Session(self.client_id, token=self.token)
    return client.refresh_token(
        self.server_url,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )