From 2b0eb63ee34ad5f29973c02fdb2232bd553feea0 Mon Sep 17 00:00:00 2001 From: "shenwen.yin" Date: Fri, 8 May 2026 19:48:41 +0800 Subject: [PATCH] poc: support multiple ip - add active_passive_client --- cloudtower/__init__.py | 2 + cloudtower/active_passive_client.py | 507 ++++++++++++++++++++++++++++ cloudtower/api_client.py | 64 +++- cloudtower/rest.py | 48 ++- cloudtower/utils.py | 9 +- test/test_active_passive_client.py | 438 ++++++++++++++++++++++++ 6 files changed, 1047 insertions(+), 21 deletions(-) create mode 100644 cloudtower/active_passive_client.py create mode 100644 test/test_active_passive_client.py diff --git a/cloudtower/__init__.py b/cloudtower/__init__.py index 38f5efef..ac2d3113 100644 --- a/cloudtower/__init__.py +++ b/cloudtower/__init__.py @@ -139,6 +139,8 @@ # import ApiClient from cloudtower.api_client import ApiClient +from cloudtower.active_passive_client import ActivePassiveApiClient +from cloudtower.active_passive_client import FailoverStrategy from cloudtower.configuration import Configuration from cloudtower.exceptions import OpenApiException from cloudtower.exceptions import ApiTypeError diff --git a/cloudtower/active_passive_client.py b/cloudtower/active_passive_client.py new file mode 100644 index 00000000..584199d1 --- /dev/null +++ b/cloudtower/active_passive_client.py @@ -0,0 +1,507 @@ +# coding: utf-8 +from __future__ import absolute_import + +import json +import logging +import threading + +import urllib3 + +from cloudtower.api_client import ApiClient +from cloudtower.configuration import Configuration +from cloudtower.exceptions import ApiException +from cloudtower import utils + +logger = logging.getLogger(__name__) + +DEFAULT_BASE_PATH = "/v2/api" +DEFAULT_SCHEME = "http" +DEFAULT_PROBE_TIMEOUT = 5 +PROBE_PATH = "/api/healthz" +AUTH_API_PATH = "/api" +HOST_STATE_ACTIVE = "active" +HOST_STATE_PASSIVE = "passive" +HTTP_STATUS_OK = 200 +HTTP_STATUS_TEMPORARY_REDIRECT = 307 +REQUEST_ATTEMPTS = 2 + + +class ActivePassiveException(ApiException): + """Base exception for active-passive client errors.""" + default_reason = "active-passive client error" + + def __init__(self, status=None, reason=None, http_resp=None, host=None, + endpoints=None, active_hosts=None, failures=None, + strategy=None): + if reason is None: + reason = self.default_reason + super(ActivePassiveException, self).__init__( + status=status, + reason=reason, + http_resp=http_resp, + ) + self.host = host + self.endpoints = list(endpoints) if endpoints is not None else None + self.active_hosts = ( + list(active_hosts) if active_hosts is not None else None + ) + self.failures = list(failures) if failures is not None else None + self.strategy = strategy + + +class ActivePassiveNoEndpoints(ActivePassiveException): + default_reason = "active-passive client requires at least one endpoint" + + +class ActivePassiveDuplicateHost(ActivePassiveException): + default_reason = "active-passive client endpoints must be unique" + + +class ActivePassiveNoActiveHost(ActivePassiveException): + default_reason = "active-passive discover found no active host" + + +class ActivePassiveMultipleActives(ActivePassiveException): + default_reason = "active-passive discover found multiple active hosts" + + +class ActivePassiveRetryExhausted(ActivePassiveException): + default_reason = "active-passive request retry exhausted after discover" + + +class ActivePassiveFailoverRequired(ActivePassiveException): + default_reason = "active-passive failover required" + + +class ActivePassiveUnknownHost(ActivePassiveException): + default_reason = "active-passive host is not configured" + + +class FailoverStrategy(object): + """Failover strategy for active-passive client.""" + DEFAULT = "default" + MANUAL_FAILOVER = "manual_failover" + ALWAYS_PROBE = "always_probe" + + +class ActivePassiveApiClient(ApiClient): + """API client with active-passive endpoint support. + + Automatically discovers the active endpoint among a list of candidates, + caches it, and failovers on 307 Temporary Redirect responses. + """ + + def __init__(self, endpoints=None, base_path=DEFAULT_BASE_PATH, schemes=None, + user_config=None, probe_timeout=None, failover_strategy=None, + configuration=None, header_name=None, header_value=None, + cookie=None, pool_threads=1): + """Initialize an active-passive API client. + + :param endpoints: List of host strings (e.g., ["172.21.152.75"]). + :param base_path: Base path for API requests (default "/v2/api"). + :param schemes: List of URL schemes (default ["http"]). + :param user_config: Optional dict with keys ``name``, ``password``, + ``source`` for automatic login. + :param probe_timeout: Timeout in seconds for healthz probes + (default 5). + :param failover_strategy: One of :class:`FailoverStrategy` constants. + :param configuration: ``Configuration`` instance (optional). + :param header_name: Default header name. + :param header_value: Default header value. + :param cookie: Default cookie. + :param pool_threads: Thread-pool size for async requests. + """ + if endpoints is None or len(endpoints) == 0: + raise ActivePassiveNoEndpoints( + status=0, + endpoints=endpoints, + ) + if schemes is None: + schemes = [DEFAULT_SCHEME] + + seen = set() + self._ordered_hosts = [] + for ep in endpoints: + host = ep.strip() + if not host: + raise ActivePassiveNoEndpoints( + status=0, + host=host, + endpoints=endpoints, + ) + if host in seen: + raise ActivePassiveDuplicateHost( + status=0, + reason=( + "active-passive client endpoints must be unique: {}" + .format(host) + ), + host=host, + endpoints=endpoints, + ) + seen.add(host) + self._ordered_hosts.append(host) + + self._base_path = base_path + self._schemes = schemes + self._probe_timeout = ( + probe_timeout if probe_timeout is not None else DEFAULT_PROBE_TIMEOUT + ) + self.failover_strategy = ( + failover_strategy + if failover_strategy is not None + else FailoverStrategy.DEFAULT + ) + + self._lock = threading.RLock() + self._current_active_host = None + self._current_active_host_url = None + self._discover_event = None + self._discover_err = None + self._local = threading.local() + + if configuration is None: + configuration = Configuration.get_default_copy() + + # Set an initial host so that RESTClientObject can be created; + # the real active host is discovered on the first request. + configuration.host = self._build_base_url(self._ordered_hosts[0]) + + super(ActivePassiveApiClient, self).__init__( + configuration=configuration, + header_name=header_name, + header_value=header_value, + cookie=cookie, + pool_threads=pool_threads, + ) + + if user_config is not None: + self._login_with_user_config(user_config) + + # ------------------------------------------------------------------ # + # Public helpers + # ------------------------------------------------------------------ # + @property + def current_active_host(self): + """Return the currently cached active host (host only).""" + with self._lock: + return self._current_active_host + + # ------------------------------------------------------------------ # + # URL builders + # ------------------------------------------------------------------ # + @staticmethod + def _build_url(scheme, host, path): + return "{}://{}{}".format(scheme, host, path) + + def _build_base_url(self, host): + scheme = self._schemes[0] if self._schemes else DEFAULT_SCHEME + return self._build_url(scheme, host, self._base_path) + + def _build_probe_url(self, host): + scheme = self._schemes[0] if self._schemes else DEFAULT_SCHEME + return self._build_url(scheme, host, PROBE_PATH) + + # ------------------------------------------------------------------ # + # Active-host lifecycle + # ------------------------------------------------------------------ # + def _clear_active_host(self): + with self._lock: + self._current_active_host = None + self._current_active_host_url = None + + def _ensure_active_host(self): + while True: + with self._lock: + if self.failover_strategy == FailoverStrategy.ALWAYS_PROBE: + self._current_active_host = None + self._current_active_host_url = None + + if self._current_active_host_url is not None: + return self._current_active_host_url + + if self._discover_event is not None: + event = self._discover_event + need_discover = False + else: + event = threading.Event() + self._discover_event = event + need_discover = True + + if not need_discover: + event.wait() + with self._lock: + if self._current_active_host_url is not None: + return self._current_active_host_url + if self._discover_err is not None: + raise self._discover_err + # Another thread discovered nothing; retry. + continue + + try: + host = self._discover() + with self._lock: + self._current_active_host = host + self._current_active_host_url = self._build_base_url(host) + self._discover_err = None + return self._current_active_host_url + except Exception as e: + with self._lock: + self._discover_err = e + raise + finally: + event.set() + with self._lock: + self._discover_event = None + + def _discover(self): + active_hosts = [] + failures = [] + + for host in self._ordered_hosts: + try: + state = self._probe_host(host) + if state == HOST_STATE_ACTIVE: + active_hosts.append(host) + except Exception as e: + failures.append("{}: {}".format(host, str(e))) + + if len(active_hosts) == 1: + logger.debug( + "active-passive discover found active host: {}".format( + active_hosts[0] + ), + ) + return active_hosts[0] + elif len(active_hosts) == 0: + if len(failures) == 0: + raise ActivePassiveNoActiveHost( + status=0, + endpoints=self._ordered_hosts, + ) + raise ActivePassiveNoActiveHost( + status=0, + reason=( + "active-passive discover found no active host: {}" + .format("; ".join(failures)) + ), + endpoints=self._ordered_hosts, + failures=failures, + ) + else: + raise ActivePassiveMultipleActives( + status=0, + reason=( + "active-passive discover found multiple active hosts: {}" + .format( + ", ".join( + self._build_base_url(h) for h in active_hosts + ) + ) + ), + endpoints=self._ordered_hosts, + active_hosts=active_hosts, + ) + + def _probe_host(self, host): + url = self._build_probe_url(host) + is_active = self._do_probe(url, timeout=self._probe_timeout) + return HOST_STATE_ACTIVE if is_active else HOST_STATE_PASSIVE + + # ------------------------------------------------------------------ # + # Authentication / login + # ------------------------------------------------------------------ # + def _login_with_user_config(self, user_config): + active_host = self._ensure_active_host() + self.configuration.host = active_host + + if isinstance(user_config, dict): + username = user_config.get("name") + password = user_config.get("password") + source = user_config.get("source") + else: + username = getattr(user_config, "name", None) + password = getattr(user_config, "password", None) + source = getattr(user_config, "source", None) + + utils.login(self, username, password, source) + + def _get_auth_config_id(self, active_host_base_url): + """Query authn strategies and return the LDAP config id if present.""" + # active_host_base_url is e.g. http://host/v2/api + # We need to post to http://host/api + scheme = self._schemes[0] if self._schemes else DEFAULT_SCHEME + host = None + for h in self._ordered_hosts: + if active_host_base_url == self._build_base_url(h): + host = h + break + if host is None: + return None + + url = self._build_url(scheme, host, AUTH_API_PATH) + body = json.dumps( + { + "operationName": None, + "variables": {}, + "query": "{authnStrategies{id type}}", + } + ) + try: + r = self.rest_client.pool_manager.request( + "POST", + url, + body=body, + headers={"Content-Type": "application/json"}, + redirect=False, + timeout=urllib3.Timeout(total=self._probe_timeout), + ) + if r.status != HTTP_STATUS_OK: + return None + data = json.loads(r.data.decode("utf-8")) + for strategy in data.get("data", {}).get("authnStrategies", []): + if strategy.get("type") == "LDAP": + return strategy.get("id") + except Exception: + pass + return None + + # ------------------------------------------------------------------ # + # ApiClient overrides + # ------------------------------------------------------------------ # + def request(self, method, url, query_params=None, headers=None, + post_params=None, body=None, _preload_content=True, + _request_timeout=None, redirect=None): + """Override to route requests to the current active host.""" + if getattr(self._local, "bypass_active_passive", False): + return super(ActivePassiveApiClient, self).request( + method, url, + query_params=query_params, + headers=headers, + post_params=post_params, + body=body, + _preload_content=_preload_content, + _request_timeout=_request_timeout, + redirect=False, + ) + + active_host = self._ensure_active_host() + if not url.startswith(active_host): + for host in self._ordered_hosts: + base_url = self._build_base_url(host) + if url.startswith(base_url): + url = active_host + url[len(base_url):] + break + return super(ActivePassiveApiClient, self).request( + method, url, + query_params=query_params, + headers=headers, + post_params=post_params, + body=body, + _preload_content=_preload_content, + _request_timeout=_request_timeout, + redirect=False, + ) + + def call_api(self, resource_path, method, + path_params=None, query_params=None, header_params=None, + body=None, post_params=None, files=None, + response_types_map=None, auth_settings=None, + async_req=None, _return_http_data_only=None, + collection_formats=None, _preload_content=True, + _request_timeout=None, _host=None, _request_auth=None): + """Override to inject active-passive host selection and failover.""" + if async_req: + return self.pool.apply_async( + self._call_api_with_failover, + ( + resource_path, method, + path_params, query_params, header_params, + body, post_params, files, + response_types_map, auth_settings, + _return_http_data_only, collection_formats, + _preload_content, _request_timeout, _host, _request_auth, + ), + ) + return self._call_api_with_failover( + resource_path, method, + path_params, query_params, header_params, + body, post_params, files, + response_types_map, auth_settings, + _return_http_data_only, collection_formats, + _preload_content, _request_timeout, _host, _request_auth, + ) + + def _call_api_with_failover(self, resource_path, method, + path_params=None, query_params=None, + header_params=None, body=None, + post_params=None, files=None, + response_types_map=None, auth_settings=None, + _return_http_data_only=None, + collection_formats=None, + _preload_content=True, _request_timeout=None, + _host=None, _request_auth=None): + """Internal wrapper that handles active-host discovery and 307 retry.""" + # When the caller explicitly supplies _host, bypass active-passive logic. + if _host is not None: + self._local.bypass_active_passive = True + try: + return super(ActivePassiveApiClient, self)._ApiClient__call_api( + resource_path, method, + path_params, query_params, header_params, + body, post_params, files, + response_types_map, auth_settings, + _return_http_data_only, collection_formats, + _preload_content, _request_timeout, _host, _request_auth, + ) + finally: + self._local.bypass_active_passive = False + + for attempt in range(REQUEST_ATTEMPTS): + active_host = self._ensure_active_host() + self.configuration.host = active_host + + try: + return super(ActivePassiveApiClient, self)._ApiClient__call_api( + resource_path, method, + path_params, query_params, header_params, + body, post_params, files, + response_types_map, auth_settings, + _return_http_data_only, collection_formats, + _preload_content, _request_timeout, _host, _request_auth, + ) + except ApiException as e: + if e.status == HTTP_STATUS_TEMPORARY_REDIRECT: + with self._lock: + failed_host = self._current_active_host + self._clear_active_host() + if self.failover_strategy in ( + FailoverStrategy.MANUAL_FAILOVER, + FailoverStrategy.ALWAYS_PROBE, + ): + raise ActivePassiveFailoverRequired( + status=HTTP_STATUS_TEMPORARY_REDIRECT, + host=failed_host, + endpoints=self._ordered_hosts, + strategy=self.failover_strategy, + ) + if attempt == 0: + continue + raise ActivePassiveRetryExhausted( + status=HTTP_STATUS_TEMPORARY_REDIRECT, + host=failed_host, + endpoints=self._ordered_hosts, + strategy=self.failover_strategy, + ) + if self._should_clear_active_host_on_error( + e, response_types_map + ): + self._clear_active_host() + raise + + def _should_clear_active_host_on_error(self, err, response_types_map): + if err.status is None or err.status == 0: + return True + if response_types_map is not None and err.status in response_types_map: + return False + return True diff --git a/cloudtower/api_client.py b/cloudtower/api_client.py index b70ac894..64235c39 100644 --- a/cloudtower/api_client.py +++ b/cloudtower/api_client.py @@ -22,7 +22,8 @@ # python 2 and python 3 compatibility library import six -from six.moves.urllib.parse import quote +from six.moves.urllib.parse import quote, urlparse +import urllib3 from cloudtower.configuration import Configuration import cloudtower.models @@ -394,26 +395,29 @@ def call_api(self, resource_path, method, def request(self, method, url, query_params=None, headers=None, post_params=None, body=None, _preload_content=True, - _request_timeout=None): + _request_timeout=None, redirect=None): """Makes the HTTP request using RESTClient.""" if method == "GET": return self.rest_client.GET(url, query_params=query_params, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, headers=headers) elif method == "HEAD": return self.rest_client.HEAD(url, query_params=query_params, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, headers=headers) elif method == "OPTIONS": return self.rest_client.OPTIONS(url, query_params=query_params, headers=headers, _preload_content=_preload_content, - _request_timeout=_request_timeout) + _request_timeout=_request_timeout, + redirect=redirect) elif method == "POST": return self.rest_client.POST(url, query_params=query_params, @@ -421,6 +425,7 @@ def request(self, method, url, query_params=None, headers=None, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, body=body) elif method == "PUT": return self.rest_client.PUT(url, @@ -429,6 +434,7 @@ def request(self, method, url, query_params=None, headers=None, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, body=body) elif method == "PATCH": return self.rest_client.PATCH(url, @@ -437,6 +443,7 @@ def request(self, method, url, query_params=None, headers=None, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, body=body) elif method == "DELETE": return self.rest_client.DELETE(url, @@ -444,6 +451,7 @@ def request(self, method, url, query_params=None, headers=None, headers=headers, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, body=body) else: raise ApiValueError( @@ -451,6 +459,56 @@ def request(self, method, url, query_params=None, headers=None, " `POST`, `PATCH`, `PUT` or `DELETE`." ) + def _do_probe(self, url, timeout=None): + """Send a healthz probe to the given URL. + + :param url: Full probe URL (e.g. http://host/api/healthz). + :return: True if active (200), False if passive (307). + :param timeout: Probe timeout in seconds. + :raises ApiException: On network errors or unexpected status codes. + """ + if timeout is None: + timeout = 5 + try: + r = self.rest_client.pool_manager.request( + "GET", url, + redirect=False, + timeout=urllib3.Timeout(total=timeout), + headers={"Accept": "application/json"}, + ) + if r.status == 200: + return True + elif r.status == 307: + return False + else: + raise ApiException( + status=r.status, + reason="probe active-passive returned unexpected status", + ) + except urllib3.exceptions.HTTPError as e: + raise ApiException(status=0, reason=str(e)) + + def probe_active_passive(self): + """Probe whether the current endpoint is active. + + Sends a GET request to /api/healthz on the endpoint base URL. + Returns True if the endpoint is active (200), False if passive (307). + Any other status is treated as an error. + + :return: True if active, False if passive. + :rtype: bool + :raises ApiException: If the probe request fails or returns an unexpected status. + """ + parsed = urlparse(self.configuration.host) + if not parsed.scheme or not parsed.netloc: + raise ApiException( + status=0, + reason="probe active-passive missing host", + ) + + url = "%s://%s/api/healthz" % (parsed.scheme, parsed.netloc) + return self._do_probe(url) + def parameters_to_tuples(self, params, collection_formats): """Get parameters as list of tuples, formatting collections. diff --git a/cloudtower/rest.py b/cloudtower/rest.py index 051dfdc2..dccdbd4f 100644 --- a/cloudtower/rest.py +++ b/cloudtower/rest.py @@ -92,7 +92,7 @@ def __init__(self, configuration, pools_size=4, maxsize=None): def request(self, method, url, query_params=None, headers=None, body=None, post_params=None, _preload_content=True, - _request_timeout=None): + _request_timeout=None, redirect=None): """Perform requests. :param method: http request method @@ -110,6 +110,8 @@ def request(self, method, url, query_params=None, headers=None, number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. + :param redirect: Whether urllib3 should follow redirects. If unset, + urllib3 uses its default behavior. """ method = method.upper() assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT', @@ -135,6 +137,10 @@ def request(self, method, url, query_params=None, headers=None, if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' + redirect_kwargs = {} + if redirect is not None: + redirect_kwargs["redirect"] = redirect + try: # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: @@ -149,7 +155,8 @@ def request(self, method, url, query_params=None, headers=None, body=request_body, preload_content=_preload_content, timeout=timeout, - headers=headers) + headers=headers, + **redirect_kwargs) elif headers['Content-Type'] == 'application/x-www-form-urlencoded': # noqa: E501 r = self.pool_manager.request( method, url, @@ -157,7 +164,8 @@ def request(self, method, url, query_params=None, headers=None, encode_multipart=False, preload_content=_preload_content, timeout=timeout, - headers=headers) + headers=headers, + **redirect_kwargs) elif headers['Content-Type'] == 'multipart/form-data': # must del headers['Content-Type'], or the correct # Content-Type which generated by urllib3 will be @@ -169,7 +177,8 @@ def request(self, method, url, query_params=None, headers=None, encode_multipart=True, preload_content=_preload_content, timeout=timeout, - headers=headers) + headers=headers, + **redirect_kwargs) # Pass a `string` parameter directly in the body to support # other content types than Json when `body` argument is # provided in serialized form @@ -180,7 +189,8 @@ def request(self, method, url, query_params=None, headers=None, body=request_body, preload_content=_preload_content, timeout=timeout, - headers=headers) + headers=headers, + **redirect_kwargs) else: # Cannot generate the request from given parameters msg = """Cannot prepare a request message for provided @@ -193,7 +203,8 @@ def request(self, method, url, query_params=None, headers=None, fields=query_params, preload_content=_preload_content, timeout=timeout, - headers=headers) + headers=headers, + **redirect_kwargs) except urllib3.exceptions.SSLError as e: msg = "{0}\n{1}".format(type(e).__name__, str(e)) raise ApiException(status=0, reason=msg) @@ -222,66 +233,77 @@ def request(self, method, url, query_params=None, headers=None, return r def GET(self, url, headers=None, query_params=None, _preload_content=True, - _request_timeout=None): + _request_timeout=None, redirect=None): return self.request("GET", url, headers=headers, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, query_params=query_params) def HEAD(self, url, headers=None, query_params=None, _preload_content=True, - _request_timeout=None): + _request_timeout=None, redirect=None): return self.request("HEAD", url, headers=headers, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, query_params=query_params) def OPTIONS(self, url, headers=None, query_params=None, post_params=None, - body=None, _preload_content=True, _request_timeout=None): + body=None, _preload_content=True, _request_timeout=None, + redirect=None): return self.request("OPTIONS", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, body=body) def DELETE(self, url, headers=None, query_params=None, body=None, - _preload_content=True, _request_timeout=None): + _preload_content=True, _request_timeout=None, redirect=None): return self.request("DELETE", url, headers=headers, query_params=query_params, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, body=body) def POST(self, url, headers=None, query_params=None, post_params=None, - body=None, _preload_content=True, _request_timeout=None): + body=None, _preload_content=True, _request_timeout=None, + redirect=None): return self.request("POST", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, body=body) def PUT(self, url, headers=None, query_params=None, post_params=None, - body=None, _preload_content=True, _request_timeout=None): + body=None, _preload_content=True, _request_timeout=None, + redirect=None): return self.request("PUT", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, body=body) def PATCH(self, url, headers=None, query_params=None, post_params=None, - body=None, _preload_content=True, _request_timeout=None): + body=None, _preload_content=True, _request_timeout=None, + redirect=None): return self.request("PATCH", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, + redirect=redirect, body=body) diff --git a/cloudtower/utils.py b/cloudtower/utils.py index 554818b5..93f45bb5 100644 --- a/cloudtower/utils.py +++ b/cloudtower/utils.py @@ -97,17 +97,17 @@ def login(api_client, username, password, source="LOCAL"): :params api_client: (required) api client to set up login status :type api_client: ApiClient :param username: (required) username to login - :type username: str + :type username: str :param password: (required) password to login :type password: str :param source: login user's source, default is local - :type password: UserSource + :type source: UserSource """ user_api = UserApi(api_client) login_params = { "username": username, "password": password, - "source": source + "source": source, } if source == UserSource.LDAP: host = api_client.configuration.host @@ -132,7 +132,6 @@ def login(api_client, username, password, source="LOCAL"): pass login_res = user_api.login(login_params) api_client.configuration.api_key["Authorization"] = login_res.data.token - return def get_svt_image_version(path): @@ -140,4 +139,4 @@ def get_svt_image_version(path): with open(path, "rb") as file: file.seek(32*1024+190) p = file.read(128).decode("utf-8").strip() - return p \ No newline at end of file + return p diff --git a/test/test_active_passive_client.py b/test/test_active_passive_client.py new file mode 100644 index 00000000..49fe9c02 --- /dev/null +++ b/test/test_active_passive_client.py @@ -0,0 +1,438 @@ +# coding: utf-8 +from __future__ import absolute_import + +import pytest +try: + from unittest import mock +except ImportError: + import mock + +from cloudtower.active_passive_client import ( + ActivePassiveApiClient, + ActivePassiveNoEndpoints, + ActivePassiveDuplicateHost, + ActivePassiveNoActiveHost, + ActivePassiveMultipleActives, + ActivePassiveFailoverRequired, + ActivePassiveRetryExhausted, + FailoverStrategy, +) +from cloudtower.api_client import ApiClient +from cloudtower.exceptions import ApiException + + +class FakeResponse(object): + def __init__(self, status, data=b""): + self.status = status + self.data = data + self.reason = "OK" + + def getheaders(self): + return {} + + def getheader(self, name, default=None): + return default + + +def make_client(endpoints=("host-a", "host-b"), **kwargs): + return ActivePassiveApiClient( + endpoints=list(endpoints), + base_path="/v2/api", + schemes=["http"], + **kwargs + ) + + +class TestActivePassiveApiClientInit(object): + def test_exception_subclasses_have_default_reason(self): + assert ActivePassiveNoEndpoints(status=0).reason == ( + "active-passive client requires at least one endpoint" + ) + assert ActivePassiveFailoverRequired(status=307).reason == ( + "active-passive failover required" + ) + + def test_none_endpoints_raises(self): + with pytest.raises(ActivePassiveNoEndpoints): + ActivePassiveApiClient(endpoints=None) + + def test_no_endpoints_raises(self): + with pytest.raises(ActivePassiveNoEndpoints): + ActivePassiveApiClient(endpoints=[]) + + def test_duplicate_endpoints_raises(self): + with pytest.raises(ActivePassiveDuplicateHost) as exc_info: + ActivePassiveApiClient(endpoints=["host-a", "host-a"]) + assert exc_info.value.host == "host-a" + assert exc_info.value.endpoints == ["host-a", "host-a"] + + +class TestDiscovery(object): + def test_single_active_host(self): + client = make_client(endpoints=("host-a",)) + with mock.patch.object( + client.rest_client.pool_manager, "request" + ) as mock_req: + mock_req.return_value = FakeResponse(200) + host = client._ensure_active_host() + assert host == "http://host-a/v2/api" + assert client.current_active_host == "host-a" + + def test_passive_skipped_active_selected(self): + client = make_client(endpoints=("host-a", "host-b")) + + def side_effect(method, url, **kwargs): + if "host-a" in url: + return FakeResponse(307) + return FakeResponse(200) + + with mock.patch.object( + client.rest_client.pool_manager, "request", side_effect=side_effect + ): + host = client._ensure_active_host() + assert host == "http://host-b/v2/api" + assert client.current_active_host == "host-b" + + def test_no_active_raises(self): + client = make_client(endpoints=("host-a", "host-b")) + with mock.patch.object( + client.rest_client.pool_manager, "request" + ) as mock_req: + mock_req.return_value = FakeResponse(307) + with pytest.raises(ActivePassiveNoActiveHost) as exc_info: + client._ensure_active_host() + assert exc_info.value.endpoints == ["host-a", "host-b"] + assert exc_info.value.failures is None + + def test_multiple_active_raises(self): + client = make_client(endpoints=("host-a", "host-b")) + with mock.patch.object( + client.rest_client.pool_manager, "request" + ) as mock_req: + mock_req.return_value = FakeResponse(200) + with pytest.raises(ActivePassiveMultipleActives) as exc_info: + client._ensure_active_host() + assert exc_info.value.endpoints == ["host-a", "host-b"] + assert exc_info.value.active_hosts == ["host-a", "host-b"] + + def test_probe_error_recorded(self): + client = make_client(endpoints=("host-a",)) + with mock.patch.object( + client.rest_client.pool_manager, "request" + ) as mock_req: + mock_req.side_effect = Exception("connection refused") + with pytest.raises(ApiException) as exc_info: + client._ensure_active_host() + assert "connection refused" in str(exc_info.value) + assert exc_info.value.endpoints == ["host-a"] + assert exc_info.value.failures == ["host-a: connection refused"] + + def test_probe_uses_configured_timeout(self): + client = make_client(endpoints=("host-a",), probe_timeout=12) + with mock.patch.object( + client.rest_client.pool_manager, "request" + ) as mock_req: + def probe_side_effect(method, url, **kwargs): + if "host-a" in url: + return FakeResponse(200) + return FakeResponse(307) + + mock_req.side_effect = probe_side_effect + client._ensure_active_host() + + timeout = mock_req.call_args[1]["timeout"] + assert timeout.total == 12 + + +class TestRequestRouting(object): + def test_request_uses_active_host(self): + client = make_client(endpoints=("host-a",)) + with mock.patch.object( + client.rest_client.pool_manager, "request" + ) as mock_req: + def probe_side_effect(method, url, **kwargs): + if "host-a" in url: + return FakeResponse(200) + return FakeResponse(307) + + mock_req.side_effect = probe_side_effect + client._ensure_active_host() + + with mock.patch( + "cloudtower.api_client.ApiClient._ApiClient__call_api" + ) as mock_call: + mock_call.return_value = ("ok", 200, {}) + client.call_api("/test", "GET") + + assert client.configuration.host == "http://host-a/v2/api" + assert client.current_active_host == "host-a" + assert mock_call.call_count == 1 + + def test_request_disables_redirect_explicitly(self): + client = make_client(endpoints=("host-a",)) + calls = [] + + def request_side_effect(method, url, **kwargs): + calls.append((method, url, kwargs)) + return FakeResponse(200) + + with mock.patch.object( + client.rest_client.pool_manager, + "request", + side_effect=request_side_effect, + ): + client.request("GET", "http://host-a/v2/api/test") + + assert calls[0][2]["redirect"] is False + assert calls[1][2]["redirect"] is False + + def test_regular_api_client_keeps_default_redirect_behavior(self): + client = ApiClient() + + with mock.patch.object( + client.rest_client.pool_manager, + "request", + return_value=FakeResponse(200), + ) as mock_req: + client.request("GET", "http://host-a/v2/api/test") + + assert "redirect" not in mock_req.call_args[1] + + def test_307_triggers_failover_and_retry(self): + client = make_client(endpoints=("host-a", "host-b")) + + probe_calls = [] + + def first_probe_side_effect(method, url, **kwargs): + probe_calls.append(url) + if "host-a" in url: + return FakeResponse(200) + return FakeResponse(307) + + with mock.patch.object( + client.rest_client.pool_manager, "request", side_effect=first_probe_side_effect + ): + # First discover caches host-a + client._ensure_active_host() + + # Simulate host-a becoming passive on the first real request + call_count = [0] + + def call_api_side_effect(*args, **kwargs): + call_count[0] += 1 + if call_count[0] == 1: + raise ApiException(status=307, reason="switch") + return ("ok", 200, {}) + + with mock.patch( + "cloudtower.api_client.ApiClient._ApiClient__call_api", + side_effect=call_api_side_effect, + ): + # After the first 307, discover runs again. + # We need host-b to be active now. + def second_probe_side_effect(method, url, **kwargs): + if "host-a" in url: + return FakeResponse(307) + return FakeResponse(200) + + with mock.patch.object( + client.rest_client.pool_manager, + "request", + side_effect=second_probe_side_effect, + ): + result = client.call_api("/test", "GET") + + assert result == ("ok", 200, {}) + assert call_count[0] == 2 + + def test_second_307_raises_retry_exhausted(self): + client = make_client(endpoints=("host-a", "host-b")) + + def first_probe_side_effect(method, url, **kwargs): + if "host-a" in url: + return FakeResponse(200) + return FakeResponse(307) + + with mock.patch.object( + client.rest_client.pool_manager, "request", side_effect=first_probe_side_effect + ): + client._ensure_active_host() + + call_count = [0] + + def call_api_side_effect(*args, **kwargs): + call_count[0] += 1 + raise ApiException(status=307, reason="switch") + + with mock.patch( + "cloudtower.api_client.ApiClient._ApiClient__call_api", + side_effect=call_api_side_effect, + ): + def second_probe_side_effect(method, url, **kwargs): + if "host-a" in url: + return FakeResponse(307) + return FakeResponse(200) + + with mock.patch.object( + client.rest_client.pool_manager, + "request", + side_effect=second_probe_side_effect, + ): + with pytest.raises(ActivePassiveRetryExhausted): + client.call_api("/test", "GET") + + assert call_count[0] == 2 + + def test_manual_failover_on_307(self): + client = make_client( + endpoints=("host-a", "host-b"), + failover_strategy=FailoverStrategy.MANUAL_FAILOVER, + ) + + def probe_side_effect(method, url, **kwargs): + if "host-a" in url: + return FakeResponse(200) + return FakeResponse(307) + + with mock.patch.object( + client.rest_client.pool_manager, + "request", + side_effect=probe_side_effect, + ): + client._ensure_active_host() + + with mock.patch( + "cloudtower.api_client.ApiClient._ApiClient__call_api" + ) as mock_call: + mock_call.side_effect = ApiException(status=307, reason="switch") + with pytest.raises(ActivePassiveFailoverRequired) as exc_info: + client.call_api("/test", "GET") + assert exc_info.value.host == "host-a" + assert exc_info.value.endpoints == ["host-a", "host-b"] + assert exc_info.value.strategy == FailoverStrategy.MANUAL_FAILOVER + + def test_always_probe_strategy(self): + client = make_client( + endpoints=("host-a",), + failover_strategy=FailoverStrategy.ALWAYS_PROBE, + ) + probe_count = [0] + + def probe_side_effect(method, url, **kwargs): + probe_count[0] += 1 + return FakeResponse(200) + + with mock.patch.object( + client.rest_client.pool_manager, "request", side_effect=probe_side_effect + ): + with mock.patch( + "cloudtower.api_client.ApiClient._ApiClient__call_api" + ) as mock_call: + mock_call.return_value = ("ok", 200, {}) + client.call_api("/test", "GET") + client.call_api("/test", "GET") + + assert probe_count[0] == 2 + + def test_always_probe_307_requires_manual_retry(self): + client = make_client( + endpoints=("host-a",), + failover_strategy=FailoverStrategy.ALWAYS_PROBE, + ) + + with mock.patch.object( + client.rest_client.pool_manager, "request" + ) as mock_req: + mock_req.return_value = FakeResponse(200) + with mock.patch( + "cloudtower.api_client.ApiClient._ApiClient__call_api" + ) as mock_call: + mock_call.side_effect = ApiException(status=307, reason="switch") + with pytest.raises(ActivePassiveFailoverRequired): + client.call_api("/test", "GET") + + assert mock_call.call_count == 1 + + def test_explicit_host_bypasses_active_passive(self): + client = make_client(endpoints=("host-a",)) + with mock.patch.object( + client, "_ensure_active_host", + side_effect=AssertionError("probe should not run"), + ): + def call_api_side_effect(*args, **kwargs): + client.request("GET", "http://explicit/v2/api/test") + return ("ok", 200, {}) + + with mock.patch( + "cloudtower.api_client.ApiClient._ApiClient__call_api", + side_effect=call_api_side_effect, + ): + with mock.patch( + "cloudtower.api_client.ApiClient.request", + return_value=FakeResponse(200), + ): + client.call_api( + "/test", "GET", _host="http://explicit/v2/api" + ) + + # No probe should have been triggered because _host was supplied + assert client.current_active_host is None + + def test_recognizable_error_keeps_cache(self): + client = make_client(endpoints=("host-a",)) + with mock.patch.object( + client.rest_client.pool_manager, "request" + ) as mock_req: + mock_req.return_value = FakeResponse(200) + client._ensure_active_host() + + with mock.patch( + "cloudtower.api_client.ApiClient._ApiClient__call_api" + ) as mock_call: + mock_call.side_effect = ApiException(status=400, reason="bad request") + with pytest.raises(ApiException): + client.call_api( + "/test", + "GET", + response_types_map={200: "str", 400: "ErrorBody"}, + ) + + assert client.current_active_host == "host-a" + + def test_unrecognized_error_clears_cache(self): + client = make_client(endpoints=("host-a",)) + with mock.patch.object( + client.rest_client.pool_manager, "request" + ) as mock_req: + mock_req.return_value = FakeResponse(200) + client._ensure_active_host() + + assert client.current_active_host == "host-a" + + with mock.patch( + "cloudtower.api_client.ApiClient._ApiClient__call_api" + ) as mock_call: + mock_call.side_effect = ApiException(status=502, reason="bad gateway") + with pytest.raises(ApiException): + client.call_api("/test", "GET") + + assert client.current_active_host is None + + +class TestAsync(object): + def test_async_request(self): + client = make_client(endpoints=("host-a",)) + with mock.patch.object( + client.rest_client.pool_manager, "request" + ) as mock_req: + mock_req.return_value = FakeResponse(200) + client._ensure_active_host() + + with mock.patch( + "cloudtower.api_client.ApiClient._ApiClient__call_api" + ) as mock_call: + mock_call.return_value = ("ok", 200, {}) + future = client.call_api("/test", "GET", async_req=True) + result = future.get(timeout=5) + + assert result == ("ok", 200, {})