Source code for datenguidepy.query_execution

from typing import Dict, Any, cast, Optional, NamedTuple, List, Tuple, Union
from typing_extensions import Protocol
import requests
import re

from datenguidepy.schema_json_meta import get_schema_json, get_json_path

# Aliases for decoded JSON payloads: a single JSON object, a list of JSON
# objects, or either of the two.
Json_Dict = Dict[str, Any]
Json_List = List[Json_Dict]
Json = Union[Json_Dict, Json_List]

# Aliases for the meta data attached to query results.
# StatMeta: statistic name -> short description.
StatMeta = Dict[str, str]
# UnitMeta: statistic name -> unit information.
UnitMeta = Dict[str, str]
# EnumMeta: field name -> {enum value -> description}.
EnumMeta = Dict[str, Dict[Optional[str], str]]
# QueryResultsMeta: the combined meta data of one query; the query executioner
# fills it under the keys "statistics", "enums" and "units".
QueryResultsMeta = Dict[str, Union[StatMeta, EnumMeta, UnitMeta]]


class ExecutionResults(NamedTuple):
    """Results of a query together with the meta data describing them.

    ``query_results`` holds the JSON bodies returned for the query and
    ``meta_data`` the statistic, enum and unit descriptions that belong to the
    queried fields.
    """

    query_results: Json_List
    meta_data: QueryResultsMeta

    def contains_undefined_region_result(self) -> bool:
        """Tell whether any result carries no region data.

        :return: ``True`` if at least one entry of ``query_results`` has
            ``["data"]["region"]`` set to ``None``, otherwise ``False``
            (also for an empty result list).
        :rtype: bool
        """
        # any() stops at the first hit instead of materialising a filtered
        # list only to check whether it is non-empty.
        return any(
            query_result["data"]["region"] is None
            for query_result in self.query_results
        )
class TypeMetaData(NamedTuple):
    """Meta data of a GraphQL type: its kind, its fields and its enum values."""

    # Kind of the type as reported by the API, e.g. "OBJECT" or "ENUM".
    kind: str
    # Field name -> field meta data; None when the type has no field meta data.
    fields: Optional[Json_Dict]
    # Enum value name -> description; None when the type has no enum values.
    enum_values: Optional[Dict[str, str]]
class FieldMetaDict(dict):
    """Dictionary with the introspection meta data of a single GraphQL field.

    On top of plain ``dict`` behaviour it offers accessors for the return type
    and the arguments of the field. The accessors read the ``"type"`` and
    ``"args"`` entries of the dictionary.
    """

    def get_return_type(self) -> str:
        """Return the name of the type the field resolves to.

        For fields of kind ``"LIST"`` this is the name of the wrapped
        (``ofType``) type, otherwise the name of the field type itself.

        :return: The return type of the field.
        :rtype: str
        """
        field_type = self["type"]
        if field_type["kind"] == "LIST":
            return field_type["ofType"]["name"]
        return field_type["name"]

    def get_arguments(self) -> Dict[str, Tuple[Optional[str], ...]]:
        """Return kind and name information for every argument of the field.

        :return: Mapping from argument name to the 4-tuple
            ``(kind, name, of_type_kind, of_type_name)``. Every piece of
            information that the meta data does not contain is ``None``.
        :rtype: Dict[str, Tuple[Optional[str], ...]]
        """

        def describe(argument: Dict[str, Any]) -> Tuple[Optional[str], ...]:
            # Missing or null "type"/"ofType" entries are treated alike and
            # always yield None rather than an empty dict or a KeyError.
            arg_type = argument.get("type") or {}
            of_type = arg_type.get("ofType") or {}
            return (
                arg_type.get("kind"),
                arg_type.get("name"),
                of_type.get("kind"),
                of_type.get("name"),
            )

        return {cast(str, arg["name"]): describe(arg) for arg in self["args"]}
