Source code for aac_metrics.functional.cider_d

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from collections import Counter, defaultdict
from typing import Callable, Mapping, TypedDict, Union

import numpy as np
import torch
from torch import Tensor

from aac_metrics.utils.checks import check_metric_inputs

# Layout of a CIDEr-D score dictionary: a single "cider_d" entry holding a Tensor.
CIDErDScores = TypedDict("CIDErDScores", {"cider_d": Tensor})
# Pair of score dictionaries, in the order built by `_cider_d_compute`:
# (corpus-level scores, per-sentence scores).
CIDErDOuts = tuple[CIDErDScores, CIDErDScores]


def cider_d(
    candidates: list[str],
    mult_references: list[list[str]],
    return_all_scores: bool = True,
    *,
    n: int = 4,
    sigma: float = 6.0,
    tokenizer: Callable[[str], list[str]] = str.split,
    return_tfidf: bool = False,
    scale: float = 10.0,
) -> Union[CIDErDOuts, Tensor]:
    """Consensus-based Image Description Evaluation function.

    - Paper: https://arxiv.org/pdf/1411.5726.pdf

    .. warning::
        This metric requires at least 2 candidates with 2 sets of references,
        otherwise it raises a ValueError.

    :param candidates: The list of sentences to evaluate.
    :param mult_references: The list of list of sentences used as target.
    :param return_all_scores: If True, returns a tuple containing the globals and locals scores.
        Otherwise returns a scalar tensor containing the main global score.
        defaults to True.
    :param n: Maximal number of n-grams taken into account. defaults to 4.
    :param sigma: Standard deviation parameter used for gaussian penalty. defaults to 6.0.
    :param tokenizer: The fast tokenizer used to split sentences into words. defaults to str.split.
    :param return_tfidf: If True, returns the list of dictionaries containing the tf-idf scores
        of n-grams in the sents_score output.
        defaults to False.
    :param scale: CIDEr-D score factor. defaults to 10.0.
    :returns: A tuple of globals and locals scores or a scalar tensor with the main global score.
    """
    # One-shot evaluation: start from fresh, empty accumulators.
    cooked = _cider_d_update(candidates, mult_references, n, tokenizer, [], [])
    cooked_cands, cooked_mrefs = cooked

    return _cider_d_compute(
        cooked_cands=cooked_cands,
        cooked_mrefs=cooked_mrefs,
        return_all_scores=return_all_scores,
        n=n,
        sigma=sigma,
        return_tfidf=return_tfidf,
        scale=scale,
    )
def _cider_d_update(
    candidates: list[str],
    mult_references: list[list[str]],
    n: int,
    tokenizer: Callable[[str], list[str]],
    prev_cooked_cands: list[Counter],
    prev_cooked_mrefs: list[list[Counter]],
) -> tuple[list, list]:
    """Convert sentences to n-gram counters and append them to the accumulators.

    :param candidates: The list of sentences to evaluate.
    :param mult_references: The list of list of sentences used as target.
    :param n: Maximal n-gram size.
    :param tokenizer: Callable splitting a sentence into words.
    :param prev_cooked_cands: Accumulated candidate counters. Extended IN PLACE.
    :param prev_cooked_mrefs: Accumulated references counters. Extended IN PLACE.
    :returns: The two (extended) accumulator lists.
    """
    check_metric_inputs(candidates, mult_references)
    new_cooked_mrefs = [
        [__cook_sentence(ref, n, tokenizer) for ref in refs]
        for refs in mult_references
    ]
    new_cooked_cands = [__cook_sentence(cand, n, tokenizer) for cand in candidates]
    # In-place extension on purpose: the caller's lists are the running state.
    prev_cooked_cands += new_cooked_cands
    prev_cooked_mrefs += new_cooked_mrefs
    return prev_cooked_cands, prev_cooked_mrefs


