# Source code for mip_dmp.process.matching

# Copyright 2023 The HIP team, University Hospital of Lausanne (CHUV), Switzerland & Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module that provides functions to support the matching of dataset columns to CDEs."""

# External imports
import ast
import numpy as np
import pandas as pd
from fuzzywuzzy import fuzz

# Internal imports
from mip_dmp.process.mapping import MAPPING_TABLE_COLUMNS
from mip_dmp.process.embedding import (
    generate_embeddings,
    find_n_closest_embeddings,
)
from mip_dmp.process.utils import is_number


def match_columns_to_cdes(
    dataset,
    schema,
    nb_kept_matches=10,
    matching_method="fuzzy",
):
    """Initialize the mapping table by matching the dataset columns with the CDE codes.

    Different matching methods can be used:

    - "fuzzy": Fuzzy matching using the Levenshtein distance.
      (https://github.com/seatgeek/thefuzz)
    - "glove": Embedding matching using Glove embeddings at the character level.
      (https://nlp.stanford.edu/projects/glove/)
    - "chars2vec": Embedding matching using Chars2Vec embeddings.
      (https://github.com/IntuitionEngineeringTeam/chars2vec)

    Parameters
    ----------
    dataset : pandas.DataFrame
        Dataset to be mapped.

    schema : pandas.DataFrame
        Schema to which the dataset is mapped.

    nb_kept_matches : int
        Number of matches to keep for each dataset column.

    matching_method : str
        Method to be used for matching the dataset columns with the CDE codes.
        Can be "fuzzy", "glove" or "chars2vec".

    Returns
    -------
    mapping_table : pandas.DataFrame
        Mapping table represented as a Pandas DataFrame.

    matched_cde_codes : dict
        Dictionary of dictionaries storing the ``nb_kept_matches`` matched
        CDE codes with corresponding fuzzy distance / cosine similarity and
        embedding vector for each dataset column (key). It has the form::

            {
                "dataset_column_1": {
                    "words": ["cde_code_1", "cde_code_2", ...],
                    "distances": [0.9, 0.8, ...],
                    "embeddings": [None, None, ...]
                },
                ...
            }

    dataset_column_embeddings : list
        List of embedding vectors for the dataset columns
        (``None`` for fuzzy matching).

    schema_code_embeddings : list
        List of embedding vectors for the CDE codes
        (``None`` for fuzzy matching).

    Raises
    ------
    ValueError
        If ``matching_method`` is not "fuzzy", "glove" or "chars2vec".
    """
    # Create the mapping table and seed it with the dataset columns.
    mapping_table = pd.DataFrame(MAPPING_TABLE_COLUMNS)
    mapping_table["dataset_column"] = dataset.columns
    # Dictionary storing, per dataset column, the nb_kept_matches best
    # matched CDE codes with their distances and embeddings.
    matched_cde_codes = {}
    if matching_method == "fuzzy":
        print(f"> Perform fuzzy matching with {nb_kept_matches} matches per column.")
        # Embeddings are not used for fuzzy matching.
        dataset_column_embeddings, schema_code_embeddings = None, None
        # For each dataset column, rank all CDE codes by decreasing fuzzy
        # ratio and keep the nb_kept_matches best ones.
        # (Fix: build the match lists directly instead of the previous
        # str()/ast.literal_eval() serialization round-trip.)
        for dataset_column in dataset.columns:
            words = sorted(
                schema["code"],
                key=lambda cde_code: fuzz.ratio(dataset_column, cde_code),
                reverse=True,
            )[:nb_kept_matches]
            matched_cde_codes[dataset_column] = {
                "words": words,
                # Convert the fuzzy ratio (0-100, higher = more similar)
                # to a distance in [0, 1] (lower = more similar).
                "distances": [
                    (1 - 0.01 * fuzz.ratio(dataset_column, match)) for match in words
                ],
                "embeddings": [None] * nb_kept_matches,
            }
    elif matching_method in ("chars2vec", "glove"):
        print(
            f"> Perform {matching_method} embedding matching with {nb_kept_matches} matches per column."
        )
        # Embed both the dataset column names and the CDE codes with the
        # selected embedding model.
        dataset_column_embeddings = generate_embeddings(
            mapping_table["dataset_column"], matching_method
        )
        schema_code_embeddings = generate_embeddings(schema["code"], matching_method)
        print(f"> Find {nb_kept_matches} closest embeddings for each dataset column...")
        n_closest_matches = [
            find_n_closest_embeddings(
                dataset_column_embedding,
                schema_code_embeddings,
                schema["code"],
                nb_kept_matches,
            )
            for dataset_column_embedding in dataset_column_embeddings
        ]
        matched_cde_codes = {
            dataset_column: {
                "words": n_closest_matches[i]["embedding_words"],
                "distances": n_closest_matches[i]["distances"],
                "embeddings": n_closest_matches[i]["embeddings"],
            }
            for i, dataset_column in enumerate(mapping_table["dataset_column"])
        }
    else:
        # Fix: fail fast with a clear error instead of the NameError the
        # original raised at the return statement for unknown methods.
        raise ValueError(
            f"Unknown matching method: {matching_method}. "
            "Expected 'fuzzy', 'glove' or 'chars2vec'."
        )
    # Keep the best match as the initial CDE code proposal for each column.
    mapping_table["cde_code"] = [
        matched_cde_codes[k]["words"][0] for k in matched_cde_codes
    ]
    # Add the CDE type corresponding to the proposed CDE code.
    mapping_table["cde_type"] = [
        schema[schema["code"] == cde_code]["type"].iloc[0]
        for cde_code in mapping_table["cde_code"]
    ]
    # Numeric CDEs (integer, real) are scaled; categorical ones are mapped.
    mapping_table["transform_type"] = [
        "scale" if cde_type in ["integer", "real"] else "map"
        for cde_type in mapping_table["cde_type"]
    ]
    # Add the initial transform for each (dataset column, CDE code) pair.
    mapping_table["transform"] = [
        make_initial_transform(dataset, schema, dataset_column, cde_code)
        for (dataset_column, cde_code) in zip(
            mapping_table["dataset_column"], mapping_table["cde_code"]
        )
    ]
    return (
        mapping_table,
        matched_cde_codes,
        dataset_column_embeddings,
        schema_code_embeddings,
    )