def check_http200_body_error(body_json: Json_Dict) -> None:
    """Raise if a response body contains an ``"errors"`` entry.

    :param body_json: Decoded JSON body of a response.
    :type body_json: Json_Dict
    :raises RuntimeError: If ``"errors"`` is a key of ``body_json``; the
        message contains the complete body.
    """
    if "errors" not in body_json:
        return
    message = "Body contains the following error content\n" + str(body_json)
    raise RuntimeError(message)
class GraphQlSchemaMetaDataProvider(object):
    """Provide meta data about the structure of the GraphQL API.

    The provider helps to obtain information about how structurally correct
    queries are built. It does not directly supply information about
    statistics.

    :param endpoint: GraphQL endpoint to query instead of the class default,
        defaults to None
    :type endpoint: Optional[str], optional
    """

    endpoint: str = "https://api-next.datengui.de/graphql"
    REQUEST_HEADER: Dict[str, str] = {"Content-Type": "application/json"}

    # Shared by all instances. Entries are keyed by (endpoint, type name) so
    # that providers pointing at different endpoints never serve each other's
    # schema information.
    _META_DATA_CACHE: Dict[Tuple[str, str], TypeMetaData] = dict()

    _meta_type_info: str = """
    query TypeInfo($type: String!) {
      __type(name: $type) {
        kind
        enumValues {
          name
          description
        }
        fields {
          name
          type {
            ofType {
              name
            }
            kind
            name
            description
          }
          description
          args {
            name
            type {
              kind
              name
              ofType {
                name
                description
                kind
              }
            }
          }
        }
      }
    }
    """

    def __init__(self, endpoint: Optional[str] = None) -> None:
        if endpoint is not None:
            self.endpoint = endpoint

    def get_type_info(
        self, graph_ql_type: str, verbose: bool = False
    ) -> Optional[TypeMetaData]:
        """Return the meta data of a GraphQL type, using the cache if possible.

        :param graph_ql_type: Name of the type to look up.
        :type graph_ql_type: str
        :param verbose: Print whether the cache or the API is used,
            defaults to False
        :type verbose: bool, optional
        :raises RuntimeError: If the request fails or the response body
            reports errors.
        :return: The kind of the type together with its fields (``"OBJECT"``
            types) or its enum values (``"ENUM"`` types); ``None`` if the API
            delivers no information for the type.
        :rtype: Optional[TypeMetaData]
        """
        cache = self.__class__._META_DATA_CACHE
        cache_key = (self.endpoint, graph_ql_type)
        if cache_key in cache:
            if verbose:
                print("use cache")
            return cache[cache_key]

        query_json: Json_Dict = {
            "query": self._meta_type_info,
            "variables": {"type": graph_ql_type},
        }
        if verbose:
            print("query REST API")
        info = self._send_request(query_json)
        if not info:
            return None

        # GraphQL introspection answers "__type": null for a type name the
        # schema does not know; report that as "no information" instead of
        # failing on the missing entry.
        raw_type = (info.get("data") or {}).get("__type")
        if raw_type is None:
            return None

        type_kind = raw_type["kind"]
        field_meta: Optional[Json_Dict] = None
        enum_vals: Optional[Dict[str, str]] = None
        if type_kind == "OBJECT":
            field_meta = {
                field["name"]: FieldMetaDict(field) for field in raw_type["fields"]
            }
        elif type_kind == "ENUM":
            enum_vals = {
                value["name"]: value["description"]
                for value in raw_type["enumValues"]
            }

        type_meta = TypeMetaData(type_kind, field_meta, enum_vals)
        cache[cache_key] = type_meta
        return type_meta

    def _send_request(self, query_json: Json_Dict) -> Optional[Json_Dict]:
        """Post ``query_json`` to the endpoint and return the decoded body.

        :raises RuntimeError: If the status code is not 200 or the body
            reports errors.
        """
        resp = requests.post(
            self.endpoint, headers=self.REQUEST_HEADER, json=query_json
        )
        if resp.status_code != 200:
            raise RuntimeError(
                self.endpoint
                + "\n"
                + f"No result, got HTTP status code {resp.status_code}"
            )
        body_json = resp.json()
        check_http200_body_error(body_json)
        return body_json