def _cider_d_compute(
    cooked_cands: list[Counter],
    cooked_mrefs: list[list[Counter]],
    return_all_scores: bool,
    n: int,
    sigma: float,
    return_tfidf: bool,
    scale: float,
) -> "Union[Tensor, CIDErDOuts]":
    """Compute CIDEr-D scores from accumulated n-gram counters.

    :param cooked_cands: One n-gram counter per candidate.
    :param cooked_mrefs: For each candidate, the list of its references counters.
    :param return_all_scores: If True, returns (corpus scores dict, sentences scores dict).
        Otherwise returns only the scalar corpus score tensor.
    :param n: Maximal n-gram size.
    :param sigma: Standard deviation of the gaussian length penalty.
    :param return_tfidf: If True, adds the "tfidf_lst" entry to the sentences scores dict.
    :param scale: Factor applied to the scores.
    :returns: A scalar float64 tensor, or a tuple of two dicts.
    :raises ValueError: If fewer than 2 candidates are given.
    """
    if len(cooked_cands) < 2:
        raise ValueError(
            f"CIDEr-D metric does not support less than 2 candidates with 2 references. (found {len(cooked_cands)} candidates, but expected > 1)"
        )

    # compute idf
    doc_frequencies = __compute_doc_freq(cooked_mrefs)
    # sanity check: an n-gram cannot occur in more documents than there are candidates.
    # default=0 avoids a cryptic "max() arg is an empty sequence" error when
    # no reference contains any n-gram (e.g. only empty references).
    assert len(cooked_cands) >= max(
        doc_frequencies.values(), default=0
    ), "Sanity check failed."
    # compute log reference length
    log_n_refs = np.log(float(len(cooked_mrefs)))

    # compute cider score
    cider_d_scores, tfidf_lst = __compute_cider(
        cooked_cands,
        cooked_mrefs,
        doc_frequencies,
        log_n_refs,
        n,
        sigma,
        scale=scale,
    )
    # The corpus score is the mean of the sentence scores (computed with numpy).
    cider_d_score = cider_d_scores.mean()

    cider_d_scores = torch.from_numpy(cider_d_scores)
    cider_d_score = torch.as_tensor(cider_d_score, dtype=torch.float64)

    if not return_all_scores:
        return cider_d_score

    cider_d_outs_corpus = {
        "cider_d": cider_d_score,
    }
    cider_d_outs_sents = {
        "cider_d": cider_d_scores,
    }
    if return_tfidf:
        cider_d_outs_sents["tfidf_lst"] = tfidf_lst  # type: ignore

    cider_d_outs = cider_d_outs_corpus, cider_d_outs_sents
    return cider_d_outs  # type: ignore


def __cook_sentence(
    sentence: str,
    n: int,
    tokenizer: Callable[[str], list[str]],
) -> Counter[tuple[str, ...]]:
    """Count all the k-grams (1 <= k <= n) of a sentence.

    :param sentence: Sentence to be converted into n-grams.
    :param n: Maximal n-gram size.
    :param tokenizer: Callable splitting the sentence into words.
    :returns: Term frequency counter, keyed by n-gram (tuple of words).
    """
    words = tokenizer(sentence)
    counter = Counter()
    for k in range(1, n + 1):
        for i in range(len(words) - k + 1):
            ngram = tuple(words[i : i + k])
            counter[ngram] += 1
    return counter


def __compute_doc_freq(cooked_mrefs: list[list[Counter]]) -> Counter[tuple[str, ...]]:
    """Compute the document frequency of each n-gram in the references.

    A "document" is the set of references attached to one candidate: an n-gram
    is counted once per document, whatever its number of occurrences inside.
    The result is used later to compute the idf (inverse document frequency).

    :param cooked_mrefs: For each candidate, the list of its references counters.
    :returns: Counter mapping each n-gram to its number of documents.
    """
    doc_frequencies = Counter()
    for refs in cooked_mrefs:
        # Set => each n-gram contributes at most 1 per document.
        all_refs_ngrams = {ngram for ref in refs for ngram in ref}
        doc_frequencies.update(all_refs_ngrams)
    return doc_frequencies


def __counter_to_vec(
    counters: dict[tuple, int],
    log_n_refs: float,
    n: int,
    doc_frequencies: Union[Mapping[tuple, int], Callable[[tuple], int]],
) -> tuple[list[defaultdict], np.ndarray, int]:
    """Map n-gram counts to tf-idf weights.

    :param counters: Term frequency of each n-gram of one sentence.
    :param log_n_refs: Log of the number of documents.
    :param n: Maximal n-gram size.
    :param doc_frequencies: Mapping (or callable) giving the document frequency of an n-gram.
    :returns: A tuple (vec, norm, length) where
        vec is a list of n dicts, vec[k] maps each (k+1)-gram to its tf-idf weight,
        norm is an array of shape (n,) with the L2 norm of each vec[k],
        length is the sum of the term frequencies of the n-grams stored at index 1 (the 2-grams).
    """
    vec = [defaultdict(float) for _ in range(n)]
    length = 0
    norm = np.zeros((n,))

    # Loop-invariant: decide once how document frequencies are looked up.
    df_is_mapping = isinstance(doc_frequencies, Mapping)

    for ngram, term_freq in counters.items():
        if df_is_mapping:
            count = doc_frequencies[ngram]
        else:
            count = doc_frequencies(ngram)

        # give ngram count 1 if it doesn't appear in reference corpus
        log_df = np.log(max(1.0, count))

        # ngram index
        cur_n = len(ngram) - 1

        # tf (term_freq) * idf (precomputed idf) for n-grams
        weight = float(term_freq) * (log_n_refs - log_df)
        vec[cur_n][ngram] = weight

        # compute norm for the vector. the norm will be used for computing similarity
        norm[cur_n] += pow(weight, 2)

        # NOTE(review): the length is accumulated on index 1 (2-grams), not on
        # unigrams. Presumably kept for parity with the reference
        # implementation; do not change without checking the scores.
        if cur_n == 1:
            length += term_freq

    norm = np.sqrt(norm)
    return vec, norm, length