def make_initial_transform(dataset, schema, dataset_column, cde_code):
    """Make the initial transform.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Dataset to be mapped.

    schema : pandas.DataFrame
        Schema to which the dataset is mapped.

    dataset_column : str
        Dataset column.

    cde_code : str
        CDE code.

    Returns
    -------
    str
        Initial transform: a scaling factor ("1.0") for numeric CDEs, or the
        string representation of a value-mapping dictionary for categorical
        CDEs.

    Raises
    ------
    ValueError
        If the CDE type is not integer, real, binominal, multinominal or
        nominal.
    """
    # Get the CDE type of the matched code.
    cde_type = schema[schema["code"] == cde_code]["type"].iloc[0]
    if cde_type in ["integer", "real"]:
        # Numeric CDEs start with the identity scaling factor.
        return "1.0"
    elif cde_type in ["binominal", "multinominal", "nominal"]:
        # Extract the string CDE code encoded / text values from the
        # corresponding cell of the "values" column of the schema, and format
        # it as a dictionary of the form:
        # {encoded_value_1: text_value_1, encoded_value_2: text_value_2, ...}
        cde_code_values_str = (
            f'[{schema[schema["code"] == cde_code]["values"].iloc[0]}]'
        )
        # Replace problematic typographic quote characters.
        cde_code_values_str = cde_code_values_str.replace("“", '"')
        cde_code_values_str = cde_code_values_str.replace("”", '"')
        # Remove surrounding brackets.
        cde_code_values_str = cde_code_values_str.replace("[", "")
        cde_code_values_str = cde_code_values_str.replace("]", "")
        # Parse the string as a Python literal.
        # (Fix: ast.literal_eval instead of eval() so schema content can
        # never execute arbitrary code.)
        cde_code_values_dict = ast.literal_eval(cde_code_values_str)
        # Get the unique values of the dataset column as strings.
        dataset_column_values = [
            str(dataset_column_value)
            for dataset_column_value in dataset[dataset_column].unique()
        ]
        # Pick the side of the schema dictionary to relabel against.
        if any(is_number(s) for s in dataset_column_values):
            # The dataset column contains numbers: relabel the encoded
            # integer values with the schema's encoded values (the keys).
            cde_code_values = [str(key) for key in cde_code_values_dict]
        else:
            # The dataset column contains text: relabel the text values with
            # the schema's text values (the dictionary values).
            cde_code_values = [
                str(cde_code_values_dict[key]) for key in cde_code_values_dict
            ]
        # Define and return the initial transform.
        return generate_initial_transform(
            dataset_column_values,
            cde_code_values,
            dataset_column,
        )
    else:
        raise ValueError(f"Unknown CDE type: {cde_type}")