class StatisticsMetaDataProvider(Protocol):
    """Structural interface of objects that supply meta data on statistics."""

    def get_query_stat_meta(
        self, query_fields_with_types: List[Tuple[str, str]]
    ) -> StatMeta:
        """Return the descriptions of the statistics among the query fields."""
        ...

    def get_query_enum_meta(
        self, query_fields_with_types: List[Tuple[str, str]]
    ) -> EnumMeta:
        """Return the enum value descriptions for the query fields."""
        ...

    def get_query_unit_meta(
        self, query_fields_with_types: List[Tuple[str, str]]
    ) -> UnitMeta:
        """Return the unit information of the statistics among the query fields."""
        ...

    def get_stat_descriptions(self) -> Dict[str, Tuple[str, str]]:
        """Return a pair of descriptions for every known statistic."""
        ...

    def is_statistic(self, stat_candidate: str) -> bool:
        """Tell whether ``stat_candidate`` is the name of a statistic."""
        ...
class StatisticsGraphQlMetaDataProvider(object):
    """Statistics meta data provider backed by the GraphQL schema meta data.

    Statistics meta data providers supply details pertaining to the statistics
    that can be obtained via the API. This type of meta information is not API
    specific and can be obtained from different sources. This particular
    provider uses the GraphQL meta data information to provide results.

    :param endpoint: GraphQL endpoint handed to the schema meta data
        provider, defaults to None
    :type endpoint: Optional[str], optional
    """

    def __init__(self, endpoint: Optional[str] = None) -> None:
        self.schema_meta_data_provider = GraphQlSchemaMetaDataProvider(
            endpoint=endpoint
        )

    def get_query_stat_meta(
        self, query_fields_with_types: List[Tuple[str, str]]
    ) -> StatMeta:
        """Return the short descriptions of the statistics among the fields.

        :param query_fields_with_types: ``(field name, type name)`` pairs.
        :return: Statistic name -> short description, restricted to the
            statistics that occur as field names.
        """
        # Region type contains all the statistics fields
        query_fields = {
            field_with_type[0] for field_with_type in query_fields_with_types
        }
        stat_descriptions = self.get_stat_descriptions()
        return {
            stat: descriptions[0]
            for stat, descriptions in stat_descriptions.items()
            if stat in query_fields
        }

    def get_query_unit_meta(
        self, query_fields_with_types: List[Tuple[str, str]]
    ) -> UnitMeta:
        """Return a placeholder text for every statistic among the fields.

        This provider has no unit information, so every statistic maps to a
        fixed explanatory message.
        """
        # Fetched once per call; testing each field separately would rebuild
        # the complete description mapping for every field.
        stat_descriptions = self.get_stat_descriptions()
        return {
            stat: "StatisticsGraphQlMetaDataProvider does not provide unit information."
            for stat, _ in query_fields_with_types
            if stat in stat_descriptions
        }

    def get_query_enum_meta(
        self, query_fields_with_types: List[Tuple[str, str]]
    ) -> EnumMeta:
        """Return the enum values of all fields whose type is an enum.

        :param query_fields_with_types: ``(field name, type name)`` pairs.
        :return: Field name -> {enum value -> description}. A field whose type
            information could not be loaded gets an ``"error"`` entry instead.
        """
        enum_meta: EnumMeta = {}
        for field, field_type in query_fields_with_types:
            type_info = self.schema_meta_data_provider.get_type_info(field_type)
            if type_info is None:
                enum_meta[field] = {"error": "ENUM META DATA COULD NOT BE LOADED"}
            elif type_info.kind == "ENUM":
                # Only reached with actual type information; dereferencing
                # the kind of a missing type would fail.
                enum_meta[field] = cast(
                    Dict[Optional[str], str], type_info.enum_values
                )
        return enum_meta

    @staticmethod
    def _process_stat_meta_data(type_fields: Json_Dict) -> List[Json_Dict]:
        """Keep the fields that accept a ``statistics`` argument."""
        return [
            field
            for field in type_fields.values()
            if "statistics" in field.get_arguments()
        ]

    @staticmethod
    def _extract_main_description(description: Optional[str]) -> str:
        """Return the leading ``**bold**`` part of a description.

        :return: The text between the first pair of ``**`` markers at the
            start of ``description``, or ``"NO DESCRIPTION FOUND"`` if the
            description is missing or does not start with such a part.
        """
        if not description:
            return "NO DESCRIPTION FOUND"
        match = re.match(r"^\s*\*\*([^*]*)\*\*", description)
        if match:
            return match.group(1)
        return "NO DESCRIPTION FOUND"

    def get_stat_descriptions(self) -> Dict[str, Tuple[str, str]]:
        """Return the descriptions of all statistics of the ``Region`` type.

        :raises RuntimeError: If no field meta data for ``Region`` is
            available.
        :return: Statistic name -> ``(main description, full description)``.
        :rtype: Dict[str, Tuple[str, str]]
        """
        stat_meta = self.schema_meta_data_provider.get_type_info("Region")
        if stat_meta is None or stat_meta.fields is None:
            raise RuntimeError("Meta data provider was unable to fetch statistics")
        return self._create_stat_desc_dic(stat_meta.fields)

    def is_statistic(self, stat_candidate: str) -> bool:
        """Tell whether ``stat_candidate`` is the name of a statistic."""
        return stat_candidate in self.get_stat_descriptions()

    @staticmethod
    def _create_stat_desc_dic(raw_response: Json_Dict) -> Dict[str, Tuple[str, str]]:
        """Map every statistic field to its main and its full description."""
        provider = StatisticsGraphQlMetaDataProvider
        return {
            field["name"]: (
                provider._extract_main_description(field["description"]),
                field["description"],
            )
            for field in provider._process_stat_meta_data(raw_response)
        }
