Source code for datenguidepy.output_transformer

import numpy as np
import pandas as pd
from typing import Dict, List, Any, Set, Container, cast

from datenguidepy.query_execution import (
    ExecutionResults,
    StatMeta,
    EnumMeta,
    UnitMeta,
    QueryResultsMeta,
)
import copy


class QueryOutputTransformer:
    """Transforms the query results into a DataFrame.

    Project types are referenced as forward-reference strings in
    annotations and ``cast`` calls; they are only needed by type checkers.

    :param query_response: Accepts the return type of the query executioner
        in case a non-None value was returned. This is a list of
        ExecutionResults as some python queries may internally be converted
        into several GraphQL queries to be executed, returning one result
        each.
    :type query_response: List[ExecutionResults]
    """

    def __init__(self, query_response: List["ExecutionResults"]) -> None:
        self.query_response = query_response

    @staticmethod
    def _convert_results_to_frame(
        executioner_result: List["ExecutionResults"], remove_duplicates: bool = False
    ) -> pd.DataFrame:
        """Converts raw query results to a DataFrame.

        Every page of every execution result is converted separately and
        the resulting frames are concatenated.

        :param executioner_result: Raw query results including meta data.
        :param remove_duplicates: Passed on to the per-page conversion.
        :return: DataFrame with query results.
        """
        result_frames = [
            QueryOutputTransformer._convert_regions_to_frame(
                page, single_query_response.meta_data, remove_duplicates
            )
            for single_query_response in executioner_result
            for page in single_query_response.query_results
        ]
        return pd.concat(result_frames)

    @staticmethod
    def _convert_regions_to_frame(
        query_page: Dict[str, Any],
        meta_data: "QueryResultsMeta",
        remove_duplicates: bool = True,
    ) -> pd.DataFrame:
        """Converts and combines raw results for one or more regions.

        The GraphQL API has two distinct endpoints, one called "region"
        returning results for a single region and one called "allRegions"
        which returns results for multiple regions. This function
        identifies the endpoint that was used and then converts the
        results for the one or more regions that it finds. If multiple
        regions are found, their results are concatenated.

        :param query_page: Single page of API query results as a python
            dict representation of a json.
        :param meta_data: Query relevant meta data.
        :param remove_duplicates: Passed on to the per-region conversion.
            Note that the default here is True, unlike the other
            conversion helpers of this class.
        :raises RuntimeError: If the page contains neither a "region" nor
            an "allRegions" entry.
        :return: Converted results, possibly combined across multiple
            regions.
        """
        data = query_page["data"]
        if "region" in data:
            return QueryOutputTransformer._convert_single_results_to_frame(
                data["region"], meta_data, remove_duplicates
            )
        if "allRegions" in data:
            region_frames = [
                QueryOutputTransformer._convert_single_results_to_frame(
                    region, meta_data, remove_duplicates
                )
                for region in data["allRegions"]["regions"]
            ]
            return pd.concat(region_frames)
        # The message names the keys that are actually checked above.
        raise RuntimeError(
            'Only queries containing "region" or "allRegions" can be transformed'
        )

    @staticmethod
    def _convert_single_results_to_frame(
        region_json: Dict[str, Any],
        meta: "QueryResultsMeta",
        remove_duplicates: bool = False,
    ) -> pd.DataFrame:
        """Converts the raw output for a single region to a dataframe.

        This is the main internal method for converting raw API output to
        dataframes as results are composed of regions. It converts a
        single region with the idea that results across regions can be
        concatenated. It contains the logic for joining data for several
        statistics in case more than one was queried. Furthermore the
        columns are sorted to put the most important information to the
        left.

        :param region_json: Dictionary for a single region. It is indexed
            with every statistic name found in the meta data; all fields
            whose name contains no statistic name are treated as general
            fields and copied into every row.
        :param meta: Query meta data; only the "statistics" entry is read.
        :param remove_duplicates: If True, duplicated rows are dropped
            from each statistic's frame before the statistics are joined.
        :raises RuntimeError: Raised when the statistics meta data contains
            an "error" entry, i.e. query results were obtained but meta
            data wasn't, possibly due to connection problems.
        :return: DataFrame with query results for a single region.
        """
        stat_meta = cast("StatMeta", meta["statistics"])
        if "error" in stat_meta:
            raise RuntimeError(
                "No statistics meta data present. Try rerunning the query"
            )

        statistic_names = list(stat_meta)
        statistic_frames = [
            QueryOutputTransformer._create_statistic_frame(region_json[stat])
            for stat in statistic_names
        ]
        if remove_duplicates:
            statistic_frames = [frame.drop_duplicates() for frame in statistic_frames]

        joined_results, join_cols = QueryOutputTransformer._join_statistic_results(
            statistic_frames, statistic_names
        )
        column_order = QueryOutputTransformer._determine_column_order(
            joined_results, join_cols
        )

        general_fields = QueryOutputTransformer._get_general_fields(
            region_json, stat_meta
        )
        # Scalar region level values are broadcast to every row.
        for field in general_fields:
            joined_results[field] = region_json[field]

        return QueryOutputTransformer._rename_statistic_fields(
            joined_results[general_fields + column_order], stat_meta
        )

    @staticmethod
    def _get_general_fields(
        region_json: Dict[str, Any], stat_meta: Dict[str, str]
    ) -> List[str]:
        """Extracts non statistic specific fields.

        For the purpose of arranging dataframe columns this function
        extracts all dictionary fields that do not contain a statistic in
        their name.

        :param region_json: Dictionary for a specific region.
        :param stat_meta: Dictionary containing query meta data, keyed by
            statistic name.
        :return: List of fields without statistics.
        """
        return [
            field
            for field in region_json
            if all(stat not in field for stat in stat_meta)
        ]

    @staticmethod
    def _rename_statistic_fields(
        statistic_result: pd.DataFrame, stat_meta: Dict[str, str]
    ) -> pd.DataFrame:
        """Renames fields containing the statistic values.

        By default all statistic related fields are prefixed with the
        statistic name. As such the reported statistic itself has a
        column name STATISTIC_value. As the value is the most central
        column it is renamed into the simple name STATISTIC.

        :param statistic_result: Results of a query.
        :param stat_meta: Meta data related to the query.
        :return: Results with renamed statistic column.
        """
        rename_mapping = {f"{stat}_value": stat for stat in stat_meta}
        return statistic_result.rename(columns=rename_mapping)

    @staticmethod
    def _create_statistic_frame(statistic_sub_json: Dict[str, Any]) -> pd.DataFrame:
        """Converts a json to a dataframe.

        This function converts the dictionary representation of a json to
        a pandas dataframe. Currently this uses pandas directly, but it
        might be sensible to implement custom functionality as this
        function is the main reason for the pandas 1.0 requirement.

        :param statistic_sub_json: Python dictionary json representation.
        :return: Dataframe conversion of the dictionary. Nested entries
            are flattened by one level with "_" as separator.
        """
        return pd.json_normalize(statistic_sub_json, sep="_", max_level=1)

    @staticmethod
    def _determine_join_columns(statistic_results: List[pd.DataFrame]) -> Set[str]:
        """Determines join columns.

        When several statistics are queried this function determines the
        columns over which to join multiple statistics data frames. This
        will typically lead to joining over the year column and enums
        that the statistics have in common.

        Currently has hardcoded exclusion criteria to never join across
        columns containing "value" and "source". This is not expected to
        be a severe limitation as such joins are considered corner cases
        and can be achieved by post-join filters should the need arise.

        :param statistic_results: Dataframes for individual statistics.
        :return: Columns over which to join, i.e. the eligible columns
            that are present in every frame.
        """
        candidates = {
            column
            for frame in statistic_results
            for column in frame
            if "value" not in column and "source" not in column
        }
        return {
            candidate
            for candidate in candidates
            if all(candidate in frame for frame in statistic_results)
        }

    @staticmethod
    def _prefix_frame_cols(
        frame: pd.DataFrame, prefix: str, exceptions: Container[str]
    ) -> pd.DataFrame:
        """Prefixes dataframe column names.

        This function prefixes dataframe column names with a given prefix
        followed by "_", but allows for exceptions to be specified, i.e.
        columns that will not be prefixed. The input frame is not
        modified.

        :param frame: Dataframe to be prefixed.
        :param prefix: Prefix to be used.
        :param exceptions: Columns that will not be prefixed.
        :return: Copy of the dataframe with prefixed columns.
        """
        result_frame = frame.copy()
        result_frame.columns = [
            col if col in exceptions else prefix + "_" + col
            for col in result_frame.columns
        ]
        return result_frame

    @staticmethod
    def _join_statistic_results(
        statistic_results: List[pd.DataFrame], statistic_names: List[str]
    ) -> tuple:
        """Joins dataframes containing different statistics.

        When joining the frames, columns other than the join columns are
        first prefixed with the statistic names. Frames are combined with
        successive outer merges.

        :param statistic_results: Dataframes with the statistics to be
            joined.
        :param statistic_names: Names of the statistics, expected to be
            in the same order as the list of statistic results.
        :return: Joined frame and the list of columns that were joined
            over, in the column order of the first frame.
        """
        assert len(statistic_results) == len(statistic_names)

        shared_columns = QueryOutputTransformer._determine_join_columns(
            statistic_results
        )
        # A set has no stable iteration order across interpreter runs, so
        # the join columns follow the first frame's column order. This
        # keeps the resulting column layout reproducible.
        join_columns = [
            col for col in statistic_results[0].columns if col in shared_columns
        ]

        result = QueryOutputTransformer._prefix_frame_cols(
            statistic_results[0], statistic_names[0], join_columns
        )
        for statistic, name in zip(statistic_results[1:], statistic_names[1:]):
            result = result.merge(
                QueryOutputTransformer._prefix_frame_cols(
                    statistic, name, join_columns
                ),
                on=join_columns,
                how="outer",
            )
        return result, join_columns

    @staticmethod
    def _determine_column_order(
        joined_frame: pd.DataFrame, join_columns: Set[str]
    ) -> List[str]:
        """Determines column order for joined dataframe.

        This function determines a rearrangement of the DataFrame's
        column list: join columns first, then all columns containing
        "value", then the remaining columns and finally all columns
        containing "source".

        :param joined_frame: DataFrame with columns for all the
            statistics from the executed query.
        :type joined_frame: pd.DataFrame
        :param join_columns: The columns that were used for joining
            different statistics. Only iteration and membership tests are
            used, so a list works as well as a set.
        :type join_columns: Set[str]
        :return: List of ordered columns.
        :rtype: List[str]
        """
        join_col_list = list(join_columns)
        value_columns = [col for col in joined_frame if "value" in col]
        source_cols = [col for col in joined_frame if "source" in col]
        remaining_cols = [
            col
            for col in joined_frame
            if col not in join_columns
            and col not in value_columns
            and col not in source_cols
        ]
        return join_col_list + value_columns + remaining_cols + source_cols

    @staticmethod
    def _make_verbose_statistic_names(
        output: pd.DataFrame, meta: "QueryResultsMeta"
    ) -> pd.DataFrame:
        """Exchanges statistic column names for short descriptions.

        By default statistic columns display the statistic code. This
        function converts the code to the short description, while
        keeping the code in parentheses afterward. The aim is to make the
        dataframe more readable.

        :param output: Query results after conversion to a dataframe.
        :param meta: Query meta data; only the "statistics" entry is read.
        :return: Dataframe with converted column names.
        """
        descriptions = cast("StatMeta", meta["statistics"])
        name_changes = {
            statistic: f"{description} ({statistic})"
            for statistic, description in descriptions.items()
        }
        return output.rename(columns=name_changes)

    @staticmethod
    def _make_verbose_enum_values(
        output: pd.DataFrame, meta: "QueryResultsMeta"
    ) -> pd.DataFrame:
        """Exchanges enum codes for short descriptions.

        By default enum codes are displayed in enum columns. This
        function converts the codes to short descriptions. The aim is to
        make the dataframe more readable. A None code is displayed as
        "Gesamt". The input frame and the meta data are not modified.

        An enum is looked up as a column of exactly that name first. If
        there is none, every column whose name ends with the enum name
        (i.e. statistic-prefixed enum columns) is converted. Enums
        without any matching column are skipped.

        :param output: Query results after conversion to a dataframe.
        :param meta: Query meta data; only the "enums" entry is read.
        :return: Dataframe with converted enum values.
        """
        # Deep copy so that adding the None entry does not leak into meta.
        enum_mappings = copy.deepcopy(cast("EnumMeta", meta["enums"]))
        for description_map in enum_mappings.values():
            description_map[None] = "Gesamt"

        mapped_frame = output.copy()
        for enum, description_map in enum_mappings.items():
            if enum in mapped_frame:
                target_columns = [enum]
            else:
                # Enums not shared by all statistics carry a statistic
                # prefix; there may be several such columns, or none.
                target_columns = [c for c in mapped_frame if c.endswith(enum)]
            for column in target_columns:
                mapped_frame[column] = mapped_frame[column].map(description_map)
        return mapped_frame

    @staticmethod
    def _add_units(output: pd.DataFrame, meta: "QueryResultsMeta") -> pd.DataFrame:
        """Adds units from meta data to the DataFrame.

        For every statistic in the units meta data a column
        ``STATISTIC_unit`` is inserted directly after the first column
        whose name contains the statistic code. If no column contains the
        code, the unit column is appended at the end. The input frame is
        not modified.

        :param output: DataFrame with results.
        :type output: pandas.DataFrame
        :param meta: Dictionary containing meta data for the query; only
            the "units" entry is read.
        :type meta: QueryResultsMeta
        :raises NotImplementedError: If a unit is not a single string.
        :return: Copy of the DataFrame with unit columns added.
        :rtype: pandas.DataFrame
        """
        result = output.copy()
        # ToDo: Uncertain if only one unit is possible per statistic.
        for statistic, unit in cast("UnitMeta", meta["units"]).items():
            if not isinstance(unit, str):
                raise NotImplementedError("Unit is not a single string.")
            # Literal substring match: the code is not a regular expression.
            mask = np.asarray(
                result.columns.str.contains(statistic, regex=False, na=False),
                dtype=bool,
            )
            # argmax of an all-False mask is 0, which would silently place
            # the unit next to an unrelated column; append instead.
            if mask.any():
                position = int(np.argmax(mask)) + 1
            else:
                position = len(result.columns)
            result.insert(loc=position, column=f"{statistic}_unit", value=unit)
        return result

    def transform(
        self,
        verbose_statistic_names: bool = False,
        verbose_enum_values: bool = False,
        add_units: bool = False,
        remove_duplicates: bool = False,
    ) -> pd.DataFrame:
        """Transforms the query results into a pandas DataFrame.

        This function allows for different flags that make the results
        more readable by using meta information about the query. By
        default the dataframe is not enriched by meta information,
        assuming an experienced user familiar with a particular
        statistic. For data exploration it is recommended to turn on one
        or more flags. The meta data of the first execution result is
        used for all flags.

        :param verbose_statistic_names: Toggles statistic codes to short
            descriptions.
        :param verbose_enum_values: Toggles enum codes to descriptions if
            enum columns are present.
        :param add_units: Toggles the addition of a unit column for each
            statistic to make it easier to interpret the numbers.
        :param remove_duplicates: Removes duplicates from query results,
            i.e. if the exact same number has been reported for the same
            statistic, year, region etc. from the same source it gets
            removed. Such duplications are sometimes caused on the API
            side and this is convenience functionality to remove them.
            The removal happens before potentially joining several
            different statistics.
        :return: Returns a pandas DataFrame of the query results.
        """
        output = self._convert_results_to_frame(self.query_response, remove_duplicates)
        if verbose_statistic_names:
            output = self._make_verbose_statistic_names(
                output, self.query_response[0].meta_data
            )
        if verbose_enum_values:
            output = self._make_verbose_enum_values(
                output, self.query_response[0].meta_data
            )
        if add_units:
            output = self._add_units(output, self.query_response[0].meta_data)
        return output