def generate_initial_transform(dataset_column_values, cde_code_values, dataset_column):
    """Generate the initial transform.

    Parameters
    ----------
    dataset_column_values : list of str
        Dataset column values.

    cde_code_values : list of str
        CDE code values.

    dataset_column : str
        Dataset column.

    Returns
    -------
    initial_transform : str
        Initial transform: "nan", or the string representation of a
        {dataset value: CDE value} dictionary.
    """
    nb_dataset_values = len(dataset_column_values)
    nb_cde_values = len(cde_code_values)
    # Guard: a column whose sole unique value is NaN cannot be mapped
    # (unless "nan" happens to be a valid CDE value).
    if (
        nb_dataset_values == 1
        and dataset_column_values[0] == "nan"
        and "nan" not in cde_code_values
    ):
        print(f"WARNING: The dataset column {dataset_column} has only one NaN value.")
        return "nan"
    # Guard: a column made entirely of NaN values cannot be mapped either.
    if "nan" in dataset_column_values:
        if dataset_column_values.count("nan") == nb_dataset_values:
            print(f"WARNING: The dataset column {dataset_column} has only NaN values.")
            return "nan"
    # Same number of dataset values and CDE values: pair each dataset value
    # with its best fuzzy-matching CDE value.
    if nb_dataset_values == nb_cde_values:
        best_cde_values = [
            max(
                cde_code_values,
                key=lambda candidate, target=dataset_value: fuzz.ratio(
                    target, candidate
                ),
            )
            for dataset_value in dataset_column_values
        ]
        pairs = {
            f"{dataset_value}": f"{cde_value}"
            for dataset_value, cde_value in zip(dataset_column_values, best_cde_values)
        }
        return str(pairs)
    # Fewer dataset values than CDE values: map positionally to the first CDE
    # values, skipping pairs where exactly one side is "nan". Not ideal, but
    # the user can fix this later.
    if nb_dataset_values < nb_cde_values:
        pairs = {}
        for index, dataset_value in enumerate(dataset_column_values):
            # Keep the pair only when both sides are "nan" or neither is.
            if (dataset_value == "nan") == (cde_code_values[index] == "nan"):
                pairs[f"{dataset_value}"] = f"{cde_code_values[index]}"
        return str(pairs)
    # More dataset values than CDE values: map everything to NaN; this MUST
    # BE FIXED by the user.
    return str(
        {f"{dataset_value}": "nan" for dataset_value in dataset_column_values}
    )
def make_distance_vector(matchedCdeCodes, inputDatasetColumn):
    """Make the n closest match distance vector for a given input dataset column.

    Parameters
    ----------
    matchedCdeCodes : dict
        Dictionary of the matching results in the form::

            {
                "inputDatasetColumn1": {
                    "words": ["word1", "word2", ...],
                    "distances": [distance1, distance2, ...],
                    "embeddings": [embedding1, embedding2, ...]
                },
                ...
            }

    inputDatasetColumn : str
        Input dataset column name.

    Returns
    -------
    numpy.ndarray
        Similarity/distance row vector of shape (1, n).
    """
    # Look up the matches recorded for this column.
    column_matches = matchedCdeCodes[inputDatasetColumn]
    # Lay the distances out as a single float row vector.
    distance_row = np.array(column_matches["distances"], dtype=float)
    return distance_row.reshape(1, -1)
def match_column_to_cdes(dataset_column, schema):
    """Match a dataset column to CDEs using fuzzy matching.

    Parameters
    ----------
    dataset_column : str
        Dataset column.

    schema : pandas.DataFrame
        Schema to which the dataset is mapped.

    Returns
    -------
    list
        List of matched CDE codes ordered by decreasing fuzzy ratio.
    """

    # Similarity of a candidate CDE code to the dataset column name.
    def similarity(cde_code):
        return fuzz.ratio(dataset_column, cde_code)

    # Rank every CDE code, best match first.
    return sorted(schema["code"], key=similarity, reverse=True)