class StatisticsSchemaJsonMetaDataProvider(object):
    """Statistics meta data provider backed by a hard copy of a schema file.

    Statistics meta data providers supply details pertaining to the statistics
    that can be obtained via the API. This type of meta information is not API
    specific and can be obtained from different sources. This particular
    provider uses the hard copy of a schema file from the SOAP cubes that
    datenguide extracts from GENESIS and transfers into their API.
    """

    def __init__(self):
        # The schema is wrapped in a list before being handed to get_json_path.
        self._full_data_json = [get_schema_json()]

    @property
    def stat_names(self):
        """Names of all statistics: the keys of every ``measures`` entry."""
        names = []
        for measures in get_json_path(self._full_data_json, ["..", "measures"]):
            names.extend(measures.keys())
        return names

    def get_query_stat_meta(
        self, query_fields_with_types: List[Tuple[str, str]]
    ) -> StatMeta:
        """Return the short descriptions of the statistics among the fields."""
        requested = [name for name, _ in query_fields_with_types]
        descriptions = self.get_stat_descriptions()
        stat_meta = {}
        for stat in descriptions:
            if stat in requested:
                stat_meta[stat] = descriptions[stat][0]
        return stat_meta

    def get_query_unit_meta(
        self, query_fields_with_types: List[Tuple[str, str]]
    ) -> UnitMeta:
        """Return the units of the statistics among the fields."""
        requested = [name for name, _ in query_fields_with_types]
        all_units = self.get_stat_units()
        unit_meta = {}
        for stat in all_units:
            if stat in requested:
                unit_meta[stat] = all_units[stat]
        return unit_meta

    def get_query_enum_meta(
        self, query_fields_with_types: List[Tuple[str, str]]
    ) -> EnumMeta:
        """Return the value names of every field found among the dimensions.

        Each returned mapping additionally contains ``"GESAMT": "Gesamt"``.
        """
        enum_meta: EnumMeta = {}
        for field, _ in query_fields_with_types:
            path = ["..", "measures", "..", "dimensions", field, "value_names"]
            matches = get_json_path(self._full_data_json, path)
            if len(matches) == 0:
                continue
            # Only the first match is used.
            enum_meta[field] = dict(matches[0], GESAMT="Gesamt")
        return enum_meta

    def is_statistic(self, stat_candidate: str) -> bool:
        """Tell whether ``stat_candidate`` is one of ``stat_names``."""
        known_names = self.stat_names
        return stat_candidate in known_names

    def get_stat_units(self) -> Dict[str, str]:
        """Return statistic name -> ``measure_name_de`` of its first unit."""
        names = get_json_path(self._full_data_json, ["..", "measures", "..", "name"])
        unit_entries = get_json_path(
            self._full_data_json, ["..", "measures", "..", "units"]
        )
        # Kept lazy so that only the entries actually paired with a name are
        # inspected.
        unit_names = (entry[0]["measure_name_de"] for entry in unit_entries)
        return dict(zip(names, unit_names))

    def get_stat_descriptions(self) -> Dict[str, Tuple[str, str]]:
        """Return statistic name -> ``(title_de, definition_de)``."""
        titles = get_json_path(
            self._full_data_json, ["..", "measures", "..", "title_de"]
        )
        definitions = get_json_path(
            self._full_data_json, ["..", "measures", "..", "definition_de"]
        )
        return dict(zip(self.stat_names, zip(titles, definitions)))

    def get_enum_values(self) -> Dict[str, Dict[str, str]]:
        """Return dimension name -> value names, each extended by ``GESAMT``."""
        dimension_names = get_json_path(
            self._full_data_json, ["..", "measures", "..", "dimensions", "..", "name"]
        )
        dimension_values = get_json_path(
            self._full_data_json,
            ["..", "measures", "..", "dimensions", "..", "value_names"],
        )
        enum_values = {}
        for name, value_names in zip(dimension_names, dimension_values):
            enum_values[name] = dict(value_names, GESAMT="Gesamt")
        return enum_values