def __similarity(
    cand_vec: list[defaultdict],
    ref_vec: list[defaultdict],
    cand_norm: np.ndarray,
    ref_norm: np.ndarray,
    cand_len: int,
    ref_len: int,
    n: int,
    sigma: float,
) -> np.ndarray:
    """Compute the clipped cosine similarity of two tf-idf vectors, with length penalty.

    The arguments are only read: ref_vec is NOT modified by this function.

    :param cand_vec: (n, nb_ngrams_of_len_n), contains the TFIDF scores for one candidate
    :param ref_vec: (n, nb_ngrams_of_len_n), contains the TFIDF scores for one reference
    :param cand_norm: (n,), norms of the candidate n-grams vectors
    :param ref_norm: (n,), norms of the reference n-grams vectors
    :param cand_len: Length of the candidate, as returned by the vectorization step
    :param ref_len: Length of the reference, as returned by the vectorization step
    :param n: Maximal n-gram size
    :param sigma: Standard deviation of the gaussian length penalty
    :returns: N-grams similarities as array of shape (n,)
    """
    delta = float(cand_len - ref_len)
    # vrama91: added a length based gaussian penalty (identical for every n-gram size)
    penalty = np.e ** (-(delta**2) / (2 * sigma**2))

    # measure cosine similarity
    similarities = np.zeros((n,))
    for ni in range(n):
        ref_weights = ref_vec[ni]
        for ngram, cand_weight in cand_vec[ni].items():
            # .get() instead of [] : indexing a defaultdict would insert a 0.0
            # entry in the reference vector for every candidate-only n-gram.
            ref_weight = ref_weights.get(ngram, 0.0)
            # vrama91 : added clipping
            similarities[ni] += min(cand_weight, ref_weight) * ref_weight

        if (cand_norm[ni] != 0) and (ref_norm[ni] != 0):
            similarities[ni] /= cand_norm[ni] * ref_norm[ni]

        similarities[ni] *= penalty

    return similarities


def __compute_cider(
    cooked_cands: list[Counter],
    cooked_mrefs: list[list[Counter]],
    doc_frequencies: Union[Counter[tuple], Callable[[tuple], int]],
    log_n_refs: float,
    n: int,
    sigma: float,
    scale: float,
) -> tuple[np.ndarray, list[tuple[list, list]]]:
    """Compute the CIDEr-D score of each candidate against its references.

    :param cooked_cands: One n-gram counter per candidate.
    :param cooked_mrefs: For each candidate, the list of its references counters.
    :param doc_frequencies: Document frequency of each n-gram.
    :param log_n_refs: Log of the number of documents.
    :param n: Maximal n-gram size.
    :param sigma: Standard deviation of the gaussian length penalty.
    :param scale: Factor applied to the scores.
    :returns: A tuple (scores, tfidf_lst) where
        scores is an array with one scaled score per candidate,
        tfidf_lst contains for each candidate the pair
        (candidate tf-idf vectors, list of references tf-idf vectors).
    """
    scores = np.empty((len(cooked_cands),))
    tfidf_lst = []

    for i, (cand, refs) in enumerate(zip(cooked_cands, cooked_mrefs)):
        # compute vector for test captions
        vec, norm, length = __counter_to_vec(cand, log_n_refs, n, doc_frequencies)

        # compute vector for ref captions
        ngrams_scores = np.zeros((len(refs), n))
        vec_refs = []
        for j, ref in enumerate(refs):
            vec_ref, norm_ref, length_ref = __counter_to_vec(
                ref, log_n_refs, n, doc_frequencies
            )
            vec_refs.append(vec_ref)
            ngrams_scores[j] = __similarity(
                vec, vec_ref, norm, norm_ref, length, length_ref, n, sigma
            )

        # Use this weird mean calculation instead of ".mean()" because it can give slight differences compared to the original implementation
        score_avg = ngrams_scores.sum(axis=0).mean() / len(refs)
        scores[i] = score_avg
        tfidf_lst.append((vec, vec_refs))

    scores = scores * scale
    return scores, tfidf_lst