# Module-wide default provider, created once when the module is imported.
# QueryExecutioner falls back to it when no statistics meta data provider is
# passed to its constructor.
DEFAULT_STATISTICS_META_DATA_PROVIDER = StatisticsSchemaJsonMetaDataProvider()
class QueryExecutioner(object):
    """Queries the Datenguide API for data and meta data.

    :param alternative_endpoint: GraphQL endpoint to use instead of the class
        default, defaults to None
    :type alternative_endpoint: Optional[str], optional
    :param statistics_meta_data_provider: Provider of the statistic, enum and
        unit meta data that is attached to query results; the module default
        provider is used if None, defaults to None
    """

    REQUEST_HEADER: Dict[str, str] = {"Content-Type": "application/json"}
    endpoint: str = "https://api-next.datengui.de/graphql"

    def __init__(
        self,
        alternative_endpoint: Optional[str] = None,
        statistics_meta_data_provider=None,
    ) -> None:
        if alternative_endpoint:
            self.endpoint = alternative_endpoint
        self.graph_ql_schema_meta_data_provider = GraphQlSchemaMetaDataProvider(
            self.endpoint
        )
        if statistics_meta_data_provider is None:
            self.stat_meta_data_provider = DEFAULT_STATISTICS_META_DATA_PROVIDER
        else:
            self.stat_meta_data_provider = statistics_meta_data_provider

    def get_type_info(
        self, graph_ql_type: str, verbose: bool = False
    ) -> Optional[TypeMetaData]:
        """Return the meta data of a GraphQL type.

        Delegates to the GraphQL schema meta data provider of this instance.

        :param graph_ql_type: Name of the type to look up.
        :type graph_ql_type: str
        :param verbose: Passed on to the schema meta data provider,
            defaults to False
        :type verbose: bool, optional
        :return: The meta data of the type or ``None``.
        :rtype: Optional[TypeMetaData]
        """
        return self.graph_ql_schema_meta_data_provider.get_type_info(
            graph_ql_type, verbose
        )

    @staticmethod
    def _pagination_json(page: int) -> Json_Dict:
        """Return the query variables requesting ``page`` with 1000 items."""
        return {"page": page, "itemsPerPage": 1000}

    def run_query(self, query) -> Optional[List[ExecutionResults]]:
        """Run every GraphQL query string of ``query`` against the API.

        :param query: Object providing ``get_graphql_query()`` and
            ``_get_fields_with_types()``.
        :raises RuntimeError: If a request fails or a response body reports
            errors.
        :return: One result object per query string, or ``None`` if any of
            the query strings produced no result.
        :rtype: Optional[List[ExecutionResults]]
        """
        all_results = [
            self._run_single_query_json(query_json, query._get_fields_with_types())
            for query_json in self._generate_post_json(query)
        ]
        if any(result is None for result in all_results):
            return None
        return cast(List[ExecutionResults], all_results)

    def _run_single_query_json(
        self, query_json: Json_Dict, query_fields_with_types: List[Tuple[str, str]]
    ) -> Optional[ExecutionResults]:
        """Send one query and attach the meta data of the queried fields.

        Queries containing an ``allRegions`` field are fetched page by page.

        :return: The results with their meta data, or ``None`` if a request
            delivered no body.
        """
        is_paginated = any(
            field_with_types[0] == "allRegions"
            for field_with_types in query_fields_with_types
        )
        if is_paginated:
            results = self._fetch_all_region_pages(query_json)
        else:
            single_result = self._send_request(query_json)
            results = None if single_result is None else [single_result]
        if not results:
            return None

        meta: QueryResultsMeta = {
            "statistics": self.stat_meta_data_provider.get_query_stat_meta(
                query_fields_with_types
            ),
            "enums": self.stat_meta_data_provider.get_query_enum_meta(
                query_fields_with_types
            ),
            "units": self.stat_meta_data_provider.get_query_unit_meta(
                query_fields_with_types
            ),
        }
        return ExecutionResults(
            query_results=cast(Json_List, results), meta_data=meta
        )

    def _fetch_all_region_pages(self, query_json: Json_Dict) -> Optional[Json_List]:
        """Request consecutive ``allRegions`` pages until all items are covered.

        ``query_json["variables"]`` is overwritten with the pagination
        variables of each requested page.

        :return: The bodies of all pages, or ``None`` if a request delivered
            no body.
        """
        pages: Json_List = []
        page = 0
        while True:
            query_json["variables"] = self._pagination_json(page)
            result_page = self._send_request(query_json)
            if result_page is None:
                return None
            pages.append(result_page)
            paging = result_page["data"]["allRegions"]
            # Pages are counted from 0, hence the +1 when computing how many
            # items have been covered so far.
            if (paging["page"] + 1) * paging["itemsPerPage"] >= paging["total"]:
                return pages
            page += 1

    @staticmethod
    def _generate_post_json(query) -> List[Json_Dict]:
        """Wrap every GraphQL query string of ``query`` into a request body."""
        return [
            {"query": query_string} for query_string in query.get_graphql_query()
        ]

    def _send_request(self, query_json: Json_Dict) -> Optional[Json_Dict]:
        """Post ``query_json`` to the endpoint and return the decoded body.

        :raises RuntimeError: If the status code is not 200 or the body
            reports errors.
        """
        resp = requests.post(
            self.endpoint, headers=self.REQUEST_HEADER, json=query_json
        )
        if resp.status_code != 200:
            raise RuntimeError(
                self.endpoint
                + "\n"
                + f"No result, got HTTP status code {resp.status_code}"
            )
        body_json = resp.json()
        check_http200_body_error(body_json)
        return body_json